1 module kafkad.connection;
2 
3 import kafkad.client;
4 import kafkad.protocol;
5 import kafkad.exception;
6 import kafkad.bundler;
7 import kafkad.queue;
8 import kafkad.utils.lists;
9 import vibe.core.core;
10 import vibe.core.net;
11 import vibe.core.task;
12 import vibe.core.sync;
13 import vibe.core.log;
import std.format;
import core.time;
import std.concurrency;
import std.algorithm.comparison : min;
import std.exception : enforce;
17 
18 package:
19 
enum RequestType { Metadata, Fetch, Produce, Offset }
21 
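// Bookkeeping node for an in-flight request; responses arrive in send order on the
// TCP connection, so the receiver task matches them to these nodes FIFO.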
22 struct Request {
23     RequestType type;
24     Tid tid;
25 
26     Request* next;
27 }
28 
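// A single connection to a Kafka broker. It runs three tasks: a fetcher that bundles
// fetch requests, a pusher that bundles produce requests, and a receiver that reads
// responses and dispatches them to the matching in-flight request.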
29 class BrokerConnection {
30     private {
31         Client m_client;
32         TCPConnection m_conn;
33         Serializer m_ser;
34         Deserializer m_des;
35         TaskMutex m_mutex;
36         RequestBundler m_consumerRequestBundler;
37         RequestBundler m_producerRequestBundler;
38         FreeList!Request m_requests;
39         Task m_fetcherTask, m_pusherTask, m_receiverTask;
40         ubyte[] m_topicNameBuffer;
41     }
42 
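    // broker id; -1 until assigned by the client (also used as the replicaId in
    // offset requests, see getStartingOffset)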
43     int id = -1;
44 
45     @property NetworkAddress addr() {
46         return m_conn.remoteAddress.rethrow!ConnectionException("Could not get connection's remote address");
47     }
48 
49     @property RequestBundler consumerRequestBundler() { return m_consumerRequestBundler; }
50     @property RequestBundler producerRequestBundler() { return m_producerRequestBundler; }
51 
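    // Wraps an established TCP connection and spawns the fetcher, pusher and receiver tasks.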
52     this(Client client, TCPConnection conn) {
53         m_client = client;
54         m_conn = conn;
55         m_ser = Serializer(conn, client.config.serializerChunkSize);
56         m_des = Deserializer(conn, client.config.deserializerChunkSize);
57         m_mutex = new TaskMutex();
58         m_consumerRequestBundler = new RequestBundler();
59         m_producerRequestBundler = new RequestBundler();
60         m_topicNameBuffer = new ubyte[short.max];
61         m_fetcherTask = runTask(&fetcherMain);
62         m_pusherTask = runTask(&pusherMain);
63         m_receiverTask = runTask(&receiverMain);
64     }
65 
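    // Fetcher task: waits until either fetcherBundleMinRequests fetch requests are
    // collected or fetcherBundleMaxWaitTime elapses, then serializes them as one
    // bundled fetch request and registers it for the receiver task.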
66     void fetcherMain() {
68         bool gotFirstRequest = false;
69         MonoTime startTime;
70         try {
71             // send requests
72             for (;;) {
73                 synchronized (m_consumerRequestBundler.mutex) {
74                     auto minRequests = min(m_client.config.fetcherBundleMinRequests, m_consumerRequestBundler.queueCount);
75                     if (!gotFirstRequest) {
76                         // wait for the first fetch request
77                         while (!m_consumerRequestBundler.requestTopicsFront) {
78                             m_consumerRequestBundler.readyCondition.wait();
79                         }
80                         if (m_consumerRequestBundler.requestsCollected < minRequests) {
81                             gotFirstRequest = true;
82                             // start the timer
83                             startTime = MonoTime.currTime;
84                             // wait for more requests
85                             continue;
86                         }
87                     } else {
                        // wait until the configured request count is collected or the max wait time elapses
89                         while (m_consumerRequestBundler.requestsCollected < minRequests) {
90                             Duration elapsedTime = MonoTime.currTime - startTime;
91                             if (elapsedTime >= m_client.config.fetcherBundleMaxWaitTime.msecs)
92                                 break; // timeout reached
93                             Duration remaining = m_client.config.fetcherBundleMaxWaitTime.msecs - elapsedTime;
94                             if (!m_consumerRequestBundler.readyCondition.wait(remaining))
95                                 break; // timeout reached
96                         }
97                         gotFirstRequest = false;
98                     }
99 
100                     synchronized (m_mutex) {
101                         m_ser.fetchRequest_v0(0, m_client.clientId, m_client.config, m_consumerRequestBundler);
102                         m_ser.flush();
103 
                        // register the in-flight fetch request so the receiver can
                        // match its response (responses arrive in send order)
105                         auto req = m_requests.getNodeToFill();
106                         req.type = RequestType.Fetch;
107                         m_requests.pushFilledNode(req);
108                     }
109 
110                     m_consumerRequestBundler.clearRequestLists();
111                 }
112             }
113         } catch (StreamException) {
114             // stream error, typically connection loss
115             m_pusherTask.interrupt();
116             m_receiverTask.interrupt();
117             m_client.connectionLost(this);
118         } catch (InterruptException) {
119             // do nothing
120             logDebugV("FETCHER INT");
121         }
122     }
123 
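    // Pusher task: waits until either pusherBundleMinRequests produce requests are
    // collected or pusherBundleMaxWaitTime elapses, then serializes them as one
    // bundled produce request and registers it for the receiver task.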
124     void pusherMain() {
126         bool gotFirstRequest = false;
127         MonoTime startTime;
128         try {
129             // send requests
130             for (;;) {
131                 synchronized (m_producerRequestBundler.mutex) {
132                     auto minRequests = min(m_client.config.pusherBundleMinRequests, m_producerRequestBundler.queueCount);
133                     if (!gotFirstRequest) {
134                         // wait for the first produce request
135                         while (!m_producerRequestBundler.requestTopicsFront) {
136                             m_producerRequestBundler.readyCondition.wait();
137                         }
138                         if (m_producerRequestBundler.requestsCollected < minRequests) {
139                             gotFirstRequest = true;
140                             // start the timer
141                             startTime = MonoTime.currTime;
142                             // wait for more requests
143                             continue;
144                         }
145                     } else {
                        // wait until the configured request count is collected or the max wait time elapses
147                         while (m_producerRequestBundler.requestsCollected < minRequests) {
148                             Duration elapsedTime = MonoTime.currTime - startTime;
149                             if (elapsedTime >= m_client.config.pusherBundleMaxWaitTime.msecs)
150                                 break; // timeout reached
151                             Duration remaining = m_client.config.pusherBundleMaxWaitTime.msecs - elapsedTime;
152                             if (!m_producerRequestBundler.readyCondition.wait(remaining))
153                                 break; // timeout reached
154                         }
155                         gotFirstRequest = false;
156                     }
157 
158                     synchronized (m_mutex) {
159                         m_ser.produceRequest_v0(0, m_client.clientId, m_client.config, m_producerRequestBundler);
160                         m_ser.flush();
161                         
                        // register the in-flight produce request so the receiver can
                        // match its response (responses arrive in send order)
163                         auto req = m_requests.getNodeToFill();
164                         req.type = RequestType.Produce;
165                         m_requests.pushFilledNode(req);
166                     }
167 
168                     m_producerRequestBundler.clearRequestLists();
169                 }
170             }
171         } catch (StreamException) {
172             // stream error, typically connection loss
173             m_fetcherTask.interrupt();
174             m_receiverTask.interrupt();
175             m_client.connectionLost(this);
176         } catch (InterruptException) {
177             // do nothing
178             logDebugV("PUSHER INT");
179         }
180     }
181 
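    // Receiver task: reads responses off the connection, pops the oldest in-flight
    // request node, and dispatches the response to the metadata/offset caller or to
    // the consumer/producer queues.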
182     void receiverMain() {
183         try {
184             int size, correlationId;
185             for (;;) {
186                 m_des.getMessage(size, correlationId);
187                 m_des.beginMessage(size);
188                 scope (success)
189                     m_des.endMessage();
190 
                // Responses arrive in the order the requests were sent on a single TCP
                // connection, so we rely on that order rather than on the correlationId.
                // Request nodes are pushed to the queue by the fetcher, pusher, metadata
                // and offset calls, and are popped here in the same order.
195                 Request req = void;
196                 synchronized (m_mutex) {
197                     assert(!m_requests.empty);
198                     auto node = m_requests.getNodeToProcess();
199                     req = *node;
200                     m_requests.returnProcessedNode(node);
201                 }
202 
203                 switch (req.type) {
204                     case RequestType.Metadata:
205                         Metadata metadata = m_des.metadataResponse_v0();
206                         send(req.tid, cast(immutable)metadata);
207                         break;
208                     case RequestType.Offset:
209                         OffsetResponse_v0 resp = m_des.offsetResponse_v0();
210                         send(req.tid, cast(immutable)resp);
211                         break;
212                     case RequestType.Fetch:
213                         // parse the fetch response, move returned messages to the correct queues,
214                         // and handle partition errors if needed
215                         int numtopics;
216                         m_des.deserialize(numtopics);
217                         assert(numtopics > 0);
218                         foreach (nt; 0 .. numtopics) {
219                             string topic;
220                             int numpartitions;
221                             short topicNameLen;
222                             m_des.deserialize(topicNameLen);
223 
224                             ubyte[] topicSlice = m_topicNameBuffer[0 .. topicNameLen];
225                             m_des.deserializeSlice(topicSlice);
226                             topic = cast(string)topicSlice;
227                             m_des.deserialize(numpartitions);
228                             assert(numpartitions > 0);
229 
230                             synchronized (m_consumerRequestBundler.mutex) {
231                                 Topic* queueTopic = m_consumerRequestBundler.findTopic(topic);
232 
233                                 foreach (np; 0 .. numpartitions) {
234                                     static struct FetchPartitionInfo {
235                                         int partition;
236                                         short errorCode;
237                                         long endOffset;
238                                         int messageSetSize;
239                                     }
240                                     FetchPartitionInfo fpi;
241                                     m_des.deserialize(fpi);
242 
243                                     Partition* queuePartition = null;
244                                     if (queueTopic)
245                                         queuePartition = queueTopic.findPartition(fpi.partition);
246 
247                                     if (!queuePartition) {
248                                         // skip the partition
249                                         m_des.skipBytes(fpi.messageSetSize);
250                                         continue;
251                                     }
252 
253                                     Queue queue = queuePartition.queue;
254 
255                                     switch (cast(ApiError)fpi.errorCode) {
256                                         case ApiError.NoError: break;
257                                         case ApiError.UnknownTopicOrPartition:
258                                         case ApiError.LeaderNotAvailable:
259                                         case ApiError.NotLeaderForPartition:
260                                             // We need to refresh the metadata, get the new connection and
261                                             // retry the request. To do so, we remove the consumer from this
262                                             // connection and add it to the client brokerlessConsumers list.
263                                             // The client will do the rest.
264                                             m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
265                                             m_client.addToBrokerless(queue, true);
266                                             m_des.skipBytes(fpi.messageSetSize);
267                                             continue;
268                                         case ApiError.OffsetOutOfRange:
269                                             m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
270                                             m_client.removeConsumer(cast(Consumer)queue.worker);
271                                             queue.worker.throwException(new OffsetOutOfRangeException(format(
272                                                         "Offset %d is out of range for topic %s, partition %d",
273                                                         queue.offset, queueTopic.topic, queuePartition.partition)));
274                                             m_des.skipBytes(fpi.messageSetSize);
275                                             continue;
276                                         default: throw new ProtocolException(format("Unexpected fetch response error: %d", fpi.errorCode));
277                                     }
278 
279                                     if (fpi.messageSetSize > m_client.config.consumerMaxBytes) {
280                                         m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
281                                         queue.worker.throwException(new ProtocolException("MessageSet is too big to fit into a buffer"));
282                                         m_des.skipBytes(fpi.messageSetSize);
283                                         continue;
284                                     }
285 
                                    if (fpi.messageSetSize < 26) {
                                        // the message set is smaller than the smallest possible message
                                        // (26 bytes: 8-byte offset, 4-byte size, 4-byte CRC, magic byte,
                                        // attributes byte, 4-byte key length and 4-byte value length),
                                        // so it holds no complete message; make another request
288                                         m_consumerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
289                                         m_des.skipBytes(fpi.messageSetSize);
290                                         continue;
291                                     }
292 
293                                     QueueBuffer* qbuf;
294 
295                                     synchronized (queue.mutex)
296                                         qbuf = queue.getBuffer(BufferType.Free);
297 
298                                     // copy message set to the buffer
299                                     m_des.deserializeSlice(qbuf.buffer[0 .. fpi.messageSetSize]);
300                                     qbuf.p = qbuf.buffer;
301                                     qbuf.messageSetSize = fpi.messageSetSize;
302                                     qbuf.requestedOffset = queue.offset;
303 
304                                     // find the next offset to fetch
305                                     long nextOffset = qbuf.findNextOffset();
306                                     enforce(nextOffset > 0);
307 
308                                     synchronized (queue.mutex) {
309                                         queue.offset = nextOffset;
310                                         queue.returnBuffer(BufferType.Filled, qbuf);
311                                         queue.condition.notify();
                                        // queue.requestPending is always true here
313                                         if (queue.hasBuffer(BufferType.Free))
314                                             m_consumerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
315                                         else
316                                             queue.requestPending = false;
317                                     }
318                                 }
319                             }
320                         }
321                         break;
322                     case RequestType.Produce:
323                         int numtopics;
324                         m_des.deserialize(numtopics);
325                         assert(numtopics > 0);
326                         foreach (nt; 0 .. numtopics) {
327                             string topic;
328                             int numpartitions;
329                             short topicNameLen;
330                             m_des.deserialize(topicNameLen);
331                             
332                             ubyte[] topicSlice = m_topicNameBuffer[0 .. topicNameLen];
333                             m_des.deserializeSlice(topicSlice);
334                             topic = cast(string)topicSlice;
335                             m_des.deserialize(numpartitions);
336                             assert(numpartitions > 0);
337                             
338                             synchronized (m_producerRequestBundler.mutex) {
339                                 Topic* queueTopic = m_producerRequestBundler.findTopic(topic);
340 
341                                 foreach (np; 0 .. numpartitions) {
342                                     static struct ProducePartitionInfo {
343                                         int partition;
344                                         short errorCode;
345                                         long offset;
346                                     }
347                                     ProducePartitionInfo ppi;
348                                     m_des.deserialize(ppi);
349                                     
350                                     Partition* queuePartition = null;
351                                     if (queueTopic)
352                                         queuePartition = queueTopic.findPartition(ppi.partition);
353 
                                    if (!queuePartition) {
                                        // the partition is no longer tracked by this bundler, skip it
                                        continue;
                                    }
359 
360                                     Queue queue = queuePartition.queue;
361 
362                                     switch (cast(ApiError)ppi.errorCode) {
363                                         case ApiError.NoError: break;
364                                         case ApiError.UnknownTopicOrPartition:
365                                         case ApiError.LeaderNotAvailable:
366                                         case ApiError.NotLeaderForPartition:
367                                             // We need to refresh the metadata, get the new connection and
368                                             // retry the request. To do so, we remove the producer from this
369                                             // connection and add it to the client brokerlessWorkers list.
370                                             // The client will do the rest.
371                                             m_producerRequestBundler.removeQueue(queueTopic, queuePartition);
372                                             m_client.addToBrokerless(queue, true);
373                                             continue;
374                                         default: throw new ProtocolException(format("Unexpected produce response error: %d", ppi.errorCode));
375                                     }
376 
377                                     synchronized (queue.mutex) {
378                                         //queue.returnBuffer(BufferType.Filled);
379                                         // queue.requestPending is always true here
380                                         if (queue.hasBuffer(BufferType.Filled))
381                                             m_producerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
382                                         else
383                                             queue.requestPending = false;
384                                     }
385                                 }
386                             }
387                         }
388                         break;
389                     default: assert(0); // FIXME
390                 }
391             }
392         }
393         catch (StreamException) {
394             // stream error, typically connection loss
395             m_fetcherTask.interrupt();
396             m_pusherTask.interrupt();
397             m_client.connectionLost(this);
398         } catch (InterruptException) {
399             // do nothing
400             logDebugV("RECEIVER INT");
401         }
402     }
403 
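    // Sends a metadata request for the given topics and blocks until the receiver
    // task delivers the response to this task's mailbox.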
404     Metadata getMetadata(string[] topics) {
405         synchronized (m_mutex) {
406             m_ser.metadataRequest_v0(0, m_client.clientId, topics);
407             m_ser.flush();
408 
409             auto req = m_requests.getNodeToFill();
410             req.type = RequestType.Metadata;
411             req.tid = thisTid;
412             m_requests.pushFilledNode(req);
413         }
414         Metadata ret;
415         receive((immutable Metadata v) { ret = cast()v; });
416         return ret;
417     }
418 
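    // Resolves a logical offset (-1 for the latest, -2 for the earliest, per the Kafka
    // offset API) to a concrete starting offset for the given topic and partition.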
419     long getStartingOffset(string topic, int partition, long offset) {
420         assert(offset == -1 || offset == -2);
421         OffsetRequestParams_v0.PartTimeMax[1] p;
422         p[0].partition = partition;
423         p[0].time = offset;
424         p[0].maxOffsets = 1;
425         OffsetRequestParams_v0.Topic[1] t;
426         t[0].topic = topic;
427         t[0].partitions = p;
428         OffsetRequestParams_v0 params;
429         params.replicaId = id;
430         params.topics = t;
431         synchronized (m_mutex) {
432             m_ser.offsetRequest_v0(0, m_client.clientId, params);
433             m_ser.flush();
434 
435             auto req = m_requests.getNodeToFill();
436             req.type = RequestType.Offset;
437             req.tid = thisTid;
438             m_requests.pushFilledNode(req);
439         }
440         long return_offset;
        receive(
            (immutable OffsetResponse_v0 resp) {
                enforce(resp.topics.length == 1);
                enforce(resp.topics[0].partitions.length == 1);
                enforce(resp.topics[0].partitions[0].errorCode == 0,
                    format("Could not get starting offset for topic %s and partition %d", topic, partition));
                enforce(resp.topics[0].partitions[0].offsets.length == 1);
                return_offset = resp.topics[0].partitions[0].offsets[0];
            });
450 
451         return return_offset;
452     }
453 }