module kafkad.client;

import kafkad.connection;
import kafkad.protocol;
import kafkad.worker;
import kafkad.queue;
import core.time;
import std.container.dlist;
import std.exception;
import std.conv;
import vibe.core.core;
import vibe.core.log;
import vibe.core.net : NetworkAddress;
import vibe.core.sync : TaskMutex, TaskCondition;
public import kafkad.config;
public import kafkad.consumer;
public import kafkad.producer;
public import kafkad.exception;

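/// Address of a single Kafka broker, given as a host name (or IP address) and a port.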
struct BrokerAddress {
    string host;
    ushort port;

    this(string host, ushort port)
    {
        this.host = host;
        this.port = port;
    }

    /// Parses an address given as "host:port".
    this(string address)
    {
        import std.algorithm : splitter;
        import std.array : array;
        auto parts = address.splitter(":").array;
        enforce(parts.length == 2, "BrokerAddress must be supplied as host:port");
        this.host = parts[0];
        this.port = parts[1].to!ushort;
    }
}

unittest
{
    string hostname = "127.0.0.1";
    ushort port = 9292;
    auto address = hostname ~ ":" ~ port.to!string;
    auto b = BrokerAddress(address);
    assert(b.host == hostname, "hostname in BrokerAddress constructor not working");
    assert(b.port == port, "port in BrokerAddress constructor not working");
}

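unittest
{
    // a quick check of the (host, port) constructor as well
    auto b = BrokerAddress("localhost", 9092);
    assert(b.host == "localhost");
    assert(b.port == 9092);
}
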
/// The client acts as a router between brokers, consumers and producers. Consumers and producers
/// connect to the client, and it handles the broker connections for them. It transparently handles
/// connection failures and leader switches, and it maps topic/partition pairs to the respective broker connections.
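///
/// Example (a minimal sketch; the broker address below is an assumption, adjust it to your cluster):
/// ---
/// auto client = new Client([BrokerAddress("localhost", 9092)]);
/// foreach (topic; client.getTopics())
///     logInfo("Topic: %s", topic);
/// ---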
class Client {
    enum __isWeakIsolatedType = true; // needed to pass this type between vibe.d's tasks
    private {
        Configuration m_config;
        BrokerAddress[] m_bootstrapBrokers;
        string m_clientId;
        BrokerConnection[NetworkAddress] m_conns;
        NetworkAddress[int] m_hostCache; // broker id to NetworkAddress cache
        Metadata m_metadata;
        bool m_gotMetadata;

        DList!IWorker m_workers, m_brokerlessWorkers;
        TaskMutex m_mutex;
        TaskCondition m_brokerlessWorkersEmpty;
        Task m_connectionManager;
    }

    @property auto clientId() { return m_clientId; }
    @property auto clientId(string v) { return m_clientId = v; }

    @property ref const(Configuration) config() { return m_config; }

    import std.string, std.process;
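    /// Creates a client that fetches its initial metadata from one of the given bootstrap brokers.
    /// The client id defaults to "kafka-d-<pid>" so that individual processes can be told apart on the broker side.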
    this(BrokerAddress[] bootstrapBrokers, string clientId = format("kafka-d-%d", thisProcessID),
        Configuration config = Configuration())
    {
        m_config = config;
        enforce(bootstrapBrokers.length, "At least one bootstrap broker must be given");
        m_bootstrapBrokers = bootstrapBrokers;
        m_clientId = clientId;
        m_gotMetadata = false;
        m_mutex = new TaskMutex();
        m_brokerlessWorkersEmpty = new TaskCondition(m_mutex);
        // start the connection manager last, after all fields are initialized
        m_connectionManager = runTask(&connectionManagerMain);
    }

    /// Refreshes the metadata and stores it in the cache. Call it before using getTopics/getPartitions to get the most recent metadata.
    /// Metadata is also refreshed internally on the first use and on each consumer/producer failure.
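    ///
    /// Example (a minimal sketch; "my-topic" is a placeholder for an existing topic):
    /// ---
    /// client.refreshMetadata(); // may throw ConnectionException if no bootstrap broker is reachable
    /// auto partitions = client.getPartitions("my-topic");
    /// ---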
    void refreshMetadata() {
        synchronized (m_mutex) {
            Exception lastException = null;
            auto remainingRetries = m_config.metadataRefreshRetryCount;
            // a retry count of 0 means "retry forever"
            while (!m_config.metadataRefreshRetryCount || remainingRetries--) {
                foreach (brokerAddr; m_bootstrapBrokers) {
                    try {
                        auto conn = getConn(brokerAddr);
                        auto host = conn.addr;
                        m_metadata = conn.getMetadata([]);
                        enforce(m_metadata.brokers.length, new ConnectionException("Empty metadata, this may indicate there are no defined topics in the cluster"));
                        m_hostCache = null; // clear the cache

                        int bootstrapBrokerId = -1;
                        // look up this host in the metadata to obtain its node id
                        // and fill the node id cache along the way
                        foreach (ref b; m_metadata.brokers) {
                            enforce(b.port >= 0 && b.port <= ushort.max);
                            auto bhost = resolveBrokerAddr(BrokerAddress(b.host, cast(ushort)b.port));
                            if (bhost == host)
                                bootstrapBrokerId = b.id;
                            m_hostCache[b.id] = bhost;
                        }

                        if (bootstrapBrokerId < 0)
                        {
                            import std.range : takeOne, front;
                            bootstrapBrokerId = m_metadata.brokers.takeOne.front.id;
                            logWarn("The bootstrap broker is not among the advertised brokers! Using broker %s from the available brokers: %s", bootstrapBrokerId, m_metadata.brokers);
                        }

                        conn.id = bootstrapBrokerId;

                        debug {
                            logDebug("Broker list:");
                            foreach (ref b; m_metadata.brokers) {
                                logDebug("\tBroker ID: %d, host: %s, port: %d", b.id, b.host, b.port);
                            }
                            logDebug("Topic list:");
                            foreach (ref t; m_metadata.topics) {
                                logDebug("\tTopic: %s, partitions:", t.topic);
                                foreach (ref p; t.partitions) {
                                    logDebug("\t\tPartition: %d, Leader ID: %d, Replicas: %s, In sync replicas: %s",
                                        p.id, p.leader, p.replicas, p.isr);
                                }
                            }
                        }

                        m_gotMetadata = true;
                        return;
                    } catch (ConnectionException ex) {
                        lastException = ex;
                        continue;
                    }
                }
                sleep(m_config.metadataRefreshRetryTimeout.msecs);
            }
            // fatal error, we couldn't get fresh metadata from any of the bootstrap brokers
            assert(lastException);
            throw lastException;
        }
    }

    private NetworkAddress resolveBrokerAddr(BrokerAddress brokerAddr) {
        import vibe.core.net : resolveHost;
        auto netAddr = resolveHost(brokerAddr.host).rethrow!ConnectionException("Could not resolve host " ~ brokerAddr.host);
        netAddr.port = brokerAddr.port;
        return netAddr;
    }

    private BrokerConnection getConn(BrokerAddress brokerAddr) {
        auto netAddr = resolveBrokerAddr(brokerAddr);
        return getConn(netAddr);
    }

    private BrokerConnection getConn(NetworkAddress netAddr) {
        import vibe.core.net : connectTCP;
        if (auto pconn = netAddr in m_conns)
            return *pconn;
        auto tcpConn = connectTCP(netAddr).rethrow!ConnectionException("TCP connect to address " ~ netAddr.toString() ~ " failed");
        auto conn = new BrokerConnection(this, tcpConn);
        m_conns[netAddr] = conn;
        return conn;
    }

    private auto getConn(int id) {
        assert(id in m_hostCache);
        auto netAddr = m_hostCache[id];
        auto conn = getConn(netAddr);
        conn.id = id;
        return conn;
    }

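    /// Returns the names of all topics known to the cluster, refreshing the metadata first if it hasn't been fetched yet.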
    string[] getTopics() {
        if (!m_gotMetadata)
            refreshMetadata();
        string[] topics;
        foreach (ref t; m_metadata.topics) {
            topics ~= t.topic;
        }
        return topics;
    }

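    /// Returns the partition ids of the given topic, refreshing the metadata first if it hasn't been fetched yet.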
    int[] getPartitions(string topic) {
        if (!m_gotMetadata)
            refreshMetadata();
        int[] partitions;
        auto tm = m_metadata.findTopicMetadata(topic);
        foreach (ref p; tm.partitions) {
            partitions ~= p.id;
        }
        return partitions;
    }

    // This task tries to reconnect consumers and producers (workers) to the brokers in the background.
    // When the connection fails or the leader changes for a partition, the worker needs to switch
    // its connection to another broker. A worker is added to the brokerlessWorkers list each time
    // its connection becomes invalid (it's also added upon worker class instantiation).
    // In such a situation, the consumer queue is still valid and may be processed by the user's task. It may happen
    // that the connection is switched before the queue is exhausted and the new connection fills the queue up
    // again in a short time, so that the consumer doesn't need to wait for messages at all. For the consumer,
    // the switch is then completely transparent.
    private void connectionManagerMain() {
    mainLoop:
        for (;;) {
            IWorker worker;
            synchronized (m_mutex) {
                while (m_brokerlessWorkers.empty)
                    m_brokerlessWorkersEmpty.wait();
                worker = m_brokerlessWorkers.front;
                m_brokerlessWorkers.removeFront();
            }

            PartitionMetadata pm;

            // get the new partition metadata and wait for leader election if needed
            auto remainingRetries = m_config.leaderElectionRetryCount;
            while (!m_config.leaderElectionRetryCount || remainingRetries--) {
                try {
                    refreshMetadata();
                    pm = m_metadata.findTopicMetadata(worker.topic).
                                    findPartitionMetadata(worker.partition);
                } catch (MetadataException ex) {
                    // no such topic and/or partition on this broker
                    worker.throwException(ex);
                    continue mainLoop;
                } catch (ConnectionException ex) {
                    // couldn't connect to the broker
                    worker.throwException(ex);
                    continue mainLoop;
                }
                if (pm.leader >= 0)
                    break;
                sleep(m_config.leaderElectionRetryTimeout.msecs);
            }

            if (pm.leader < 0) {
                // all retries failed, we still don't have a leader for the worker's topic/partition
                worker.throwException(new Exception("Leader election timed out"));
                continue;
            }

            try {
                BrokerConnection conn = getConn(pm.leader);
                auto consumer = cast(Consumer)worker;
                if (consumer) {
                    if (consumer.queue.offset < 0) {
                        // get the earliest or latest offset
                        auto offset = conn.getStartingOffset(consumer.topic, consumer.partition, consumer.queue.offset);
                        consumer.queue.offset = offset;
                    }
                    conn.consumerRequestBundler.addQueue(consumer.queue, BufferType.Free);
                } else {
                    auto producer = cast(Producer)worker;
                    assert(producer);
                    conn.producerRequestBundler.addQueue(producer.queue, BufferType.Filled);
                }
            } catch (ConnectionException) {
                // couldn't connect to the leader, re-add this worker to the brokerless workers to try again
                synchronized (m_mutex) {
                    m_brokerlessWorkers.insertBack(worker);
                }
            }
        }
    }

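    // Rejects a second worker of the same type for a topic/partition pair this client already serves.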
    private void checkWorkerExistence(IWorker worker, string name) {
        foreach (w; m_workers) {
            if (w.workerType == worker.workerType && w.topic == worker.topic && w.partition == worker.partition)
                throw new Exception(format("This client already has a %s for topic %s and partition %d",
                        name, w.topic, w.partition));
        }
    }

package: // functions below are used by the consumer and producer classes

    void addNewConsumer(Consumer consumer) {
        synchronized (m_mutex) {
            checkWorkerExistence(consumer, "consumer");
            m_workers.insertBack(consumer);
            m_brokerlessWorkers.insertBack(consumer);
            m_brokerlessWorkersEmpty.notify();
        }
    }

    void removeConsumer(Consumer consumer) {
        import std.algorithm;
        synchronized (m_mutex) {
            m_workers.remove(find(m_workers[], consumer));
        }
    }

    void addNewProducer(Producer producer) {
        synchronized (m_mutex) {
            checkWorkerExistence(producer, "producer");
            m_workers.insertBack(producer);
            m_brokerlessWorkers.insertBack(producer);
            m_brokerlessWorkersEmpty.notify();
        }
    }

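    // Note: callers are expected to hold m_mutex; the worker list is mutated without further locking here.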
    void addToBrokerless(Queue queue, bool notify = false) {
        m_brokerlessWorkers.insertBack(queue.worker);
        if (notify)
            m_brokerlessWorkersEmpty.notify();
    }

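    // Drops a dead broker connection and moves all of its consumer and producer queues back to the
    // brokerless workers, so that the connection manager can reattach them to the new leaders.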
    void connectionLost(BrokerConnection conn) {
        synchronized (m_mutex) {
            synchronized (conn.consumerRequestBundler.mutex) {
                synchronized (conn.producerRequestBundler.mutex) {
                    foreach (pair; m_conns.byKeyValue) {
                        if (pair.value == conn) {
                            m_conns.remove(pair.key);
                            break;
                        }
                    }
                    foreach (q; &conn.consumerRequestBundler.queues) {
                        addToBrokerless(q);
                        synchronized (q.mutex) {
                            q.requestBundler = null;
                            q.requestPending = false;
                        }
                    }
                    foreach (q; &conn.producerRequestBundler.queues) {
                        addToBrokerless(q);
                        synchronized (q.mutex) {
                            q.requestBundler = null;
                            q.requestPending = false;
                        }
                    }
                    m_brokerlessWorkersEmpty.notify();
                }
            }
        }
    }
}