1 module kafkad.producer;
2 
3 import kafkad.client;
4 import kafkad.exception;
5 import kafkad.queue;
6 import kafkad.worker;
7 import kafkad.utils.snappy;
8 import etc.c.zlib;
9 import std.exception;
10 import core.time;
11 import vibe.core.core;
12 import vibe.core.sync;
13 
/// Producer for a single topic/partition. Messages are written into queue buffers
/// (optionally compressed as a whole) and handed over to the request bundler either
/// when a buffer is full or when the batch timeout expires (see batcherMain()).
class Producer : IWorker {
    enum __isWeakIsolatedType = true; // needed to pass this type between vibe.d's tasks
    private {
        Client m_client;
        string m_topic;
        int m_partition;
        Queue m_queue;
        // buffer currently being filled; null when no buffer is held by the producer
        QueueBuffer* m_currentBuffer;
        // waited on by the batcher task; bound to m_queue.mutex
        TaskCondition m_batchCondition;
        // m_batchStarted/m_bufferReserved/m_batchDeadline/m_currentBuffer are shared with
        // the batcher task and are read/written under m_queue.mutex
        bool m_batchStarted, m_isFirstMessage, m_bufferReserved;
        MonoTime m_batchDeadline;
        Compression m_compression;
        // scratch buffer that receives compressed output; swapped with m_currentBuffer on success
        QueueBuffer* m_compressionBuffer;
        z_streamp m_zlibContext;

        // values recorded by reserveMessage() and consumed by commitMessage()
        int m_messageSize;
        int m_keySize, m_valueSize;
        ubyte[] m_reservedKey, m_reservedValue;
    }

    package (kafkad) {
        @property queue() { return m_queue; }
    }

    /// Throws an exception in the producer task. This is used to pass the connection exceptions to the user.
    /// The exception is stored in the queue and rethrown by the next commitMessage()/pushMessage().
    void throwException(Exception ex) {
        m_queue.exception = ex;
        m_queue.condition.notify();
    }

    @property string topic() { return m_topic; }
    @property int partition() { return m_partition; }

    @property WorkerType workerType() { return WorkerType.Producer; }

    /// Slice from the internal buffer for the key. Note that reserveMessage() must be called first
    @property ubyte[] reservedKey() { assert(m_bufferReserved); return m_reservedKey; }
    /// Slice from the internal buffer for the value. Note that reserveMessage() must be called first
    @property ubyte[] reservedValue() { assert(m_bufferReserved); return m_reservedValue; }

    /// Params:
    ///     client = the client instance
    ///     topic = producer topic
    ///     partition = producer partition
    ///     compression = the compression type, it uses config.producerCompression by default
    ///     compressionLevel = valid only for GZIP compression, compression level between
    ///                        1 and 9: 1 gives best speed, 9 gives best compression, it uses
    ///                        config.producerCompressionLevel by default
    /// Throws: Exception if the zlib deflate stream cannot be initialized (GZIP only)
    this(Client client, string topic, int partition, Compression compression, int compressionLevel) {
        m_client = client;
        m_topic = topic;
        m_partition = partition;
        m_queue = new Queue(this, m_client.config.producerQueueBuffers, m_client.config.producerMaxBytes);
        m_currentBuffer = null;
        m_batchCondition = new TaskCondition(m_queue.mutex);
        m_batchStarted = false;
        m_compression = compression;
        // compressed batches are built in a separate buffer of the same capacity
        m_compressionBuffer = m_compression != Compression.None ? new QueueBuffer(m_client.config.producerMaxBytes) : null;
        if (m_compression == Compression.GZIP) {
            m_zlibContext = new z_stream;
            m_zlibContext.zalloc = null;
            m_zlibContext.zfree = null;
            m_zlibContext.opaque = null;
            // windowBits 15 + 16 selects the gzip wrapper instead of the raw zlib header
            enforce(deflateInit2(m_zlibContext, compressionLevel, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) == Z_OK);
        }
        // release the deflate state if registration or task startup below fails
        scope (failure) if (m_compression == Compression.GZIP)
            deflateEnd(m_zlibContext);

        client.addNewProducer(this);
        runTask(&batcherMain);
    }

    /// Params:
    ///     client = the client instance
    ///     topic = producer topic
    ///     partition = producer partition
    ///     compression = the compression type, it uses config.producerCompression by default
    this(Client client, string topic, int partition, Compression compression) {
        this(client, topic, partition, compression, client.config.producerCompressionLevel);
    }

    /// Params:
    ///     client = the client instance
    ///     topic = producer topic
    ///     partition = producer partition
    this(Client client, string topic, int partition) {
        this(client, topic, partition, client.config.producerCompression);
    }

    // NOTE(review): m_zlibContext is GC-allocated; touching GC memory from a finalizer is
    // not guaranteed safe by the D spec — consider malloc-allocating the z_stream.
    ~this() {
        if (m_compression == Compression.GZIP)
            deflateEnd(m_zlibContext);
    }

    /// Batcher task body. Waits until a batch has been started, its deadline has passed
    /// and no message is currently reserved, then hands the current buffer over to the
    /// request bundler and resets the batch state. Runs for the producer's lifetime.
    private void batcherMain() {
        for (;;) {
            synchronized (m_queue.mutex) {
                MonoTime now = MonoTime.currTime;
                while (!m_batchStarted || now < m_batchDeadline || m_bufferReserved) {
                    if (m_batchStarted && now < m_batchDeadline)
                        m_batchCondition.wait(m_batchDeadline - now); // sleep until the deadline or a notify
                    else
                        m_batchCondition.wait(); // nothing to time out on; wait for a commit
                    now = MonoTime.currTime;
                }
                // perform batch
                returnCurrentBuffer();
                m_currentBuffer = null;
                m_batchStarted = false;
            }
        }
    }

    /// Writes the message-set entry headers (Offset, MessageSize, Crc, MagicByte, Attributes,
    /// KeyLength, ValueLength) around a key/value already placed in the buffer, advances
    /// buffer.p past the whole entry and finally fills in the CRC.
    /// Params:
    ///     buffer = target buffer; buffer.p must point at the start of the entry
    ///     messageSize = size of the message from the Crc field to the end of the value
    ///     keySize = key length, -1 for a null key
    ///     valueSize = value length, -1 for a null value
    ///     compression = value written into the Attributes byte
    private static void setupMessageHeaders(QueueBuffer* buffer, int messageSize, int keySize, int valueSize, Compression compression) {
        import std.bitmanip, std.digest.crc;
        buffer.p[0 .. 8] = 0; // offset
        buffer.p[8 .. 12] = nativeToBigEndian(messageSize);
        buffer.p += 12; // skip header
        auto crcPtr = buffer.p;
        buffer.p[4] = 0; // MagicByte
        buffer.p[5] = cast(ubyte)compression; // Attributes
        buffer.p[6 .. 10] = nativeToBigEndian(keySize);
        buffer.p += 10 + (keySize > 0 ? keySize : 0);
        buffer.p[0 .. 4] = nativeToBigEndian(valueSize);
        buffer.p += 4 + (valueSize > 0 ? valueSize : 0);
        assert((buffer.p - crcPtr) == messageSize);
        ubyte[4] computedCrc = crc32Of(crcPtr[4 .. messageSize]); // start from 4th byte to skip the crc field
        // crc32Of yields the CRC in little-endian byte order; decode it explicitly instead of
        // reinterpreting the bytes as a native uint (which is wrong on big-endian hosts)
        crcPtr[0 .. 4] = nativeToBigEndian(littleEndianToNative!uint(computedCrc)); // set crc field
    }

    /// Hands m_currentBuffer over to the queue as a filled buffer and wakes the request
    /// bundler. If compression is enabled, the buffer contents are first compressed into
    /// m_compressionBuffer and wrapped in a single message; on success the two buffers are
    /// swapped, otherwise the batch is sent uncompressed. Both callers hold m_queue.mutex.
    private void returnCurrentBuffer() {
        import std.algorithm, std.digest.crc, vibe.core.log;
        if (m_compression != Compression.None) {
            // Offset + MessageSize + Crc + MagicByte + Attributes + Key length + Value length
            enum int headersLen = 8 + 4 + 4 + 1 + 1 + 4 + 4;
            // maximum compressed data size, leaving room for the message headers
            int maxCompressedSize = m_client.config.producerMaxBytes - headersLen;

            // wraps compressedLen bytes (already at headersLen) into a message with a null key
            void bufferCompressed(int compressedLen) {
                int messageSize = 4 + 1 + 1 + 4 + 4 + compressedLen;
                m_compressionBuffer.rewind();
                setupMessageHeaders(m_compressionBuffer, messageSize, -1, compressedLen, m_compression);
                swap(m_currentBuffer, m_compressionBuffer);
            }

            switch (m_compression) {
                case Compression.GZIP:
                    m_zlibContext.next_in = m_currentBuffer.buffer;
                    m_zlibContext.avail_in = cast(uint)m_currentBuffer.filled;
                    m_zlibContext.next_out = m_compressionBuffer.buffer + headersLen;
                    m_zlibContext.avail_out = maxCompressedSize;
                    int r = deflate(m_zlibContext, Z_FINISH);
                    if (r == Z_STREAM_END) {
                        bufferCompressed(cast(int)m_zlibContext.total_out);
                    } else {
                        // the compressed data did not fit into maxCompressedSize bytes (this may occur
                        // on incompressible input data); the batch is sent uncompressed
                        logDebugV("Skipping GZIP compression");
                    }
                    deflateReset(m_zlibContext);
                    break;
                case Compression.Snappy:
                    size_t outLen = maxCompressedSize;
                    int r = snappy_compress(m_currentBuffer.buffer, m_currentBuffer.filled,
                        m_compressionBuffer.buffer + headersLen, &outLen);
                    if (r == SNAPPY_OK) {
                        bufferCompressed(cast(int)outLen);
                    } else {
                        // presumably the output buffer was too small (e.g. incompressible input);
                        // the batch is sent uncompressed
                        logDebugV("Skipping Snappy compression");
                    }
                    break;
                default: assert(0);
            }
        }
        m_queue.returnBuffer(BufferType.Filled, m_currentBuffer);
        m_queue.notifyRequestBundler();
    }

    /// Reserves the space for the next message using specified key and value sizes. This may
    /// be used to avoid double-copy. Instead of filling the user's buffer and passing it to
    /// pushMessage(), the user may call the reserveMessage() and fill the data directly in
    /// the internal buffer using reservedKey and reservedValue slices. This function is used
    /// internally by pushMessage() function.
    /// Params:
    ///     keySize = number of bytes to reserve for the key, -1 for null key
    ///     valueSize = number of bytes to reserve for the value, -1 for the null value
    /// Throws: Exception if the message cannot fit into a producer buffer
    ///         (config.producerMaxBytes)
    /// See_Also:
    ///     commitMessage
    void reserveMessage(int keySize, int valueSize) {
        assert(keySize >= -1);
        assert(valueSize >= -1);
        // Crc + MagicByte + Attributes + KeyLength + ValueLength
        int messageSize = 4 + 1 + 1 + 4 + 4;
        if (keySize > 0)
            messageSize += keySize;
        if (valueSize > 0)
            messageSize += valueSize;
        // Offset + MessageSize
        size_t messageSetOverhead = 8 + 4 + messageSize;
        // producer buffers are allocated with producerMaxBytes, so that is the hard limit
        enforce(messageSetOverhead <= m_client.config.producerMaxBytes,
            "Message is too big to fit into message set");

        synchronized (m_queue.mutex) {
            if (m_currentBuffer && m_currentBuffer.remaining < messageSetOverhead) {
                // Flush the full buffer and clear the batch state *before* waiting for a free
                // buffer: waiting may let the batcher run, and it must not see a started batch
                // pointing at the buffer that was just handed over.
                returnCurrentBuffer();
                m_currentBuffer = null;
                m_batchStarted = false;
            }
            if (!m_currentBuffer) {
                m_currentBuffer = m_queue.waitForBuffer(BufferType.Free);
                m_batchStarted = false;
                m_isFirstMessage = true;
            }
            m_bufferReserved = true;
        }

        // 22 = Offset + MessageSize + Crc + MagicByte + Attributes + KeyLength
        auto p = m_currentBuffer.p + 22;
        m_reservedKey = keySize > 0 ? p[0 .. keySize] : null;
        p += m_reservedKey.length + 4; // skip the key and the ValueLength field
        m_reservedValue = valueSize > 0 ? p[0 .. valueSize] : null;
        assert(p + m_reservedValue.length <= m_currentBuffer.end);

        m_messageSize = messageSize;
        m_keySize = keySize;
        m_valueSize = valueSize;
    }

    /// Commits previously reserved space and builds up the message
    /// Throws: the exception passed to throwException(), if any
    /// See_Also:
    ///     reserveMessage
    void commitMessage() {
        Exception ex = m_queue.exception;
        if (ex)
            throw ex;

        setupMessageHeaders(m_currentBuffer, m_messageSize, m_keySize, m_valueSize, Compression.None);

        synchronized (m_queue.mutex) {
            if (m_isFirstMessage) {
                // the first message of a buffer starts the batch timeout
                m_isFirstMessage = false;
                m_batchStarted = true;
                m_batchDeadline = MonoTime.currTime + m_client.config.producerBatchTimeout.msecs;
            }
            // released under the same mutex the batcher uses to read it
            m_bufferReserved = false;
        }
        m_batchCondition.notify();
    }

    /// Pushes the message to the cluster
    /// Params:
    ///     key = the message key, may be null (encoded with length -1); an empty
    ///           non-null key is encoded with length 0
    ///     value = the message value, may be null (encoded with length -1); an empty
    ///             non-null value is encoded with length 0
    void pushMessage(const(ubyte)[] key, const(ubyte)[] value) {
        // `is null` distinguishes a null slice from an empty one; `== null` would treat both as null
        int keySize = key is null ? -1 : cast(int)key.length;
        int valueSize = value is null ? -1 : cast(int)value.length;
        import vibe.core.log;
        logTrace("keySize: %d, valueSize: %d", keySize, valueSize);
        reserveMessage(keySize, valueSize);
        logTrace("m_keySize: %d, m_valueSize: %d", m_reservedKey.length, m_reservedValue.length);
        if (key.length)
            m_reservedKey[] = key; // copy into buffer
        if (value.length)
            m_reservedValue[] = value; // copy into buffer
        commitMessage();
    }
}