module kafkad.queue;

import kafkad.worker;
import kafkad.bundler;
import kafkad.utils.lists;
import vibe.core.sync;
import std.bitmanip;
import std.exception;
import core.atomic;
import core.memory;

/// Represents a single buffer of the worker queue.
struct QueueBuffer {
    /// Start of the allocation, current position, and one-past-the-end of the allocation.
    ubyte* buffer, p, end;
    /// Number of bytes, starting at `p`, that findNextOffset() scans.
    size_t messageSetSize;
    /// Not used inside this module.
    /// NOTE(review): presumably the offset that was requested when this buffer was filled - confirm against the callers.
    long requestedOffset;

    /// Link to the next buffer; not touched by the struct's own methods.
    /// NOTE(review): presumably maintained by the List container - confirm.
    QueueBuffer* next;

    /// Allocates `size` bytes from the GC (block is not scanned for pointers) and
    /// places the current position at the start of the buffer.
    /// Throws: Exception if GC.malloc returns null.
    this(size_t size) {
        buffer = cast(ubyte*)enforce(GC.malloc(size, GC.BlkAttr.NO_SCAN));
        p = buffer;
        end = buffer + size;
    }

    /// Number of bytes between the start of the buffer and the current position.
    @property size_t filled() {
        return p - buffer;
    }

    /// Number of bytes between the current position and the end of the buffer.
    @property size_t remaining() {
        return end - p;
    }

    /// Slice over the bytes from the start of the buffer up to the current position.
    @property ubyte[] filledSlice() {
        return buffer[0 .. filled];
    }

    /// Moves the current position back to the start of the buffer.
    void rewind() {
        p = buffer;
    }

    /**
     * Scans the `messageSetSize` bytes starting at the current position and determines
     * the offset to continue from. The buffer position itself is not modified.
     *
     * Each entry is read as an 8-byte big-endian offset, a 4-byte big-endian size and
     * then `size` bytes of payload. Trailing data of 12 bytes or fewer is ignored.
     *
     * Returns: the offset of the last entry if that entry is truncated (its payload runs
     *          past the scanned region); otherwise the offset of the last entry plus one.
     *          When no entry header fits in the region at all the result is -1.
     * Throws: Exception if an entry declares a negative size (corrupt data).
     */
    long findNextOffset() {
        enum ptrdiff_t headerSize = 12; // 8-byte offset followed by 4-byte size
        ubyte* cursor = this.p;
        ubyte* limit = this.p + messageSetSize;
        long offset = -2;
        while (limit - cursor > headerSize) {
            offset = bigEndianToNative!long(cursor[0 .. 8]);
            int messageSize = bigEndianToNative!int(cursor[8 .. 12]);
            // A negative size would move the cursor backwards (or, for -12, never advance).
            enforce(messageSize >= 0, "Invalid (negative) message size in message set");
            // Compare against the remaining bytes instead of advancing first: this avoids
            // the int overflow of `12 + messageSize` and never forms a pointer past `limit`.
            const ptrdiff_t payloadRoom = limit - cursor - headerSize;
            if (messageSize > payloadRoom) {
                // this is the last, partial message
                return offset;
            }
            cursor += headerSize + messageSize;
        }
        return offset + 1;
    }
}

/// Selects one of the two buffer lists kept by a Queue.
enum BufferType { Free, Filled }

/// Holds the buffers for the workers.
final class Queue {
    private {
        IWorker m_worker;
        List!QueueBuffer[2] m_buffers; // indexed by BufferType
        TaskMutex m_mutex;
        TaskCondition m_condition;
        bool m_requestPending;
        RequestBundler m_bundler;
        long m_offset;
        shared Exception m_exception;
    }

    // this is also updated in the fetch/push task
    bool requestPending() { return m_requestPending; }
    bool requestPending(bool v) { return m_requestPending = v; }

    /// The worker this queue was created for.
    @property auto worker() { return m_worker; }

    /// Mutex that the queue's condition variable is bound to.
    @property auto mutex() { return m_mutex; }
    /// Condition variable that waitForBuffer() waits on.
    @property auto condition() { return m_condition; }

    /// Request bundler notified by notifyRequestBundler(); null until one is assigned.
    @property auto requestBundler() { return m_bundler; }
    @property auto requestBundler(RequestBundler v) { return m_bundler = v; }

    /// Offset value stored for this queue; this class only stores and returns it.
    @property auto offset() { return m_offset; }
    @property auto offset(long v) { return m_offset = v; }

    /// Exception stored for this queue (read and written atomically).
    /// Once set, waitForBuffer() throws it instead of returning a buffer.
    @property auto exception() { return m_exception.atomicLoad; }
    @property void exception(Exception v) { m_exception.atomicStore(cast(shared)v); }

    /**
     * Creates the queue and pre-allocates its free buffers.
     *
     * Params:
     *   worker      = worker that owns this queue
     *   bufferCount = number of buffers to allocate; values below 2 are raised to 2
     *   bufferSize  = size in bytes of each buffer
     */
    this(IWorker worker, size_t bufferCount, size_t bufferSize) {
        import std.algorithm : max;
        m_worker = worker;
        auto freeBufferCount = max(2, bufferCount); // at least 2
        foreach (n; 0 .. freeBufferCount) {
            auto qbuf = new QueueBuffer(bufferSize);
            m_buffers[BufferType.Free].pushBack(qbuf);
        }
        m_mutex = new TaskMutex();
        m_condition = new TaskCondition(m_mutex);
        m_requestPending = false;
        m_bundler = null;
        m_exception = null;
    }

    /// Returns true when the list selected by `bufferType` is not empty.
    bool hasBuffer(BufferType bufferType) {
        return !m_buffers[bufferType].empty;
    }

    /// Removes and returns the first buffer of the selected list without waiting.
    QueueBuffer* getBuffer(BufferType bufferType) {
        return m_buffers[bufferType].popFront();
    }

    /**
     * Waits on the queue's condition until a request bundler is assigned and the selected
     * list has a buffer, then removes and returns the first buffer of that list.
     *
     * NOTE(review): the condition is bound to `mutex`; presumably the caller must hold it
     * while calling this - confirm against the callers.
     *
     * Throws: the exception stored in this queue, if one is set.
     */
    QueueBuffer* waitForBuffer(BufferType bufferType) {
        List!QueueBuffer* buffers = &m_buffers[bufferType];
        // m_exception is shared: read it atomically, like the `exception` property does
        while (atomicLoad(m_exception) is null && (!m_bundler || buffers.empty))
            m_condition.wait();
        // load once more and throw an unshared reference
        if (auto failure = cast(Exception) atomicLoad(m_exception))
            throw failure;
        QueueBuffer* buffer = buffers.popFront();
        return buffer;
    }

    /// Appends `buffer` (must not be null) to the list selected by `bufferType`.
    void returnBuffer(BufferType bufferType, QueueBuffer* buffer) {
        assert(buffer);
        m_buffers[bufferType].pushBack(buffer);
    }

    /// Tells the request bundler, if one is assigned, that this queue has ready buffers.
    void notifyRequestBundler() {
        // notify the fetch/push task that there are ready buffers
        // the fetch/push task will then make a batch request for all queues with ready buffers
        // do not notify the task if there is a pending request for this queue
        if (m_bundler && !m_requestPending) {
            synchronized (m_bundler.mutex) {
                m_bundler.queueHasReadyBuffers(this);
            }
            m_requestPending = true;
        }
    }
}