1 module kafkad.producer; 2 3 import kafkad.client; 4 import kafkad.exception; 5 import kafkad.queue; 6 import kafkad.worker; 7 import kafkad.utils.snappy; 8 import etc.c.zlib; 9 import std.exception; 10 import core.time; 11 import vibe.core.core; 12 import vibe.core.sync; 13 14 class Producer : IWorker { 15 enum __isWeakIsolatedType = true; // needed to pass this type between vibe.d's tasks 16 private { 17 Client m_client; 18 string m_topic; 19 int m_partition; 20 Queue m_queue; 21 QueueBuffer* m_currentBuffer; 22 TaskCondition m_batchCondition; 23 bool m_batchStarted, m_isFirstMessage, m_bufferReserved; 24 MonoTime m_batchDeadline; 25 Compression m_compression; 26 QueueBuffer* m_compressionBuffer; 27 z_streamp m_zlibContext; 28 29 int m_messageSize; 30 int m_keySize, m_valueSize; 31 ubyte[] m_reservedKey, m_reservedValue; 32 } 33 34 package (kafkad) { 35 @property queue() { return m_queue; } 36 } 37 38 /// Throws an exception in the producer task. This is used to pass the connection exceptions to the user. 39 void throwException(Exception ex) { 40 m_queue.exception = ex; 41 m_queue.condition.notify(); 42 } 43 44 @property string topic() { return m_topic; } 45 @property int partition() { return m_partition; } 46 47 @property WorkerType workerType() { return WorkerType.Producer; } 48 49 /// Slice from the internal buffer for the key. Note that reserveMessage() must be called first 50 @property ubyte[] reservedKey() { assert(m_bufferReserved); return m_reservedKey; } 51 /// Slice from the internal buffer for the value. Note that reserveMessage() must be called first 52 @property ubyte[] reservedValue() { assert(m_bufferReserved); return m_reservedValue; } 53 54 /// Params: 55 /// client = the client instance 56 /// topic = producer topic 57 /// partition = producer partition 58 /// compression = the compression type, it uses config.producerCompression by default 59 /// compressionLevel = valid only for GZIP compression, compression level between 60 /// 1 and 9: 1 gives best speed, 9 gives best compression, it uses 61 /// config.producerCompressionLevel by default 62 this(Client client, string topic, int partition, Compression compression, int compressionLevel) { 63 m_client = client; 64 m_topic = topic; 65 m_partition = partition; 66 m_queue = new Queue(this, m_client.config.producerQueueBuffers, m_client.config.producerMaxBytes); 67 m_currentBuffer = null; 68 m_batchCondition = new TaskCondition(m_queue.mutex); 69 m_batchStarted = false; 70 m_compression = compression; 71 m_compressionBuffer = m_compression != Compression.None ? new QueueBuffer(m_client.config.producerMaxBytes) : null; 72 if (m_compression == Compression.GZIP) { 73 m_zlibContext = new z_stream; 74 m_zlibContext.zalloc = null; 75 m_zlibContext.zfree = null; 76 m_zlibContext.opaque = null; 77 enforce(deflateInit2(m_zlibContext, compressionLevel, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) == Z_OK); 78 } 79 scope (failure) if (m_compression == Compression.GZIP) 80 deflateEnd(m_zlibContext); 81 82 client.addNewProducer(this); 83 runTask(&batcherMain); 84 } 85 86 /// Params: 87 /// client = the client instance 88 /// topic = producer topic 89 /// partition = producer partition 90 /// compression = the compression type, it uses config.producerCompression by default 91 this(Client client, string topic, int partition, Compression compression) { 92 this(client, topic, partition, compression, client.config.producerCompressionLevel); 93 } 94 95 /// Params: 96 /// client = the client instance 97 /// topic = producer topic 98 /// partition = producer partition 99 this(Client client, string topic, int partition) { 100 this(client, topic, partition, client.config.producerCompression); 101 } 102 103 ~this() { 104 if (m_compression == Compression.GZIP) 105 deflateEnd(m_zlibContext); 106 } 107 108 private void batcherMain() { 109 for (;;) { 110 MonoTime currTime = MonoTime.currTime; 111 synchronized (m_queue.mutex) { 112 while (!m_batchStarted || currTime < m_batchDeadline || m_bufferReserved) { 113 if (m_batchStarted && currTime < m_batchDeadline) { 114 Duration remaining = m_batchDeadline - currTime; 115 m_batchCondition.wait(remaining); 116 currTime = MonoTime.currTime; 117 } else { 118 m_batchCondition.wait(); 119 currTime = MonoTime.currTime; 120 } 121 } 122 // perform batch 123 returnCurrentBuffer(); 124 m_currentBuffer = null; 125 m_batchStarted = false; 126 } 127 } 128 } 129 130 private static void setupMessageHeaders(QueueBuffer* buffer, int messageSize, int keySize, int valueSize, Compression compression) { 131 import std.bitmanip, std.digest.crc; 132 buffer.p[0 .. 8] = 0; // offset 133 buffer.p[8 .. 12] = nativeToBigEndian(messageSize); 134 buffer.p += 12; // skip header 135 auto crcPtr = buffer.p; 136 buffer.p[4] = 0; // MagicByte 137 buffer.p[5] = cast(ubyte)compression; // Attributes 138 buffer.p[6 .. 10] = nativeToBigEndian(keySize); 139 buffer.p += 10 + (keySize > 0 ? keySize : 0); 140 buffer.p[0 .. 4] = nativeToBigEndian(valueSize); 141 buffer.p += 4 + (valueSize > 0 ? valueSize : 0); 142 assert((buffer.p - crcPtr) == messageSize); 143 ubyte[4] computedCrc = crc32Of(crcPtr[4 .. messageSize]); // start from 4th byte to skip the crc field 144 crcPtr[0 .. 4] = nativeToBigEndian(*(cast(uint*)&computedCrc)); // set crc field 145 } 146 147 private void returnCurrentBuffer() { 148 import std.algorithm, std.digest.crc, vibe.core.log; 149 if (m_compression != Compression.None) { 150 // Offset + MessageSize + Crc + MagicByte + Attributes + Key length + Value length 151 enum int headersLen = 8 + 4 + 4 + 1 + 1 + 4 + 4; 152 // maximum compressed data size, leaving room for the message headers 153 int maxCompressedSize = m_client.config.producerMaxBytes - headersLen; 154 155 void bufferCompressed(int compressedLen) { 156 int messageSize = 4 + 1 + 1 + 4 + 4 + compressedLen; 157 m_compressionBuffer.rewind(); 158 setupMessageHeaders(m_compressionBuffer, messageSize, -1, compressedLen, m_compression); 159 swap(m_currentBuffer, m_compressionBuffer); 160 } 161 162 switch (m_compression) { 163 case Compression.GZIP: 164 m_zlibContext.next_in = m_currentBuffer.buffer; 165 m_zlibContext.avail_in = cast(uint)m_currentBuffer.filled; 166 m_zlibContext.next_out = m_compressionBuffer.buffer + headersLen; 167 m_zlibContext.avail_out = maxCompressedSize; 168 int r = deflate(m_zlibContext, Z_FINISH); 169 if (r == Z_STREAM_END) { 170 bufferCompressed(cast(int)m_zlibContext.total_out); 171 } else { 172 // compressed data was bigger than the input, this may occur on uncompressible input data 173 logDebugV("Skipping GZIP compression"); 174 } 175 deflateReset(m_zlibContext); 176 break; 177 case Compression.Snappy: 178 size_t outLen = maxCompressedSize; 179 int r = snappy_compress(m_currentBuffer.buffer, m_currentBuffer.filled, 180 m_compressionBuffer.buffer + headersLen, &outLen); 181 if (r == SNAPPY_OK) { 182 bufferCompressed(cast(int)outLen); 183 } else { 184 // compressed data was bigger than the input, this may occur on uncompressible input data 185 logDebugV("Skipping Snappy compression"); 186 } 187 break; 188 default: assert(0); 189 } 190 } 191 m_queue.returnBuffer(BufferType.Filled, m_currentBuffer); 192 m_queue.notifyRequestBundler(); 193 } 194 195 /// Reserves the space for the next message using specified key and value sizes. This may 196 /// be used to avoid double-copy. Instead of filling the user's buffer and passing it to 197 /// pushMessage(), the user may call the reserveMessage() and fill the data directly in 198 /// the internal buffer using reservedKey and reservedValue slices. This function is used 199 /// internally by pushMessage() function. 200 /// Params: 201 /// keySize = number of bytes to reserve for the key, -1 for null key 202 /// valueSize = number of bytes to reserve for the value, -1 for the null value 203 /// See_Also: 204 /// commitMessage 205 void reserveMessage(int keySize, int valueSize) { 206 assert(keySize >= -1); 207 assert(valueSize >= -1); 208 // Crc + MagicByte + Attributes + KeyLength + ValueLength 209 int messageSize = 4 + 1 + 1 + 4 + 4; 210 if (keySize > 0) 211 messageSize += keySize; 212 if (valueSize > 0) 213 messageSize += valueSize; 214 // Offset + MessageSize 215 size_t messageSetOverhead = 8 + 4 + messageSize; 216 enforce(messageSetOverhead <= m_client.config.consumerMaxBytes, 217 "Message is too big to fit into message set"); 218 219 synchronized (m_queue.mutex) { 220 if (!m_currentBuffer) { 221 m_currentBuffer = m_queue.waitForBuffer(BufferType.Free); 222 m_batchStarted = false; 223 m_isFirstMessage = true; 224 } else if (m_currentBuffer.remaining < messageSetOverhead) { 225 returnCurrentBuffer(); 226 m_currentBuffer = m_queue.waitForBuffer(BufferType.Free); 227 m_batchStarted = false; 228 m_isFirstMessage = true; 229 } 230 m_bufferReserved = true; 231 } 232 233 auto p = m_currentBuffer.p + 22; 234 m_reservedKey = keySize > 0 ? p[0 .. keySize] : null; 235 p += m_reservedKey.length + 4; 236 m_reservedValue = valueSize > 0 ? p[0 .. valueSize] : null; 237 assert(p + m_reservedValue.length <= m_currentBuffer.end); 238 239 m_messageSize = messageSize; 240 m_keySize = keySize; 241 m_valueSize = valueSize; 242 } 243 244 /// Commits previously reserved space and builds up the message 245 /// See_Also: 246 /// reserveMessage 247 void commitMessage() { 248 Exception ex = m_queue.exception; 249 if (ex) 250 throw ex; 251 252 setupMessageHeaders(m_currentBuffer, m_messageSize, m_keySize, m_valueSize, Compression.None); 253 254 if (m_isFirstMessage) { 255 m_isFirstMessage = false; 256 synchronized (m_queue.mutex) { 257 m_batchStarted = true; 258 m_batchDeadline = MonoTime.currTime + m_client.config.producerBatchTimeout.msecs; 259 } 260 } 261 m_bufferReserved = false; 262 m_batchCondition.notify(); 263 } 264 265 /// Pushes the message to the cluster 266 /// Params: 267 /// key = the message key, may be null 268 /// value = the message value, may be null 269 void pushMessage(const(ubyte)[] key, const(ubyte)[] value) { 270 int keySize = key == null ? -1 : cast(int)key.length; 271 int valueSize = value == null ? -1 : cast(int)value.length; 272 import vibe.core.log; 273 logTrace("keySize: %d, valueSize: %d", keySize, valueSize); 274 reserveMessage(keySize, valueSize); 275 logTrace("m_keySize: %d, m_valueSize: %d", m_reservedKey.length, m_reservedValue.length); 276 if (key.length) 277 m_reservedKey[] = key; // copy into buffer 278 if (value.length) 279 m_reservedValue[] = value; // copy into buffer 280 commitMessage(); 281 } 282 }