module kafkad.producer;

import kafkad.client;
import kafkad.exception;
import kafkad.queue;
import kafkad.worker;
import kafkad.utils.snappy;
import etc.c.zlib;
import std.exception;
import core.time;
import vibe.core.core;
import vibe.core.sync;

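/// Producer for a single topic and partition. Messages are accumulated in an internal buffer
/// and handed over to the request bundler when the batch deadline (config.producerBatchTimeout)
/// passes or the current buffer runs out of space.
///
/// Example:
/// An illustrative sketch; it assumes an already configured `client` instance and a topic
/// named "test_topic":
/// ---
/// auto producer = new Producer(client, "test_topic", 0);
/// producer.pushMessage(cast(const(ubyte)[])"key", cast(const(ubyte)[])"value");
/// ---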
class Producer : IWorker {
    enum __isWeakIsolatedType = true; // needed to pass this type between vibe.d's tasks
    private {
        Client m_client;
        string m_topic;
        int m_partition;
        Queue m_queue;
        QueueBuffer* m_currentBuffer;
        TaskCondition m_batchCondition;
        bool m_batchStarted, m_isFirstMessage, m_bufferReserved;
        MonoTime m_batchDeadline;
        Compression m_compression;
        QueueBuffer* m_compressionBuffer;
        z_streamp m_zlibContext;

        int m_messageSize;
        int m_keySize, m_valueSize;
        ubyte[] m_reservedKey, m_reservedValue;
    }

    package (kafkad) {
        @property queue() { return m_queue; }
    }

    /// Records an exception to be rethrown in the producer task. This is used to pass
    /// connection exceptions to the user; the exception is rethrown by the next
    /// commitMessage() or pushMessage() call.
    void throwException(Exception ex) {
        m_queue.exception = ex;
        m_queue.condition.notify();
    }

    @property string topic() { return m_topic; }
    @property int partition() { return m_partition; }

    @property WorkerType workerType() { return WorkerType.Producer; }

    /// Slice of the internal buffer for the key. Note that reserveMessage() must be called first.
    @property ubyte[] reservedKey() { assert(m_bufferReserved); return m_reservedKey; }
    /// Slice of the internal buffer for the value. Note that reserveMessage() must be called first.
    @property ubyte[] reservedValue() { assert(m_bufferReserved); return m_reservedValue; }

    /// Params:
    /// client = the client instance
    /// topic = producer topic
    /// partition = producer partition
    /// compression = the compression type; the overloads that omit it default to config.producerCompression
    /// compressionLevel = compression level between 1 (best speed) and 9 (best compression),
    ///                    valid only for GZIP; the overloads that omit it default to
    ///                    config.producerCompressionLevel
    this(Client client, string topic, int partition, Compression compression, int compressionLevel) {
        m_client = client;
        m_topic = topic;
        m_partition = partition;
        m_queue = new Queue(this, m_client.config.producerQueueBuffers, m_client.config.producerMaxBytes);
        m_currentBuffer = null;
        m_batchCondition = new TaskCondition(m_queue.mutex);
        m_batchStarted = false;
        m_compression = compression;
        m_compressionBuffer = m_compression != Compression.None ? new QueueBuffer(m_client.config.producerMaxBytes) : null;
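        // Note: windowBits of 15 + 16 requests gzip framing from zlib (which the Kafka GZIP
        // codec expects); 8 is zlib's default memLevel.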
        if (m_compression == Compression.GZIP) {
            m_zlibContext = new z_stream;
            m_zlibContext.zalloc = null;
            m_zlibContext.zfree = null;
            m_zlibContext.opaque = null;
            enforce(deflateInit2(m_zlibContext, compressionLevel, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) == Z_OK);
        }
        scope (failure) if (m_compression == Compression.GZIP)
            deflateEnd(m_zlibContext);

        client.addNewProducer(this);
        runTask(&batcherMain);
    }

    /// Params:
    /// client = the client instance
    /// topic = producer topic
    /// partition = producer partition
    /// compression = the compression type; config.producerCompressionLevel is used as the compression level
    this(Client client, string topic, int partition, Compression compression) {
        this(client, topic, partition, compression, client.config.producerCompressionLevel);
    }

    /// Params:
    /// client = the client instance
    /// topic = producer topic
    /// partition = producer partition
    this(Client client, string topic, int partition) {
        this(client, topic, partition, client.config.producerCompression);
    }

    ~this() {
        if (m_compression == Compression.GZIP)
            deflateEnd(m_zlibContext);
    }

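    // Background batching task, one per producer: sleeps until a batch has been started and its
    // deadline has passed, then hands the filled buffer over to the request bundler.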
    private void batcherMain() {
        for (;;) {
            MonoTime currTime = MonoTime.currTime;
            synchronized (m_queue.mutex) {
                while (!m_batchStarted || currTime < m_batchDeadline || m_bufferReserved) {
                    if (m_batchStarted && currTime < m_batchDeadline) {
                        Duration remaining = m_batchDeadline - currTime;
                        m_batchCondition.wait(remaining);
                        currTime = MonoTime.currTime;
                    } else {
                        m_batchCondition.wait();
                        currTime = MonoTime.currTime;
                    }
                }
                // perform batch
                returnCurrentBuffer();
                m_currentBuffer = null;
                m_batchStarted = false;
            }
        }
    }

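    // Fills in the headers of a single message set entry around key and value bytes that are
    // already in place: Offset(8) MessageSize(4) Crc(4) MagicByte(1) Attributes(1) KeySize(4)
    // Key ValueSize(4) Value. The offset is left as zero (the broker assigns real offsets) and
    // the CRC covers everything after the Crc field itself.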
    private static void setupMessageHeaders(QueueBuffer* buffer, int messageSize, int keySize, int valueSize, Compression compression) {
        import std.bitmanip, std.digest.crc;
        buffer.p[0 .. 8] = 0; // offset
        buffer.p[8 .. 12] = nativeToBigEndian(messageSize);
        buffer.p += 12; // skip header
        auto crcPtr = buffer.p;
        buffer.p[4] = 0; // MagicByte
        buffer.p[5] = cast(ubyte)compression; // Attributes
        buffer.p[6 .. 10] = nativeToBigEndian(keySize);
        buffer.p += 10 + (keySize > 0 ? keySize : 0);
        buffer.p[0 .. 4] = nativeToBigEndian(valueSize);
        buffer.p += 4 + (valueSize > 0 ? valueSize : 0);
        assert((buffer.p - crcPtr) == messageSize);
        ubyte[4] computedCrc = crc32Of(crcPtr[4 .. messageSize]); // start from 4th byte to skip the crc field
        crcPtr[0 .. 4] = nativeToBigEndian(*(cast(uint*)&computedCrc)); // set crc field
    }

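    // Hands the current buffer over to the request bundler. When compression is enabled, the
    // whole message set is first compressed and wrapped into a single outer message (with the
    // compression codec set in its attributes), as the Kafka protocol requires.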
    private void returnCurrentBuffer() {
        import std.algorithm, std.digest.crc, vibe.core.log;
        if (m_compression != Compression.None) {
            // Offset + MessageSize + Crc + MagicByte + Attributes + Key length + Value length
            enum int headersLen = 8 + 4 + 4 + 1 + 1 + 4 + 4;
            // maximum compressed data size, leaving room for the message headers
            int maxCompressedSize = m_client.config.producerMaxBytes - headersLen;

            void bufferCompressed(int compressedLen) {
                int messageSize = 4 + 1 + 1 + 4 + 4 + compressedLen;
                m_compressionBuffer.rewind();
                setupMessageHeaders(m_compressionBuffer, messageSize, -1, compressedLen, m_compression);
                swap(m_currentBuffer, m_compressionBuffer);
            }

            switch (m_compression) {
                case Compression.GZIP: {
                    m_zlibContext.next_in = m_currentBuffer.buffer;
                    m_zlibContext.avail_in = cast(uint)m_currentBuffer.filled;
                    m_zlibContext.next_out = m_compressionBuffer.buffer + headersLen;
                    m_zlibContext.avail_out = maxCompressedSize;
                    int r = deflate(m_zlibContext, Z_FINISH);
                    if (r == Z_STREAM_END) {
                        bufferCompressed(cast(int)m_zlibContext.total_out);
                    } else {
                        // the compressed data was bigger than the input; this may occur on incompressible input data
                        logDebugV("Skipping GZIP compression");
                    }
                    deflateReset(m_zlibContext);
                    break;
                }
                case Compression.Snappy: {
                    size_t outLen = maxCompressedSize;
                    int r = snappy_compress(m_currentBuffer.buffer, m_currentBuffer.filled,
                        m_compressionBuffer.buffer + headersLen, &outLen);
                    if (r == SNAPPY_OK) {
                        bufferCompressed(cast(int)outLen);
                    } else {
                        // the compressed data was bigger than the input; this may occur on incompressible input data
                        logDebugV("Skipping Snappy compression");
                    }
                    break;
                }
                default: assert(0);
            }
        }
        m_queue.returnBuffer(BufferType.Filled, m_currentBuffer);
        m_queue.notifyRequestBundler();
    }

    /// Reserves space for the next message using the specified key and value sizes. This may
    /// be used to avoid a double copy. Instead of filling the user's buffer and passing it to
    /// pushMessage(), the user may call reserveMessage() and fill the data directly into
    /// the internal buffer using the reservedKey and reservedValue slices. This function is
    /// used internally by pushMessage().
    /// Params:
    /// keySize = number of bytes to reserve for the key, -1 for a null key
    /// valueSize = number of bytes to reserve for the value, -1 for a null value
    /// See_Also:
    /// commitMessage
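    /// Example:
    /// A minimal sketch of the reserve/fill/commit path (the sizes and bytes are illustrative):
    /// ---
    /// producer.reserveMessage(-1, 16); // null key, 16-byte value
    /// producer.reservedValue[] = 0;    // fill the value directly in the internal buffer
    /// producer.commitMessage();
    /// ---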
    void reserveMessage(int keySize, int valueSize) {
        assert(keySize >= -1);
        assert(valueSize >= -1);
        // Crc + MagicByte + Attributes + KeyLength + ValueLength
        int messageSize = 4 + 1 + 1 + 4 + 4;
        if (keySize > 0)
            messageSize += keySize;
        if (valueSize > 0)
            messageSize += valueSize;
        // Offset + MessageSize
        size_t messageSetOverhead = 8 + 4 + messageSize;
        enforce(messageSetOverhead <= m_client.config.producerMaxBytes,
            "Message is too big to fit into message set");

        synchronized (m_queue.mutex) {
            if (!m_currentBuffer) {
                m_currentBuffer = m_queue.waitForBuffer(BufferType.Free);
                m_batchStarted = false;
                m_isFirstMessage = true;
            } else if (m_currentBuffer.remaining < messageSetOverhead) {
                returnCurrentBuffer();
                m_currentBuffer = m_queue.waitForBuffer(BufferType.Free);
                m_batchStarted = false;
                m_isFirstMessage = true;
            }
            m_bufferReserved = true;
        }

        auto p = m_currentBuffer.p + 22;
        m_reservedKey = keySize > 0 ? p[0 .. keySize] : null;
        p += m_reservedKey.length + 4;
        m_reservedValue = valueSize > 0 ? p[0 .. valueSize] : null;
        assert(p + m_reservedValue.length <= m_currentBuffer.end);

        m_messageSize = messageSize;
        m_keySize = keySize;
        m_valueSize = valueSize;
    }

    /// Commits previously reserved space and builds up the message
    /// See_Also:
    /// reserveMessage
    void commitMessage() {
        Exception ex = m_queue.exception;
        if (ex)
            throw ex;

        setupMessageHeaders(m_currentBuffer, m_messageSize, m_keySize, m_valueSize, Compression.None);

        if (m_isFirstMessage) {
            m_isFirstMessage = false;
            synchronized (m_queue.mutex) {
                m_batchStarted = true;
                m_batchDeadline = MonoTime.currTime + m_client.config.producerBatchTimeout.msecs;
            }
        }
        m_bufferReserved = false;
        m_batchCondition.notify();
    }

    /// Pushes a message to the cluster
    /// Params:
    /// key = the message key, may be null
    /// value = the message value, may be null
    void pushMessage(const(ubyte)[] key, const(ubyte)[] value) {
        int keySize = key == null ? -1 : cast(int)key.length;
        int valueSize = value == null ? -1 : cast(int)value.length;
        import vibe.core.log;
        logTrace("keySize: %d, valueSize: %d", keySize, valueSize);
        reserveMessage(keySize, valueSize);
        logTrace("reservedKey.length: %d, reservedValue.length: %d", m_reservedKey.length, m_reservedValue.length);
        if (key.length)
            m_reservedKey[] = key; // copy the key into the internal buffer
        if (value.length)
            m_reservedValue[] = value; // copy the value into the internal buffer
        commitMessage();
    }
}