module kafkad.protocol.metadata; import kafkad.protocol.common; import kafkad.exception; /* Please note that the types and the order of the fields of these structs must not be changed, * as they are used by the deserializer to parse the metadata response */ struct Broker { int id; string host; int port; } struct PartitionMetadata { ApiError errorCode; int id; int leader; int[] replicas; int[] isr; } struct TopicMetadata { short errorCode; string topic; PartitionMetadata[] partitions; PartitionMetadata findPartitionMetadata(int id) { foreach (ref p; partitions) { if (p.id == id) return p; } import std.conv; throw new MetadataException("Partition " ~ id.to!string ~ " for topic " ~ topic ~ " does not exist"); } } struct Metadata { Broker[] brokers; TopicMetadata[] topics; TopicMetadata findTopicMetadata(string topic) { foreach (ref t; topics) { if (t.topic == topic) return t; } throw new MetadataException("Topic " ~ topic ~ " does not exist"); } }