module kafkad.bundler;

import core.sync.mutex;
import kafkad.queue;
import vibe.core.sync;

/// Per-partition bookkeeping entry tracked by the bundler.
struct Partition {
    int partition;
    Queue queue;
    QueueBuffer* buffer;

    /// field used to build the partition linked list for each topic for the dynamic fetch/produce request
    Partition* next;
}

/// Per-topic bookkeeping entry; doubles as a linked-list node of the request being built.
struct Topic {
    string topic;
    Partition*[int] partitions;
    Partition* requestPartitionsFront, requestPartitionsBack;

    // Fields used to build the fetch/produce request. This struct acts both as a holder for a topic and
    // as a linked list node for the request. The request is built dynamically, part by part when a
    // buffer becomes ready in the queue (if there is no pending request for this topic/partition).
    // Buffer readiness for the consumer is when there are free buffers to be filled up, and for the
    // producer when there are filled buffers to be pushed to the cluster.

    /// false if this topic is not in the linked list, true if it is already added
    bool isInRequest;
    /// number of partitions in the partition linked list for this topic
    size_t partitionsInRequest;
    /// field used to build the topic linked list for the dynamic fetch/produce request
    Topic* next;

    /// Returns the tracked partition entry for `partition`, or null if unknown.
    Partition* findPartition(int partition) {
        auto ppPartition = partition in partitions;
        return ppPartition ? *ppPartition : null;
    }
}

// Request bundler builds an internal data structure which allows performing fast batched
// fetch/produce requests for multiple topics and partitions. Such batching (or bundling) of the
// requests is a very desirable feature, because requests on the broker are processed serially,
// one by one, and each fetch/produce request specifies a timeout. If many such requests are
// issued, each one may wait up to config.consumerMaxWaitTime/produceRequestTimeout and thus may
// block the others waiting in the request queue. For example, if the maximum wait time is 100 ms
// and there are 10 consumers, then without request bundling it may happen that the last consumer
// waits 1 second for the next buffer. In general, many separate fetch/produce requests can
// significantly reduce performance.
class RequestBundler {
    private {
        Topic*[string] m_topics;
        TaskMutex m_mutex;
        InterruptibleTaskCondition m_readyCondition; // notified when there are queues with ready buffers
        size_t m_queueCount;
        size_t m_requestsCollected;
    }

    /// Head and tail of the topic linked list for the request currently being built.
    Topic* requestTopicsFront, requestTopicsBack;

    this() {
        m_mutex = new TaskMutex();
        // InterruptibleTaskCondition wants the base Mutex interface; TaskMutex provides it.
        m_readyCondition = new InterruptibleTaskCondition(cast(core.sync.mutex.Mutex)m_mutex);
    }

    @property auto topics() { return m_topics; }
    @property auto mutex() { return m_mutex; }
    @property auto readyCondition() { return m_readyCondition; }
    @property auto queueCount() { return m_queueCount; }
    @property auto requestsCollected() { return m_requestsCollected; }

    /// Resets the pending-request linked list and its counters.
    /// NOTE(review): this also zeroes m_queueCount even though queues stay registered in
    /// m_topics — presumably callers re-add queues afterwards; confirm against call sites.
    void clearRequestLists() {
        Topic* cur = requestTopicsFront;
        while (cur) {
            cur.isInRequest = false;
            cur = cur.next;
        }
        requestTopicsFront = null;
        // Also drop the tail pointer so no stale Topic* lingers; the front==null branch in
        // queueHasReadyBuffers would have overwritten it, but a dangling pointer invites bugs.
        requestTopicsBack = null;
        m_queueCount = 0;
        m_requestsCollected = 0;
    }

    /// Looks up the topic/partition entries for `queue` and marks them ready.
    void queueHasReadyBuffers(Queue queue) {
        auto ptopic = findTopic(queue.worker.topic);
        assert(ptopic);
        auto ppartition = ptopic.findPartition(queue.worker.partition);
        assert(ppartition);
        queueHasReadyBuffers(ptopic, ppartition);
    }

    /// Appends the topic (if not yet listed) and the partition to the request linked lists,
    /// bumps the collected-request counter and wakes a waiter on the ready condition.
    void queueHasReadyBuffers(Topic* ptopic, Partition* ppartition) {
        assert(ptopic);
        assert(ppartition);
        if (!ptopic.isInRequest) {
            if (!requestTopicsFront) {
                // this will be the first topic in the following request
                requestTopicsFront = ptopic;
                requestTopicsBack = ptopic;
            } else {
                assert(requestTopicsBack);
                requestTopicsBack.next = ptopic;
                requestTopicsBack = ptopic;
            }
            ptopic.next = null;
            // this will be the first partition in this topic in the following request
            ptopic.requestPartitionsFront = ppartition;
            ptopic.requestPartitionsBack = ppartition;
            ptopic.partitionsInRequest = 1;
            ptopic.isInRequest = true;
            ppartition.next = null;
        } else {
            // the topic is already in the list, so there must be at least one partition
            assert(ptopic.requestPartitionsBack);
            ptopic.requestPartitionsBack.next = ppartition;
            ptopic.requestPartitionsBack = ppartition;
            ++ptopic.partitionsInRequest;
            ppartition.next = null;
        }
        ++m_requestsCollected;
        m_readyCondition.notify();
    }

    /// Returns the existing Topic entry for `topic`, creating and registering one if absent.
    Topic* getOrCreateTopic(string topic) {
        auto ptopic = findTopic(topic);
        if (!ptopic) {
            ptopic = new Topic;
            ptopic.topic = topic;
            m_topics[topic] = ptopic;
        }
        return ptopic;
    }

    /// Registers `queue` with the bundler. `readyBufferType` is the buffer kind whose presence
    /// means the queue already has work for the cluster (Free for consumers, Filled for producers).
    void addQueue(Queue queue, BufferType readyBufferType) {
        synchronized (m_mutex) {
            synchronized(queue.mutex) {
                auto ptopic = getOrCreateTopic(queue.worker.topic);
                auto ppartition = new Partition(queue.worker.partition, queue);
                ptopic.partitions[queue.worker.partition] = ppartition;

                queue.requestBundler = this;
                if (queue.hasBuffer(readyBufferType)) {
                    queueHasReadyBuffers(ptopic, ppartition);
                    queue.requestPending = true;
                } else {
                    queue.requestPending = false;
                }

                // The worker waits on the opposite buffer kind; wake it if one is available.
                BufferType workerBufferType = readyBufferType == BufferType.Free ? BufferType.Filled : BufferType.Free;
                if (queue.hasBuffer(workerBufferType))
                    queue.condition.notify();
                ++m_queueCount;
            }
        }
    }

    // this is called when m_mutex is already locked
    void removeQueue(Topic* topic, Partition* partition) {
        // FIX: the removals were previously wrapped directly in assert(...). In D, assert
        // expressions are not evaluated when compiled with -release, so the entries would
        // silently never be removed in release builds. Perform the removal unconditionally
        // and assert only the returned result.
        // remove the partition from the Topic
        bool partitionRemoved = topic.partitions.remove(partition.partition);
        assert(partitionRemoved);
        if (!topic.partitions.length) {
            // remove the topic from the RequestBundler
            bool topicRemoved = m_topics.remove(topic.topic);
            assert(topicRemoved);
        }
        // NOTE(review): entries still linked into the pending request lists are not unlinked
        // here — presumably callers clear/rebuild the request before it is next used; verify.
        synchronized (partition.queue.mutex) {
            partition.queue.requestBundler = null;
            partition.queue.requestPending = false;
        }
        --m_queueCount;
    }

    /// Returns the registered Topic entry for `topic`, or null if unknown.
    Topic* findTopic(string topic) {
        auto pptopic = topic in m_topics;
        return pptopic ? *pptopic : null;
    }

    /// opApply-style iteration over every registered queue; stops early on nonzero delegate result.
    int queues(int delegate(Queue) dg) {
        foreach (t; m_topics.byValue) {
            foreach (p; t.partitions.byValue) {
                auto ret = dg(p.queue);
                if (ret)
                    return ret;
            }
        }
        return 0;
    }

}