module kafkad.protocol.serializer;

import core.stdc.string;
import vibe.core.net;
import kafkad.protocol.common;
import kafkad.config;
import kafkad.exception;
import kafkad.bundler;
import kafkad.queue;

/* Serializes data in chunks of up to ChunkSize. This is unfortunately not zero-copy, because vibe.d's
 * drivers and the kernel may buffer on their own, but it minimizes the overhead of many small write()
 * calls to the driver. */

struct Serializer {
    private {
        ubyte* chunk, p, end;
        TCPConnection stream;
        size_t chunkSize;
    }

    this(TCPConnection stream, size_t chunkSize) {
        chunk = cast(ubyte*)enforce(GC.malloc(chunkSize, GC.BlkAttr.NO_SCAN));
        p = chunk;
        end = chunk + chunkSize;
        this.stream = stream;
        this.chunkSize = chunkSize;
    }

    void flush() {
        if (p - chunk) {
            stream.write(chunk[0 .. p - chunk]).rethrow!StreamException("Serializer.flush() failed");
            p = chunk;
        }
    }

    void check(size_t needed) {
        // flushing only guarantees chunkSize bytes of space, so larger needs can never be satisfied
        assert(needed <= chunkSize, "needed must not exceed the chunk size");
        if (cast(size_t)(end - p) < needed)
            flush();
    }

    void serialize(byte s) {
        check(1);
        *p++ = s;
    }

    void serialize(T)(T s)
        if (is(T == short) || is(T == int) || is(T == long))
    {
        check(T.sizeof);
        version (LittleEndian)
            s = swapEndian(s);
        auto pt = cast(T*)p;
        *pt++ = s;
        p = cast(ubyte*)pt;
    }
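
    // A minimal check of the endian handling above: Kafka's wire format is big-endian,
    // so little-endian hosts must byte-swap every integer before it is copied into the
    // chunk. This exercises std.bitmanip.swapEndian on its own and needs no live Serializer.
    unittest {
        import std.bitmanip : swapEndian;
        assert(swapEndian(cast(short)0x0102) == 0x0201);
        assert(swapEndian(0x01020304) == 0x04030201);
        assert(swapEndian(0x0102030405060708L) == 0x0807060504030201L);
    }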

    void serializeSlice(const(ubyte)[] s) {
        if (p - chunk) {
            auto rem = end - p;
            auto toCopy = min(rem, s.length);
            core.stdc.string.memcpy(p, s.ptr, toCopy);
            p += toCopy;
            s = s[toCopy .. $];
            if (p == end)
                flush();
        }
        if (s.length) {
            if (s.length >= chunkSize) {
                stream.write(s).rethrow!StreamException("Serializer.serializeSlice() failed");
            } else {
                core.stdc.string.memcpy(chunk, s.ptr, s.length);
                p = chunk + s.length;
            }
        }
    }
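
    // A self-contained sketch of the chunking policy implemented by serializeSlice:
    // top up a partially filled chunk first, flush it when full, write oversized
    // slices straight through, and buffer small remainders. The TCPConnection is
    // replaced by an appender so the logic can run as a plain unittest; the names
    // `emit` and `sink` are illustrative only.
    unittest {
        import std.algorithm : min;
        import std.array : appender;

        enum chunkSize = 4;
        ubyte[chunkSize] chunk;
        size_t filled = 0;
        auto sink = appender!(ubyte[]);

        void emit(const(ubyte)[] s) {
            if (filled) { // top up the pending chunk first
                auto toCopy = min(chunkSize - filled, s.length);
                chunk[filled .. filled + toCopy] = s[0 .. toCopy];
                filled += toCopy;
                s = s[toCopy .. $];
                if (filled == chunkSize) { // chunk full: flush it
                    sink.put(chunk[]);
                    filled = 0;
                }
            }
            if (s.length) {
                if (s.length >= chunkSize)
                    sink.put(s); // oversized slice: bypass the chunk entirely
                else {
                    chunk[0 .. s.length] = s[]; // small remainder: buffer it
                    filled = s.length;
                }
            }
        }

        emit([1, 2, 3, 4, 5]); // >= chunkSize, written straight through
        emit([6, 7]);          // buffered in the chunk
        emit([8, 9, 10]);      // tops up and flushes the chunk, then buffers [10]
        ubyte[] expected = [1, 2, 3, 4, 5, 6, 7, 8, 9];
        assert(sink.data == expected);
        assert(filled == 1 && chunk[0] == 10);
    }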

    void serialize(string s) {
        assert(s.length <= short.max, "UTF8 string must not be longer than 32767 bytes");
        serialize(cast(short)s.length);
        serializeSlice(cast(ubyte[])s);
    }

    void serialize(const(ubyte)[] s) {
        assert(s.length <= int.max, "Byte array must not be larger than 2^31-1 bytes"); // just in case
        serialize(cast(int)s.length);
        serializeSlice(s);
    }

    private void arrayLength(size_t length) {
        assert(length <= int.max, "Arrays must not be longer than 2^31-1 items"); // just in case, maybe set some configurable (and saner) limits?
        serialize(cast(int)length);
    }

    void serialize(T)(T[] s)
        if (!is(T == ubyte) && !is(T == char))
    {
        arrayLength(s.length);
        foreach (ref a; s)
            serialize(a);
    }
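
    /* Taken together, the overloads above cover the Kafka v0 primitive encodings:
     * a string is an int16 length followed by the UTF-8 bytes, a byte array is an
     * int32 length followed by the payload, and an array is an int32 element count
     * followed by each element serialized in order. All integers are big-endian. */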

    void serialize(T)(ref T s)
        if (is(T == struct))
    {
        alias Names = FieldNameTuple!T;
        foreach (N; Names)
            serialize(__traits(getMember, s, N));
    }

    private void request(size_t size, ApiKey apiKey, short apiVersion, int correlationId, string clientId) {
        size += 2 + 2 + 4 + stringSize(clientId);
        serialize(cast(int)size);
        serialize(cast(short)apiKey);
        serialize(apiVersion);
        serialize(correlationId);
        serialize(clientId);
    }
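
    /* The v0 request header written by request() above (sizes in bytes):
     *
     *   Size          int32   byte count of everything after this field
     *   ApiKey        int16
     *   ApiVersion    int16
     *   CorrelationId int32
     *   ClientId      string  (int16 length + UTF-8 bytes)
     *
     * Callers pass in the body size only; request() adds the 2 + 2 + 4 +
     * stringSize(clientId) bytes of header fields that follow the Size field. */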

    private enum arrayOverhead = 4; // int32
    private auto stringSize(string s) { return 2 + s.length; } // int16 plus string

    // version 0
    void metadataRequest_v0(int correlationId, string clientId, string[] topics) {
        size_t size = arrayOverhead;
        foreach (t; topics)
            size += stringSize(t);
        request(size, ApiKey.MetadataRequest, 0, correlationId, clientId);
        arrayLength(topics.length);
        foreach (t; topics)
            serialize(t);
    }
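
    /* Worked example of the size bookkeeping above: for topics ["foo", "bar"] the
     * metadata request body is the int32 topic count plus one length-prefixed
     * string per topic, i.e. 4 + (2 + 3) + (2 + 3) = 14 bytes; request() then
     * prepends the header on top of that. */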

    // version 0
    void fetchRequest_v0(int correlationId, string clientId, in Configuration config, RequestBundler requestBundler) {
        size_t topics = 0;
        size_t size = 4 + 4 + 4 + arrayOverhead;
        Topic* t = requestBundler.requestTopicsFront;
        while (t) {
            size += stringSize(t.topic) + arrayOverhead + t.partitionsInRequest * (4 + 8 + 4);
            ++topics;
            t = t.next;
        }
        request(size, ApiKey.FetchRequest, 0, correlationId, clientId);
        serialize!int(-1); // ReplicaId
        serialize!int(config.consumerMaxWaitTime); // MaxWaitTime
        serialize!int(config.consumerMinBytes); // MinBytes
        arrayLength(topics);
        t = requestBundler.requestTopicsFront;
        while (t) {
            serialize(t.topic);
            arrayLength(t.partitionsInRequest);
            Partition* p = t.requestPartitionsFront;
            while (p) {
                serialize(p.partition);
                serialize(p.queue.offset);
                serialize!int(config.consumerMaxBytes); // MaxBytes
                p = p.next;
            }
            t = t.next;
        }
    }
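
    /* FetchRequest v0 body layout backing the size arithmetic above:
     *
     *   ReplicaId   int32  (-1 for ordinary consumers)
     *   MaxWaitTime int32
     *   MinBytes    int32
     *   [Topics]           TopicName string, then
     *     [Partitions]     Partition int32, FetchOffset int64, MaxBytes int32
     *
     * hence each partition contributes 4 + 8 + 4 bytes plus its share of the
     * per-topic string and array overheads. */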

    // version 0
    void produceRequest_v0(int correlationId, string clientId, in Configuration config, RequestBundler requestBundler) {
        size_t topics = 0;
        size_t size = 2 + 4 + arrayOverhead;
        Topic* t = requestBundler.requestTopicsFront;
        while (t) {
            size += stringSize(t.topic) + arrayOverhead + t.partitionsInRequest * (4 + 4);
            Partition* p = t.requestPartitionsFront;
            while (p) {
                synchronized (p.queue.mutex) {
                    p.buffer = p.queue.getBuffer(BufferType.Filled);
                }
                size += p.buffer.filled;
                p = p.next;
            }
            ++topics;
            t = t.next;
        }
        import vibe.core.log : logDebug;
        logDebug("produce request size: %d", size);
        request(size, ApiKey.ProduceRequest, 0, correlationId, clientId);
        serialize!short(config.producerRequiredAcks); // RequiredAcks
        serialize!int(config.produceRequestTimeout); // Timeout
        arrayLength(topics);
        t = requestBundler.requestTopicsFront;
        while (t) {
            serialize(t.topic);
            arrayLength(t.partitionsInRequest);
            Partition* p = t.requestPartitionsFront;
            while (p) {
                serialize!int(p.partition);
                serialize!int(cast(int)p.buffer.filled);
                serializeSlice(p.buffer.filledSlice);
                synchronized (p.queue.mutex) {
                    p.buffer.rewind();
                    p.queue.returnBuffer(BufferType.Free, p.buffer);
                    p.queue.condition.notify();
                }
                p = p.next;
            }
            t = t.next;
        }
    }
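
    /* Note the buffer hand-off in produceRequest_v0: each partition's filled buffer
     * is taken from its queue under the mutex during the sizing pass, its contents
     * are written out as that partition's MessageSet, and only then is it rewound,
     * returned to the free list, and the queue's condition signalled so a blocked
     * producer can reuse it. */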

    // version 0
    void offsetRequest_v0(int correlationId, string clientId, OffsetRequestParams_v0 params) {
        size_t size = 4 + arrayOverhead;
        foreach (ref t; params.topics) {
            size += stringSize(t.topic) + arrayOverhead + t.partitions.length * (4 + 8 + 4);
        }
        request(size, ApiKey.OffsetRequest, 0, correlationId, clientId);
        serialize(params);
    }
}
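
/* A hedged usage sketch (illustrative, not part of this module): a connection
 * layer owning a TCPConnection might drive the serializer like this, where
 * `conn`, `correlationId`, and the chunk-size value are placeholders:
 *
 *     auto ser = Serializer(conn, 32 * 1024);
 *     ser.metadataRequest_v0(correlationId, "my-client", ["topic1", "topic2"]);
 *     ser.flush(); // push any bytes still buffered in the chunk to the socket
 */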

struct OffsetRequestParams_v0 {
    static struct PartTimeMax {
        int partition;
        long time;
        int maxOffsets;
    }
    static struct Topic {
        string topic;
        PartTimeMax[] partitions;
    }
    int replicaId;
    Topic[] topics;
}