1 module kafkad.consumer;
2 
3 import kafkad.client;
4 import kafkad.exception;
5 import kafkad.queue;
6 import kafkad.worker;
7 import kafkad.utils.snappy;
8 import etc.c.zlib;
9 import std.algorithm : swap;
10 import std.exception;
11 import core.atomic;
12 
/// Integral type used for message offsets and for the consumer's starting offset.
alias Offset = long;

/// Named sentinel values accepted in place of a concrete starting offset.
/// NOTE(review): presumably these map to the broker's "latest"/"earliest" log
/// positions; the resolution itself is not visible in this module.
enum Offsets : Offset {
    Latest = -1,
    Earliest = -2,
}
15 
/**
 * A single message handed out by the consumer.
 *
 * `key` and `value` are plain slices; either of them may be null.
 */
struct Message {
    /** Offset of this message in the log. */
    long offset;
    /** Message key; null when the message carries no key. */
    ubyte[] key;
    /** Message payload; null when the message carries no value. */
    ubyte[] value;
}
25 
/**
 * Consumer of a single topic partition.
 *
 * The consumer pulls filled buffers from its `Queue`, parses the message set
 * contained in each buffer and exposes the messages through the input range
 * primitives `front`, `popFront` and `empty` (the range is never empty;
 * `popFront` waits on the queue for the next filled buffer when needed).
 *
 * GZIP and Snappy compressed message sets are decompressed into a dedicated
 * buffer and iterated transparently; nested compression is rejected.
 *
 * Note: the `key` and `value` slices of a returned `Message` point directly
 * into the buffer being parsed (no copy is made). NOTE(review): the buffer is
 * handed back to the queue as `Free` once exhausted, so the slices presumably
 * become invalid after later `popFront` calls - duplicate them if they must
 * outlive the iteration step.
 */
class Consumer : IWorker {
    private {
        Client m_client;
        string m_topic;
        int m_partition;
        Queue m_queue;
        // m_currentBuffer is the buffer being parsed; m_compressionBuffer holds
        // the other one of the (fetched, decompressed) pair, see swapBuffers().
        QueueBuffer* m_currentBuffer, m_compressionBuffer;
        // true while m_currentBuffer is the decompressed message set
        bool m_isDecompressedBuffer;
        z_stream m_zlibContext;
        // message currently exposed by front()
        Message m_message;
    }

    package (kafkad) {
        @property queue() { return m_queue; }
    }

    /// Throws an exception in the consumer task. This is used to pass the connection exceptions to the user.
    /// The exception is stored in the queue and the queue condition is notified.
    void throwException(Exception ex) {
        m_queue.exception = ex;
        m_queue.condition.notify();
    }

    /// Topic this consumer reads from.
    @property string topic() { return m_topic; }
    /// Partition this consumer reads from.
    @property int partition() { return m_partition; }

    /// Identifies this worker as a consumer.
    @property WorkerType workerType() { return WorkerType.Consumer; }

    /**
     * Creates the consumer, registers it in the client and fetches the first message.
     *
     * Params:
     *     client = client which owns this consumer and provides the configuration
     *     topic = topic name
     *     partition = partition number
     *     startingOffset = offset to start from, or one of the `Offsets` values;
     *                      must be >= -2
     *
     * Throws: Exception when `startingOffset` is invalid or zlib cannot be
     *         initialized; anything thrown while fetching the first message.
     */
    this(Client client, string topic, int partition, Offset startingOffset) {
        enforce(startingOffset >= -2);
        m_client = client;
        m_topic = topic;
        m_partition = partition;
        m_queue = new Queue(this, m_client.config.consumerQueueBuffers, m_client.config.consumerMaxBytes);
        m_queue.offset = startingOffset;
        m_currentBuffer = null;
        m_compressionBuffer = new QueueBuffer(m_client.config.consumerMaxBytes);
        m_isDecompressedBuffer = false;

        m_zlibContext.next_in = null;
        m_zlibContext.avail_in = 0;
        m_zlibContext.zalloc = null;
        m_zlibContext.zfree = null;
        m_zlibContext.opaque = null;
        // windowBits = 15 + 32: maximum window, automatic zlib/gzip header detection
        enforce(inflateInit2(&m_zlibContext, 15 + 32) == Z_OK);
        scope (failure)
            inflateEnd(&m_zlibContext);

        client.addNewConsumer(this);
        popFront(); // seed message
    }

    /// Releases the zlib inflate state allocated in the constructor.
    ~this() {
        // inflateEnd() only reports Z_STREAM_ERROR when the stream was never
        // initialized or has already been ended, so this is safe even if the
        // constructor failed part-way.
        inflateEnd(&m_zlibContext);
    }

    /// Exchanges the current and the compression buffer and records whether
    /// the new current buffer holds a decompressed message set.
    private void swapBuffers(bool isDecompressedBuffer) {
        swap(m_currentBuffer, m_compressionBuffer);
        m_isDecompressedBuffer = isDecompressedBuffer;
    }

    /// Input range primitive: the current message.
    Message front()
    {
        return m_message;
    }

    /// Input range primitive: advances to the next message, waiting for data if necessary.
    void popFront()
    {
        m_message = fetchMessage();
    }

    /// Input range primitive: the consumer is an infinite range.
    enum empty = false;

    // TODO: make private
    /// Returns the current message and advances to the next one.
    Message getMessage() {
        auto m = front;
        popFront;
        return m;
    }

private:
    /// Moves on when the current buffer has no more complete messages: leaves
    /// the decompressed message set, or otherwise returns the exhausted buffer
    /// to the queue and waits for the next filled one.
    void advanceBuffer() {
        if (m_isDecompressedBuffer) {
            // back to the fetched buffer that contained the compressed message
            swapBuffers(false);
            return;
        }
        synchronized (m_queue.mutex) {
            m_queue.returnBuffer(BufferType.Free, m_currentBuffer);
            m_queue.notifyRequestBundler();
            m_currentBuffer = m_queue.waitForBuffer(BufferType.Filled);
        }
    }

    /**
     * Parses and returns the next message, transparently handling partial
     * trailing messages, messages preceding the requested offset and
     * compressed message sets.
     *
     * Throws:
     *     CrcException when the message checksum does not match (the message is skipped);
     *     ProtocolException on a malformed size field, nested/unsupported
     *     compression or a decompression failure;
     *     Exception when other header fields are inconsistent.
     */
    Message fetchMessage() {
        import std.bitmanip : bigEndianToNative, littleEndianToNative;
        import std.digest.crc : crc32Of;

        // Offset(8) + Message Size(4)
        enum headerSize = 12;
        // crc(4) + magicByte(1) + attributes(1) + keyLen(4) + valueLen(4)
        enum minMessageSize = 14;

        if (!m_currentBuffer) {
            synchronized (m_queue.mutex) {
                m_currentBuffer = m_queue.waitForBuffer(BufferType.Filled);
            }
        }

        for (;;) {
            if (m_currentBuffer.messageSetSize <= headerSize) {
                // no more messages, get next buffer
                advanceBuffer();
                continue;
            }

            long offset = bigEndianToNative!long(m_currentBuffer.p[0 .. 8]);
            int messageSize = bigEndianToNative!int(m_currentBuffer.p[8 .. 12]);
            m_currentBuffer.p += headerSize;
            m_currentBuffer.messageSetSize -= headerSize;

            if (messageSize < minMessageSize) {
                // The size field is corrupt (negative or shorter than the fixed
                // message header), so the position of the next message is
                // unknown. Drop the rest of this buffer instead of reading
                // outside of the message.
                m_currentBuffer.messageSetSize = 0;
                throw new ProtocolException("Invalid message size");
            }

            if (m_currentBuffer.messageSetSize < messageSize) {
                // this is the last, partial message, skip it
                advanceBuffer();
                continue;
            }

            // we got full message here
            void skipMessage() {
                m_currentBuffer.p += messageSize;
                m_currentBuffer.messageSetSize -= messageSize;
            }

            if (offset < m_currentBuffer.requestedOffset) {
                // In general, the return messages will have offsets larger than or equal
                // to the starting offset. However, with compressed messages, it's possible for the returned
                // messages to have offsets smaller than the starting offset. The number of such messages is
                // typically small and the caller is responsible for filtering out those messages.
                skipMessage();
                continue;
            }

            uint messageCrc = bigEndianToNative!uint(m_currentBuffer.p[0 .. 4]);
            // check remainder bytes with CRC32 and compare
            ubyte[4] computedCrc = crc32Of(m_currentBuffer.p[4 .. messageSize]);
            // crc32Of yields the digest in little-endian byte order; convert it
            // explicitly instead of reinterpreting the bytes, so the comparison
            // is also correct on big-endian hosts.
            if (littleEndianToNative!uint(computedCrc) != messageCrc) {
                // handle CRC error
                skipMessage();
                throw new CrcException("Invalid message checksum");
            }

            byte magicByte = m_currentBuffer.p[4];
            enforce(magicByte == 0);
            byte attributes = m_currentBuffer.p[5];
            int keyLen = bigEndianToNative!int(m_currentBuffer.p[6 .. 10]);
            ubyte[] key = null;
            if (keyLen >= 0) {
                enforce(keyLen <= messageSize - minMessageSize);
                key = m_currentBuffer.p[10 .. 10 + keyLen];
            }
            auto pValue = m_currentBuffer.p + 10 + key.length;
            int valueLen = bigEndianToNative!int(pValue[0 .. 4]);
            ubyte[] value = null;
            if (valueLen >= 0) {
                enforce(valueLen <= messageSize - minMessageSize - key.length);
                pValue += 4;
                value = pValue[0 .. valueLen];
            }

            // the two lowest attribute bits select the compression codec
            byte compression = attributes & 3;
            if (compression == 0) {
                // no compression, just return the message
                skipMessage();
                return Message(offset, key, value);
            }

            // the value is a compressed message set: decompress it and iterate over it
            enforce(!m_isDecompressedBuffer, new ProtocolException("Recursive compression is not supported"));
            enforce(value.length);
            switch (compression) {
                case Compression.GZIP: {
                    inflateReset(&m_zlibContext);
                    m_zlibContext.next_in = value.ptr;
                    m_zlibContext.avail_in = cast(uint)value.length;
                    m_zlibContext.next_out = m_compressionBuffer.buffer;
                    m_zlibContext.avail_out = m_client.config.consumerMaxBytes;
                    int r = inflate(&m_zlibContext, Z_FINISH);
                    if (r == Z_STREAM_END) {
                        m_compressionBuffer.rewind();
                        m_compressionBuffer.messageSetSize = m_zlibContext.total_out;
                    } else {
                        // No output space left means the decompressed data did
                        // not fit; otherwise the stream itself is broken.
                        if (m_zlibContext.avail_out == 0)
                            throw new ProtocolException("GZIP decompressed message set is too big to fit into the buffer");
                        else
                            throw new ProtocolException("GZIP could not decompress the message set");
                    }
                    break;
                }
                case Compression.Snappy: {
                    size_t outLen = m_client.config.consumerMaxBytes;
                    int r = snappy_java_uncompress(value.ptr, value.length, m_compressionBuffer.buffer, &outLen);
                    if (r == SNAPPY_OK) {
                        m_compressionBuffer.rewind();
                        m_compressionBuffer.messageSetSize = outLen;
                    } else {
                        if (r == SNAPPY_BUFFER_TOO_SMALL)
                            throw new ProtocolException("Snappy decompressed message set is too big to fit into the buffer");
                        else
                            throw new ProtocolException("Snappy could not decompress the message set");
                    }
                    break;
                }
                default: throw new ProtocolException("Unsupported compression type");
            }

            // continue with the decompressed set; the compressed wrapper message
            // is consumed from the fetched buffer before switching
            m_compressionBuffer.requestedOffset = m_currentBuffer.requestedOffset;
            skipMessage();
            swapBuffers(true);
        }
    }
}