1 module kafkad.bundler;
2 
3 import core.sync.mutex;
4 import kafkad.queue;
5 import vibe.core.sync;
6 
/// Per-partition bookkeeping entry tracked by the request bundler for one topic.
struct Partition {
    /// Kafka partition id within the owning topic.
    int partition;
    /// Queue carrying buffers between the consumer/producer and its worker.
    Queue queue;
    /// Buffer currently associated with this partition; exact ownership/lifetime
    /// is managed outside this struct — TODO confirm against the worker code.
    QueueBuffer* buffer;

    /// field used to build the partition linked list for each topic for the dynamic fetch/produce request
    Partition* next;
}
15 
/// Holder for a single topic that doubles as a node of the topic linked list
/// from which a dynamic fetch/produce request is assembled.
struct Topic {
    string topic;
    Partition*[int] partitions;
    Partition* requestPartitionsFront, requestPartitionsBack;

    // This struct serves double duty: it owns the per-topic partition table and it
    // acts as a linked-list node while a fetch/produce request is built. The request
    // grows part by part whenever a buffer becomes ready in a queue (provided there is
    // no request already pending for that topic/partition). For a consumer, "ready"
    // means free buffers are available to be filled up; for a producer, it means
    // filled buffers are waiting to be pushed to the cluster.

    /// false if this topic is not in the linked list, true if it is already added
    bool isInRequest;
    /// number of partitions in the partition linked list for this topic
    size_t partitionsInRequest;
    /// field used to build the topic linked list for the dynamic fetch/produce request
    Topic* next;

    /// Returns the entry for the given partition id, or null when it is unknown.
    Partition* findPartition(int partition) {
        if (auto found = partition in partitions)
            return *found;
        return null;
    }
}
39 
// The request bundler builds internal data structures which allow performing fast batched
// fetch/produce requests for multiple topics and partitions. Such batching (or bundling) of
// requests is a very desirable feature, because requests on the broker are processed serially,
// one by one, and each fetch/produce request specifies a timeout. If many such requests are
// issued, each one may wait up to config.consumerMaxWaitTime/produceRequestTimeout and thus
// may block the others waiting in the request queue. For example, if the maximum wait time is
// 100 ms and there are 10 consumers, then without request bundling the last consumer may end
// up waiting 1 second for its next buffer. In general, many separate fetch/produce requests
// can significantly reduce performance.
/// Bundles per-queue buffer-readiness notifications into batched fetch/produce requests.
///
/// Topics and partitions with ready buffers are chained into intrusive linked lists
/// (requestTopicsFront/Back plus the per-topic partition lists). All request-building
/// state is guarded by m_mutex; m_readyCondition is signalled whenever a new
/// topic/partition entry is appended to the pending request.
class RequestBundler {
    private {
        Topic*[string] m_topics; // all registered topics, keyed by topic name
        TaskMutex m_mutex; // guards the request-building state of this class
        InterruptibleTaskCondition m_readyCondition; // notified when there are queues with ready buffers
        size_t m_queueCount; // number of queues registered via addQueue
        size_t m_requestsCollected; // number of (topic, partition) entries linked into the pending request
    }

    /// Head and tail of the intrusive topic list for the request currently being built.
    Topic* requestTopicsFront, requestTopicsBack;

    this() {
        m_mutex = new TaskMutex();
        // InterruptibleTaskCondition takes the druntime base Mutex; TaskMutex derives from it.
        m_readyCondition = new InterruptibleTaskCondition(cast(core.sync.mutex.Mutex)m_mutex);
    }

    @property auto topics() { return m_topics; }
    @property auto mutex() { return m_mutex; }
    @property auto readyCondition() { return m_readyCondition; }
    @property auto queueCount() { return m_queueCount; }
    @property auto requestsCollected() { return m_requestsCollected; }

    /// Unlinks every topic from the pending-request list and resets the counters.
    void clearRequestLists() {
        Topic* cur = requestTopicsFront;
        while (cur) {
            cur.isInRequest = false;
            cur = cur.next;
        }
        requestTopicsFront = null;
        requestTopicsBack = null; // keep the tail consistent with the (now empty) head
        // NOTE(review): this zeroes m_queueCount, which addQueue/removeQueue otherwise
        // maintain incrementally — presumably callers re-register all queues after
        // clearing; confirm against the call sites.
        m_queueCount = 0;
        m_requestsCollected = 0;
    }

    /// Looks up the topic/partition entries for the queue and links them into the
    /// pending request. The queue's worker must already be registered.
    void queueHasReadyBuffers(Queue queue) {
        auto ptopic = findTopic(queue.worker.topic);
        assert(ptopic);
        auto ppartition = ptopic.findPartition(queue.worker.partition);
        assert(ppartition);
        queueHasReadyBuffers(ptopic, ppartition);
    }

    /// Appends the partition (and, if needed, its topic) to the intrusive request
    /// lists and wakes up a waiter on m_readyCondition. Caller must hold m_mutex.
    void queueHasReadyBuffers(Topic* ptopic, Partition* ppartition) {
        assert(ptopic);
        assert(ppartition);
        if (!ptopic.isInRequest) {
            if (!requestTopicsFront) {
                // this will be the first topic in the following request
                requestTopicsFront = ptopic;
                requestTopicsBack = ptopic;
            } else {
                assert(requestTopicsBack);
                requestTopicsBack.next = ptopic;
                requestTopicsBack = ptopic;
            }
            ptopic.next = null;
            // this will be the first partition in this topic in the following request
            ptopic.requestPartitionsFront = ppartition;
            ptopic.requestPartitionsBack = ppartition;
            ptopic.partitionsInRequest = 1;
            ptopic.isInRequest = true;
            ppartition.next = null;
        } else {
            // the topic is already in the list, so there must be at least one partition
            assert(ptopic.requestPartitionsBack);
            ptopic.requestPartitionsBack.next = ppartition;
            ptopic.requestPartitionsBack = ppartition;
            ++ptopic.partitionsInRequest;
            ppartition.next = null;
        }
        ++m_requestsCollected;
        m_readyCondition.notify();
    }

    /// Returns the registered entry for the topic name, creating it on first use.
    Topic* getOrCreateTopic(string topic) {
        auto ptopic = findTopic(topic);
        if (!ptopic) {
            ptopic = new Topic;
            ptopic.topic = topic;
            m_topics[topic] = ptopic;
        }
        return ptopic;
    }

    /// Registers a queue with this bundler. If it already has a ready buffer of
    /// readyBufferType, it is immediately linked into the pending request.
    void addQueue(Queue queue, BufferType readyBufferType) {
        synchronized (m_mutex) {
            synchronized (queue.mutex) {
                auto ptopic = getOrCreateTopic(queue.worker.topic);
                auto ppartition = new Partition(queue.worker.partition, queue);
                ptopic.partitions[queue.worker.partition] = ppartition;

                queue.requestBundler = this;
                if (queue.hasBuffer(readyBufferType)) {
                    queueHasReadyBuffers(ptopic, ppartition);
                    queue.requestPending = true;
                } else {
                    queue.requestPending = false;
                }

                // The worker waits for the opposite buffer type (consumer fills free
                // buffers, producer drains filled ones) — wake it up if one is available.
                BufferType workerBufferType = readyBufferType == BufferType.Free ? BufferType.Filled : BufferType.Free;
                if (queue.hasBuffer(workerBufferType))
                    queue.condition.notify();
                ++m_queueCount;
            }
        }
    }

    /// Unregisters a queue. This is called when m_mutex is already locked.
    /// NOTE(review): this does not unlink the entries from the pending request
    /// lists — presumably callers clear or rebuild those lists first; confirm.
    void removeQueue(Topic* topic, Partition* partition) {
        // The removals must not live inside assert expressions: asserts are stripped
        // in -release builds, which would silently skip the removal itself.
        bool partitionRemoved = topic.partitions.remove(partition.partition);
        assert(partitionRemoved);
        if (!topic.partitions.length) {
            // last partition gone — remove the topic from the RequestBundler as well
            bool topicRemoved = m_topics.remove(topic.topic);
            assert(topicRemoved);
        }
        synchronized (partition.queue.mutex) {
            partition.queue.requestBundler = null;
            partition.queue.requestPending = false;
        }
        --m_queueCount;
    }

    /// Returns the registered entry for the topic name, or null when unknown.
    Topic* findTopic(string topic) {
        auto pptopic = topic in m_topics;
        return pptopic ? *pptopic : null;
    }

    /// opApply-style iteration over every registered queue; stops early when the
    /// delegate returns non-zero and propagates that value.
    int queues(int delegate(Queue) dg) {
        foreach (t; m_topics.byValue) {
            foreach (p; t.partitions.byValue) {
                auto ret = dg(p.queue);
                if (ret)
                    return ret;
            }
        }
        return 0;
    }

}