module kafkad.client;

import kafkad.connection;
import kafkad.protocol;
import kafkad.worker;
import kafkad.queue;
import core.time;
import std.container.dlist;
import std.exception;
import std.conv;
import vibe.core.core;
import vibe.core.log;
import vibe.core.net : NetworkAddress;
import vibe.core.sync : TaskMutex, TaskCondition;
public import kafkad.config;
public import kafkad.consumer;
public import kafkad.producer;
public import kafkad.exception;

struct BrokerAddress {
    string host;
    ushort port;

    this(string host, ushort port)
    {
        this.host = host;
        this.port = port;
    }

    /// Parses an address in "host:port" form.
    this(string address)
    {
        import std.algorithm : splitter;
        import std.array : array;
        auto parts = address.splitter(":").array;
        enforce(parts.length == 2, "BrokerAddress must be supplied in host:port format");
        this.host = parts[0];
        this.port = parts[1].to!ushort;
    }
}

unittest
{
    string hostname = "127.0.0.1";
    ushort port = 9292;
    auto address = hostname ~ ":" ~ port.to!string;
    auto b = BrokerAddress(address);
    assert(b.host == hostname, "hostname in BrokerAddress constructor not working");
    assert(b.port == port, "port in BrokerAddress constructor not working");
}

/// The client acts as a router between brokers, consumers and producers. Consumers and producers
/// connect to the client, and it manages the broker connections for them. It transparently handles
/// connection failures and leader switches, and maps topics/partitions to the respective broker connections.
class Client {
    enum __isWeakIsolatedType = true; // needed to pass this type between vibe.d's tasks

    private {
        Configuration m_config;
        BrokerAddress[] m_bootstrapBrokers;
        string m_clientId;
        BrokerConnection[NetworkAddress] m_conns;
        NetworkAddress[int] m_hostCache; // broker id to NetworkAddress cache
        Metadata m_metadata;
        bool m_gotMetadata;

        DList!IWorker m_workers, m_brokerlessWorkers;
        TaskMutex m_mutex;
        TaskCondition m_brokerlessWorkersEmpty;
        Task m_connectionManager;
    }

    @property auto clientId() { return m_clientId; }
    @property auto clientId(string v) { return m_clientId = v; }

    @property ref const(Configuration) config() { return m_config; }

    import std.string, std.process; // format and thisProcessID for the default client id
    this(BrokerAddress[] bootstrapBrokers, string clientId = format("kafka-d-%d", thisProcessID),
        Configuration config = Configuration())
    {
        m_config = config;
        enforce(bootstrapBrokers.length, "At least one bootstrap broker must be supplied");
        m_bootstrapBrokers = bootstrapBrokers;
        m_clientId = clientId;
        m_mutex = new TaskMutex();
        m_brokerlessWorkersEmpty = new TaskCondition(m_mutex);
        m_connectionManager = runTask(&connectionManagerMain);
        m_gotMetadata = false;
    }
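    // Usage sketch (illustrative only; the broker address and log message are
    // assumptions, not part of this module; run from within a vibe.d task):
    //
    //   auto client = new Client([BrokerAddress("127.0.0.1", 9092)]);
    //   client.refreshMetadata();
    //   foreach (topic; client.getTopics())
    //       logInfo("topic %s, partitions: %s", topic, client.getPartitions(topic));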
    /// Refreshes the metadata and stores it in the cache. Call it before using getTopics/getPartitions
    /// to get the most recent metadata. Metadata is also refreshed internally on the first use and on
    /// each consumer/producer failure.
    void refreshMetadata() {
        synchronized (m_mutex) {
            Exception lastException = null;
            auto remainingRetries = m_config.metadataRefreshRetryCount;
            // a retry count of 0 means "retry indefinitely"
            while (!m_config.metadataRefreshRetryCount || remainingRetries--) {
                foreach (brokerAddr; m_bootstrapBrokers) {
                    try {
                        auto conn = getConn(brokerAddr);
                        auto host = conn.addr;
                        m_metadata = conn.getMetadata([]);
                        enforce(m_metadata.brokers.length, new ConnectionException(
                            "Empty metadata, this may indicate there are no defined topics in the cluster"));
                        m_hostCache = null; // clear the cache

                        int bootstrapBrokerId = -1;
                        // look up this host in the metadata to obtain its node id
                        // also, fill the node id cache
                        foreach (ref b; m_metadata.brokers) {
                            enforce(b.port >= 0 && b.port <= ushort.max);
                            auto bhost = resolveBrokerAddr(BrokerAddress(b.host, cast(ushort)b.port));
                            if (bhost == host)
                                bootstrapBrokerId = b.id;
                            m_hostCache[b.id] = bhost;
                        }

                        if (bootstrapBrokerId < 0) {
                            import std.range : takeOne, front;
                            bootstrapBrokerId = m_metadata.brokers.takeOne.front.id;
                            logWarn("The bootstrap broker is not among the advertised brokers; using broker %s from the available brokers: %s",
                                bootstrapBrokerId, m_metadata.brokers);
                        }

                        conn.id = bootstrapBrokerId;

                        debug {
                            logDebug("Broker list:");
                            foreach (ref b; m_metadata.brokers) {
                                logDebug("\tBroker ID: %d, host: %s, port: %d", b.id, b.host, b.port);
                            }
                            logDebug("Topic list:");
                            foreach (ref t; m_metadata.topics) {
                                logDebug("\tTopic: %s, partitions:", t.topic);
                                foreach (ref p; t.partitions) {
                                    logDebug("\t\tPartition: %d, Leader ID: %d, Replicas: %s, In sync replicas: %s",
                                        p.id, p.leader, p.replicas, p.isr);
                                }
                            }
                        }

                        m_gotMetadata = true;
                        return;
                    } catch (ConnectionException ex) {
                        lastException = ex;
                        continue;
                    }
                }
                sleep(m_config.metadataRefreshRetryTimeout.msecs);
            }
            // fatal error, we couldn't get the new metadata from the bootstrap brokers
            assert(lastException);
            throw lastException;
        }
    }
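    // Retry semantics sketch: metadataRefreshRetryCount == 0 makes the loop above retry
    // indefinitely, while a positive count gives up after that many rounds over the
    // bootstrap brokers, sleeping metadataRefreshRetryTimeout milliseconds between rounds.
    // For example (field names as used above; the values and client id are assumptions):
    //
    //   Configuration cfg;
    //   cfg.metadataRefreshRetryCount = 5;      // give up after five rounds
    //   cfg.metadataRefreshRetryTimeout = 1000; // wait one second between rounds
    //   auto client = new Client([BrokerAddress("127.0.0.1", 9092)], "my-client", cfg);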
    private NetworkAddress resolveBrokerAddr(BrokerAddress brokerAddr) {
        import vibe.core.net : resolveHost;
        auto netAddr = resolveHost(brokerAddr.host)
            .rethrow!ConnectionException("Could not resolve host " ~ brokerAddr.host);
        netAddr.port = brokerAddr.port;
        return netAddr;
    }

    private BrokerConnection getConn(BrokerAddress brokerAddr) {
        auto netAddr = resolveBrokerAddr(brokerAddr);
        return getConn(netAddr);
    }

    private BrokerConnection getConn(NetworkAddress netAddr) {
        import vibe.core.net : connectTCP;
        auto pconn = netAddr in m_conns;
        if (!pconn) {
            auto tcpConn = connectTCP(netAddr)
                .rethrow!ConnectionException("TCP connect to address " ~ netAddr.toString() ~ " failed");
            auto conn = new BrokerConnection(this, tcpConn);
            m_conns[netAddr] = conn;
            return conn;
        }
        return *pconn;
    }

    private auto getConn(int id) {
        assert(id in m_hostCache);
        auto netAddr = m_hostCache[id];
        auto conn = getConn(netAddr);
        conn.id = id;
        return conn;
    }

    string[] getTopics() {
        if (!m_gotMetadata)
            refreshMetadata();
        string[] topics;
        foreach (ref t; m_metadata.topics) {
            topics ~= t.topic;
        }
        return topics;
    }

    int[] getPartitions(string topic) {
        if (!m_gotMetadata)
            refreshMetadata();
        int[] partitions;
        auto tm = m_metadata.findTopicMetadata(topic);
        foreach (ref p; tm.partitions) {
            partitions ~= p.id;
        }
        return partitions;
    }

    // This task tries to reconnect consumers and producers (workers) to the brokers in the background.
    // When the connection fails or the leader changes for a partition, the worker needs to switch
    // its connection to another broker. A worker is added to the brokerlessWorkers list each time
    // its connection becomes invalid (it is also added upon worker instantiation).
    // In such a situation, the consumer queue is still valid and may be processed by the user's task.
    // The connection may even be switched before the queue is exhausted, and the new connection may
    // fill the queue up again in a short time, so that the consumer never has to wait for messages
    // at all; the switch is then completely transparent to the consumer.
    private void connectionManagerMain() {
        mainLoop:
        for (;;) {
            IWorker worker;
            synchronized (m_mutex) {
                while (m_brokerlessWorkers.empty)
                    m_brokerlessWorkersEmpty.wait();
                worker = m_brokerlessWorkers.front;
                m_brokerlessWorkers.removeFront();
            }

            PartitionMetadata pm;

            // get the new partition metadata and wait for leader election if needed
            auto remainingRetries = m_config.leaderElectionRetryCount;
            // a retry count of 0 means "retry indefinitely"
            while (!m_config.leaderElectionRetryCount || remainingRetries--) {
                try {
                    refreshMetadata();
                    pm = m_metadata.findTopicMetadata(worker.topic)
                        .findPartitionMetadata(worker.partition);
                } catch (MetadataException ex) {
                    // no topic and/or partition on this broker
                    worker.throwException(ex);
                    continue mainLoop;
                } catch (ConnectionException ex) {
                    // couldn't connect to the broker
                    worker.throwException(ex);
                    continue mainLoop;
                }
                if (pm.leader >= 0)
                    break;
                sleep(m_config.leaderElectionRetryTimeout.msecs);
            }

            if (pm.leader < 0) {
                // all retries failed, we still don't have a leader for the worker's topic/partition
                worker.throwException(new Exception("Leader election timed out"));
                continue;
            }

            try {
                BrokerConnection conn = getConn(pm.leader);
                auto consumer = cast(Consumer)worker;
                if (consumer) {
                    if (consumer.queue.offset < 0) {
                        // get the earliest or latest offset
                        auto offset = conn.getStartingOffset(consumer.topic, consumer.partition, consumer.queue.offset);
                        consumer.queue.offset = offset;
                    }
                    conn.consumerRequestBundler.addQueue(consumer.queue, BufferType.Free);
                } else {
                    auto producer = cast(Producer)worker;
                    assert(producer);
                    conn.producerRequestBundler.addQueue(producer.queue, BufferType.Filled);
                }
            } catch (ConnectionException) {
                // couldn't connect to the leader, re-add this worker to the brokerless workers to try again
                synchronized (m_mutex) {
                    m_brokerlessWorkers.insertBack(worker);
                }
            }
        }
    }
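    // Worker lifecycle sketch: a consumer or producer registers itself through
    // addNewConsumer/addNewProducer below, which places it on m_brokerlessWorkers;
    // connectionManagerMain above picks it up, resolves the partition leader and
    // attaches its queue to that broker's request bundler; connectionLost re-queues
    // it here whenever the broker connection fails.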
    private void checkWorkerExistence(IWorker worker, string name) {
        foreach (w; m_workers) {
            if (w.workerType == worker.workerType && w.topic == worker.topic && w.partition == worker.partition)
                throw new Exception(format("This client already has a %s for topic %s and partition %d",
                    name, w.topic, w.partition));
        }
    }

package: // functions below are used by the consumer and producer classes

    void addNewConsumer(Consumer consumer) {
        synchronized (m_mutex) {
            checkWorkerExistence(consumer, "consumer");
            m_workers.insertBack(consumer);
            m_brokerlessWorkers.insertBack(consumer);
            m_brokerlessWorkersEmpty.notify();
        }
    }

    void removeConsumer(Consumer consumer) {
        import std.algorithm;
        synchronized (m_mutex) {
            m_workers.remove(find(m_workers[], consumer));
        }
    }

    void addNewProducer(Producer producer) {
        synchronized (m_mutex) {
            checkWorkerExistence(producer, "producer");
            m_workers.insertBack(producer);
            m_brokerlessWorkers.insertBack(producer);
            m_brokerlessWorkersEmpty.notify();
        }
    }

    void addToBrokerless(Queue queue, bool notify = false) {
        m_brokerlessWorkers.insertBack(queue.worker);
        if (notify)
            m_brokerlessWorkersEmpty.notify();
    }

    // Tears down a failed broker connection: removes it from the connection map and
    // moves all of its consumer and producer queues back to the brokerless workers,
    // so that the connection manager task can reattach them to the new leaders.
    void connectionLost(BrokerConnection conn) {
        synchronized (m_mutex) {
            synchronized (conn.consumerRequestBundler.mutex) {
                synchronized (conn.producerRequestBundler.mutex) {
                    foreach (pair; m_conns.byKeyValue) {
                        if (pair.value == conn) {
                            m_conns.remove(pair.key);
                            break;
                        }
                    }
                    foreach (q; &conn.consumerRequestBundler.queues) {
                        addToBrokerless(q);
                        synchronized (q.mutex) {
                            q.requestBundler = null;
                            q.requestPending = false;
                        }
                    }
                    foreach (q; &conn.producerRequestBundler.queues) {
                        addToBrokerless(q);
                        synchronized (q.mutex) {
                            q.requestBundler = null;
                            q.requestPending = false;
                        }
                    }
                    m_brokerlessWorkersEmpty.notify();
                }
            }
        }
    }
}
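// A minimal negative-path sketch for BrokerAddress, complementing the unittest above:
// the string constructor should reject inputs without a valid host:port shape.
unittest
{
    import std.exception : assertThrown;
    assertThrown(BrokerAddress("localhost"));          // missing port
    assertThrown(BrokerAddress("localhost:notanum"));  // port is not a number
}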