module kafkad.consumer;

import kafkad.client;
import kafkad.exception;
import kafkad.queue;
import kafkad.worker;
import kafkad.utils.snappy;
import etc.c.zlib;
import std.algorithm : swap;
import std.exception;
import core.atomic;

alias Offset = long;
enum Offsets : Offset { Latest = -1, Earliest = -2 }

/// Message returned by the consumer
struct Message {
    /// Message offset in the log
    long offset;
    /// The key of the message, may be null
    ubyte[] key;
    /// The value of the message, may be null
    ubyte[] value;
}

/// Consumes one topic/partition as an infinite input range of Message.
/// Filled buffers are produced by the client's fetch machinery and handed over
/// through m_queue; this class parses the Kafka message-set wire format out of
/// them, transparently inflating GZIP- and Snappy-compressed inner sets.
class Consumer : IWorker {
    private {
        Client m_client;
        string m_topic;
        int m_partition;
        Queue m_queue;
        // m_currentBuffer: buffer currently being parsed.
        // m_compressionBuffer: scratch buffer decompressed message sets are
        // inflated into; swapped with m_currentBuffer while an inner set is consumed.
        QueueBuffer* m_currentBuffer, m_compressionBuffer;
        // True while m_currentBuffer holds a decompressed inner message set
        // (the original network buffer is then parked in m_compressionBuffer).
        bool m_isDecompressedBuffer;
        z_stream m_zlibContext;
        // Pre-fetched message returned by front().
        Message m_message;
    }

    package (kafkad) {
        @property queue() { return m_queue; }
    }

    /// Throws an exception in the consumer task. This is used to pass the connection exceptions to the user.
    void throwException(Exception ex) {
        m_queue.exception = ex;
        m_queue.condition.notify();
    }

    @property string topic() { return m_topic; }
    @property int partition() { return m_partition; }

    @property WorkerType workerType() { return WorkerType.Consumer; }

    /// Params:
    ///   client = owning client; the consumer registers itself via addNewConsumer
    ///   topic = topic to consume
    ///   partition = partition to consume
    ///   startingOffset = first offset to fetch, or one of Offsets.Latest/Earliest
    /// Throws: on invalid offset or zlib initialization failure.
    this(Client client, string topic, int partition, Offset startingOffset) {
        enforce(startingOffset >= -2);
        m_client = client;
        m_topic = topic;
        m_partition = partition;
        m_queue = new Queue(this, m_client.config.consumerQueueBuffers, m_client.config.consumerMaxBytes);
        m_queue.offset = startingOffset;
        m_currentBuffer = null;
        m_compressionBuffer = new QueueBuffer(m_client.config.consumerMaxBytes);
        m_isDecompressedBuffer = false;

        m_zlibContext.next_in = null;
        m_zlibContext.avail_in = 0;
        m_zlibContext.zalloc = null;
        m_zlibContext.zfree = null;
        m_zlibContext.opaque = null;
        // 15 + 32: max window bits, with automatic zlib/gzip header detection.
        enforce(inflateInit2(&m_zlibContext, 15 + 32) == Z_OK);
        scope (failure)
            inflateEnd(&m_zlibContext);

        client.addNewConsumer(this);
        popFront(); // seed message
    }

    // Swap the parse buffer with the compression scratch buffer and record
    // whether the now-current buffer holds decompressed data.
    private void swapBuffers(bool isDecompressedBuffer) {
        swap(m_currentBuffer, m_compressionBuffer);
        m_isDecompressedBuffer = isDecompressedBuffer;
    }

    /// The most recently fetched message (input-range interface).
    Message front()
    {
        return m_message;
    }

    /// Advances to the next message, blocking until one is available.
    void popFront()
    {
        m_message = fetchMessage();
    }

    /// The consumer never terminates; it is an infinite range.
    enum empty = false;

    // TODO: make private
    Message getMessage() {
        auto m = front;
        popFront;
        return m;
    }

private:
    // Returns the exhausted buffer to the free list, wakes the request bundler
    // so a new fetch can be issued, and blocks until the next filled buffer arrives.
    void advanceToNextFilledBuffer() {
        synchronized (m_queue.mutex) {
            m_queue.returnBuffer(BufferType.Free, m_currentBuffer);
            m_queue.notifyRequestBundler();
            m_currentBuffer = m_queue.waitForBuffer(BufferType.Filled);
        }
    }

    /// Parses and returns the next message from the current buffer, pulling new
    /// filled buffers from the queue and decompressing inner message sets as needed.
    /// Throws: CrcException on checksum mismatch, ProtocolException on malformed
    /// or unsupported data.
    Message fetchMessage() {
        if (!m_currentBuffer) {
            synchronized (m_queue.mutex) {
                m_currentBuffer = m_queue.waitForBuffer(BufferType.Filled);
            }
        }
    processBuffer:
        if (m_currentBuffer.messageSetSize > 12 /* Offset + Message Size */) {
            import std.bitmanip, std.digest.crc;

            long offset = bigEndianToNative!long(m_currentBuffer.p[0 .. 8]);
            int messageSize = bigEndianToNative!int(m_currentBuffer.p[8 .. 12]);
            m_currentBuffer.p += 12;
            m_currentBuffer.messageSetSize -= 12;
            if (m_currentBuffer.messageSetSize >= messageSize) {
                // we got full message here
                void skipMessage() {
                    m_currentBuffer.p += messageSize;
                    m_currentBuffer.messageSetSize -= messageSize;
                }
                if (offset < m_currentBuffer.requestedOffset) {
                    // In general, the return messages will have offsets larger than or equal
                    // to the starting offset. However, with compressed messages, it's possible for the returned
                    // messages to have offsets smaller than the starting offset. The number of such messages is
                    // typically small and the caller is responsible for filtering out those messages.
                    skipMessage();
                    goto processBuffer;
                }
                uint messageCrc = bigEndianToNative!uint(m_currentBuffer.p[0 .. 4]);
                // check remainder bytes with CRC32 and compare
                ubyte[4] computedCrc = crc32Of(m_currentBuffer.p[4 .. messageSize]);
                if (*cast(uint*)&computedCrc != messageCrc) {
                    // handle CRC error
                    skipMessage();
                    throw new CrcException("Invalid message checksum");
                }
                byte magicByte = m_currentBuffer.p[4];
                enforce(magicByte == 0); // only message format v0 is supported
                byte attributes = m_currentBuffer.p[5];
                int keyLen = bigEndianToNative!int(m_currentBuffer.p[6 .. 10]);
                ubyte[] key = null;
                if (keyLen >= 0) {
                    // 14 = crc(4) + magicByte(1) + attributes(1) + keyLen(4) + valueLen(4)
                    enforce(keyLen <= messageSize - 14);
                    key = m_currentBuffer.p[10 .. 10 + keyLen];
                }
                auto pValue = m_currentBuffer.p + 10 + key.length;
                int valueLen = bigEndianToNative!int(pValue[0 .. 4]);
                ubyte[] value = null;
                if (valueLen >= 0) {
                    enforce(valueLen <= messageSize - 14 - key.length);
                    pValue += 4;
                    value = pValue[0 .. valueLen];
                }

                byte compression = attributes & 3;
                if (compression != 0) {
                    // A compressed message's value is itself a message set; it must
                    // not in turn contain compressed messages.
                    enforce(!m_isDecompressedBuffer, new ProtocolException("Recursive compression is not supported"));
                    enforce(value.length);
                    switch (compression) {
                        case Compression.GZIP:
                            inflateReset(&m_zlibContext);
                            m_zlibContext.next_in = value.ptr;
                            m_zlibContext.avail_in = cast(uint)value.length;
                            m_zlibContext.next_out = m_compressionBuffer.buffer;
                            m_zlibContext.avail_out = m_client.config.consumerMaxBytes;
                            int r = inflate(&m_zlibContext, Z_FINISH);
                            if (r == Z_STREAM_END) {
                                m_compressionBuffer.rewind();
                                m_compressionBuffer.messageSetSize = m_zlibContext.total_out;
                            } else {
                                // avail_out == 0 means inflate stopped because the output
                                // buffer was exhausted before the stream ended, i.e. the
                                // decompressed set does not fit; any other failure with
                                // space still available is a corrupt stream.
                                if (!m_zlibContext.avail_out)
                                    throw new ProtocolException("GZIP decompressed message set is too big to fit into the buffer");
                                else
                                    throw new ProtocolException("GZIP could not decompress the message set");
                            }
                            break;
                        case Compression.Snappy:
                            size_t outLen = m_client.config.consumerMaxBytes;
                            int r = snappy_java_uncompress(value.ptr, value.length, m_compressionBuffer.buffer, &outLen);
                            if (r == SNAPPY_OK) {
                                m_compressionBuffer.rewind();
                                m_compressionBuffer.messageSetSize = outLen;
                            } else {
                                if (r == SNAPPY_BUFFER_TOO_SMALL)
                                    throw new ProtocolException("Snappy decompressed message set is too big to fit into the buffer");
                                else
                                    throw new ProtocolException("Snappy could not decompress the message set");
                            }
                            break;
                        default: throw new ProtocolException("Unsupported compression type");
                    }

                    // Parse the inner set next; the network buffer waits in
                    // m_compressionBuffer until the inner set is exhausted.
                    m_compressionBuffer.requestedOffset = m_currentBuffer.requestedOffset;
                    skipMessage();
                    swapBuffers(true);
                    goto processBuffer;
                } else {
                    // no compression, just return the message
                    skipMessage();
                    return Message(offset, key, value);
                }
            } else {
                // this is the last, partial message, skip it
                if (m_isDecompressedBuffer) {
                    // inner set exhausted — resume the outer network buffer
                    swapBuffers(false);
                    goto processBuffer;
                }
                advanceToNextFilledBuffer();
                goto processBuffer;
            }
        } else {
            // no more messages, get next buffer
            if (m_isDecompressedBuffer) {
                // inner set exhausted — resume the outer network buffer
                swapBuffers(false);
                goto processBuffer;
            }
            advanceToNextFilledBuffer();
            goto processBuffer;
        }
    }
}