module kafkad.queue;

import kafkad.worker;
import kafkad.bundler;
import kafkad.utils.lists;
import vibe.core.sync;
import std.bitmanip;
import std.exception;
import core.atomic;
import core.memory;

// Represents a single buffer of the worker queue
struct QueueBuffer {
    ubyte* buffer, p, end;
    size_t messageSetSize;
    long requestedOffset;

    QueueBuffer* next;

    this(size_t size) {
        buffer = cast(ubyte*)enforce(GC.malloc(size, GC.BlkAttr.NO_SCAN));
        p = buffer;
        end = buffer + size;
    }

    @property size_t filled() {
        return p - buffer;
    }

    @property size_t remaining() {
        return end - p;
    }

    @property ubyte[] filledSlice() {
        return buffer[0 .. filled];
    }

    void rewind() {
        p = buffer;
    }

    // Scans the message set for the offset to request next. If the set ends with a
    // partial (truncated) message, its offset is returned so it can be fetched again;
    // otherwise, the offset following the last complete message is returned
    // (-1 if the set holds no complete message).
    long findNextOffset() {
        ubyte* p = this.p;
        ubyte* end = this.p + messageSetSize;
        long offset = -2;
        while (end - p > 12) {
            offset = bigEndianToNative!long(p[0 .. 8]);
            int messageSize = bigEndianToNative!int(p[8 .. 12]);
            p += 12 + messageSize;
            if (p > end) {
                // this is the last, partial message
                return offset;
            }
        }
        return offset + 1;
    }
}
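
// A small illustrative check of findNextOffset() (not part of the original module;
// the hand-built message set below is only an assumption for the example): two
// complete messages followed by a truncated third one. The scan should stop at the
// partial message and report its offset, so the next fetch request starts there.
unittest {
    import std.bitmanip : nativeToBigEndian;

    enum bodySize = 5;
    ubyte[] data;
    foreach (long off; 0 .. 3) {
        data ~= nativeToBigEndian(off);               // offset (8 bytes, big-endian)
        data ~= nativeToBigEndian(cast(int)bodySize); // message size (4 bytes)
        data ~= new ubyte[bodySize];                  // message body
    }
    data = data[0 .. $ - 2]; // cut the last message short

    auto buf = QueueBuffer(data.length);
    buf.p[0 .. data.length] = data[];
    buf.messageSetSize = data.length;
    assert(buf.findNextOffset() == 2); // offset of the partial third message
}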

enum BufferType { Free, Filled }

// Holds the buffers for the workers
final class Queue {
    private {
        IWorker m_worker;
        List!QueueBuffer[2] m_buffers;
        TaskMutex m_mutex;
        TaskCondition m_condition;
        bool m_requestPending;
        RequestBundler m_bundler;
        long m_offset;
        shared Exception m_exception;
    }

    // this is also updated in the fetch/push task
    bool requestPending() { return m_requestPending; }
    bool requestPending(bool v) { return m_requestPending = v; }

    @property auto worker() { return m_worker; }

    @property auto mutex() { return m_mutex; }
    @property auto condition() { return m_condition; }

    @property auto requestBundler() { return m_bundler; }
    @property auto requestBundler(RequestBundler v) { return m_bundler = v; }

    @property auto offset() { return m_offset; }
    @property auto offset(long v) { return m_offset = v; }

    @property auto exception() { return m_exception.atomicLoad; }
    @property void exception(Exception v) { m_exception.atomicStore(cast(shared)v); }

    this(IWorker worker, size_t bufferCount, size_t bufferSize) {
        import std.algorithm : max;
        m_worker = worker;
        auto freeBufferCount = max(2, bufferCount); // at least 2
        foreach (n; 0 .. freeBufferCount) {
            auto qbuf = new QueueBuffer(bufferSize);
            m_buffers[BufferType.Free].pushBack(qbuf);
        }
        m_mutex = new TaskMutex();
        m_condition = new TaskCondition(m_mutex);
        m_requestPending = false;
        m_bundler = null;
        m_exception = null;
    }

    bool hasBuffer(BufferType bufferType) {
        return !m_buffers[bufferType].empty;
    }

    QueueBuffer* getBuffer(BufferType bufferType) {
        return m_buffers[bufferType].popFront();
    }

    QueueBuffer* waitForBuffer(BufferType bufferType) {
        List!QueueBuffer* buffers = &m_buffers[bufferType];
        // block (with the queue mutex held) until either an exception is reported or,
        // once a request bundler is attached, a buffer of the requested type is available
        while (!m_exception && (!m_bundler || buffers.empty))
            m_condition.wait();
        if (m_exception)
            throw cast(Exception)m_exception;
        QueueBuffer* buffer = buffers.popFront();
        return buffer;
    }

    void returnBuffer(BufferType bufferType, QueueBuffer* buffer) {
        assert(buffer);
        m_buffers[bufferType].pushBack(buffer);
    }

    void notifyRequestBundler() {
        // notify the fetch/push task that there are ready buffers
        // the fetch/push task will then make a batch request for all queues with ready buffers
        // do not notify the task if there is a pending request for this queue
        if (m_bundler && !m_requestPending) {
            synchronized (m_bundler.mutex) {
                m_bundler.queueHasReadyBuffers(this);
            }
            m_requestPending = true;
        }
    }
}
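
/*
 * Rough usage sketch (illustrative only; the real call sites live in the worker
 * implementations and in the fetch/push task, outside this module). On the
 * consumer side, a worker drains Filled buffers and recycles them as Free ones:
 *
 *     QueueBuffer* buf;
 *     synchronized (queue.mutex)
 *         buf = queue.waitForBuffer(BufferType.Filled); // block until data arrives
 *
 *     // ... parse buf.filledSlice outside the lock ...
 *
 *     synchronized (queue.mutex) {
 *         queue.returnBuffer(BufferType.Free, buf); // recycle the buffer
 *         queue.notifyRequestBundler();             // a free buffer may now be fetched into
 *     }
 *
 * The fetch/push task does the mirror image: it takes Free buffers, fills them with
 * fetched message sets, returns them as Filled, and signals the queue condition.
 */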