module kafkad.protocol.serializer;

import core.stdc.string;
import vibe.core.net;
import kafkad.protocol.common;
import kafkad.config;
import kafkad.exception;
import kafkad.bundler;
import kafkad.queue;

/* Serializes data into a buffer of up to chunkSize bytes before writing it to the stream. This is not
 * zero-copy, unfortunately, as vibe.d's drivers and the kernel may do buffering on their own; however, it
 * should minimize the overhead of many small write() calls to the driver. */

struct Serializer {
    private {
        ubyte* chunk;          // start of the staging buffer
        ubyte* p;              // write cursor, chunk <= p <= end
        ubyte* end;            // one past the last byte of the staging buffer
        TCPConnection stream;  // destination of flushed data
        size_t chunkSize;      // size of the staging buffer in bytes
    }

    /// Allocates a staging buffer of `chunkSize` bytes and binds the serializer to `stream`.
    /// The allocation result is passed through enforce().
    this(TCPConnection stream, size_t chunkSize) {
        chunk = cast(ubyte*)enforce(GC.malloc(chunkSize, GC.BlkAttr.NO_SCAN));
        p = chunk;
        end = chunk + chunkSize;
        this.stream = stream;
        this.chunkSize = chunkSize;
    }

    /// Writes the buffered bytes (if any) to the stream and resets the write cursor.
    /// Write failures are rethrown as StreamException.
    void flush() {
        if (p - chunk) {
            stream.write(chunk[0 .. p - chunk]).rethrow!StreamException("Serializer.flush() failed");
            p = chunk;
        }
    }

    /// Flushes the buffer when fewer than `needed` bytes are left in it.
    void check(size_t needed) {
        if (end - p < needed)
            flush();
    }

    /// Appends a single byte.
    void serialize(byte s) {
        check(1);
        *p++ = s;
    }

    /// Appends a short, int or long in big-endian byte order.
    void serialize(T)(T s)
        if (is(T == short) || is(T == int) || is(T == long))
    {
        check(T.sizeof);
        version (LittleEndian)
            s = swapEndian(s);
        // Copy byte-wise: the cursor has no alignment guarantee after byte/string writes,
        // so storing through a T* could be a misaligned access.
        core.stdc.string.memcpy(p, &s, T.sizeof);
        p += T.sizeof;
    }

    /// Appends raw bytes without a length prefix.
    /// Data is first used to top up a partially filled buffer; whatever is left is either written
    /// directly to the stream (when it is at least chunkSize long) or staged in the buffer.
    void serializeSlice(const(ubyte)[] s) {
        if (p - chunk) {
            // the buffer already holds data: fill it up as far as possible
            auto rem = end - p;
            auto toCopy = min(rem, s.length);
            core.stdc.string.memcpy(p, s.ptr, toCopy);
            p += toCopy;
            s = s[toCopy .. $];
            if (p == end)
                flush();
        }
        // if anything is left here, the buffer is empty (it was either empty on entry or just flushed)
        if (s.length) {
            if (s.length >= chunkSize) {
                // too large to be staged, bypass the buffer
                stream.write(s).rethrow!StreamException("Serializer.serializeSlice() failed");
            } else {
                core.stdc.string.memcpy(chunk, s.ptr, s.length);
                p = chunk + s.length;
            }
        }
    }

    /// Appends a string as an int16 length followed by its bytes.
    void serialize(string s) {
        assert(s.length <= short.max, "UTF8 string must not be longer than 32767 bytes");
        serialize(cast(short)s.length);
        serializeSlice(cast(ubyte[])s);
    }

    /// Appends a byte array as an int32 length followed by its bytes.
    void serialize(const(ubyte)[] s) {
        assert(s.length <= int.max, "Byte array must not be larger than 2 GB"); // just in case
        serialize(cast(int)s.length);
        serializeSlice(s);
    }

    /// Appends an array length as int32, asserting that it fits.
    private void arrayLength(size_t length) {
        assert(length <= int.max, "Arrays must not be longer that 2^31 items"); // just in case, maybe set some configurable (and saner) limits?
        serialize(cast(int)length);
    }

    /// Appends an array as an int32 element count followed by every element.
    void serialize(T)(T[] s)
        if (!is(T == ubyte) && !is(T == char))
    {
        // go through arrayLength() so the length is range-checked like in every other array writer
        arrayLength(s.length);
        foreach (ref a; s)
            serialize(a);
    }

    /// Appends every field of a struct, in declaration order.
    void serialize(T)(ref T s)
        if (is(T == struct))
    {
        alias Names = FieldNameTuple!T;
        foreach (N; Names)
            serialize(__traits(getMember, s, N));
    }

    /// Writes the request size and header. `size` is the size of the data that follows the header;
    /// the header fields written here (api key, api version, correlation id, client id) are added to it.
    private void request(size_t size, ApiKey apiKey, short apiVersion, int correlationId, string clientId) {
        size += 2 + 2 + 4 + stringSize(clientId);
        serialize(cast(int)size);
        serialize(cast(short)apiKey);
        serialize(apiVersion);
        serialize(correlationId);
        serialize(clientId);
    }

    private enum arrayOverhead = 4; // int32
    private auto stringSize(string s) { return 2 + s.length; } // int16 plus string

    // version 0
    /// Serializes a metadata request for the given topics.
    void metadataRequest_v0(int correlationId, string clientId, string[] topics) {
        auto size = arrayOverhead;
        foreach (t; topics)
            size += stringSize(t);
        request(size, ApiKey.MetadataRequest, 0, correlationId, clientId);
        arrayLength(topics.length);
        foreach (t; topics)
            serialize(t);
    }

    // version 0
    /// Serializes a fetch request for every topic/partition currently linked in `requestBundler`.
    void fetchRequest_v0(int correlationId, string clientId, in Configuration config, RequestBundler requestBundler) {
        // first pass: count the topics and compute the request size
        size_t topics = 0;
        auto size = 4 + 4 + 4 + arrayOverhead;
        for (Topic* t = requestBundler.requestTopicsFront; t; t = t.next) {
            size += stringSize(t.topic) + arrayOverhead + t.partitionsInRequest * (4 + 8 + 4);
            ++topics;
        }
        request(size, ApiKey.FetchRequest, 0, correlationId, clientId);
        serialize!int(-1); // ReplicaId
        serialize!int(config.consumerMaxWaitTime); // MaxWaitTime
        serialize!int(config.consumerMinBytes); // MinBytes
        arrayLength(topics);
        // second pass: write the topics and their partitions
        for (Topic* t = requestBundler.requestTopicsFront; t; t = t.next) {
            serialize(t.topic);
            arrayLength(t.partitionsInRequest);
            for (Partition* part = t.requestPartitionsFront; part; part = part.next) {
                serialize(part.partition);
                serialize(part.queue.offset);
                serialize!int(config.consumerMaxBytes); // MaxBytes
            }
        }
    }

    // version 0
    /// Serializes a produce request for every topic/partition currently linked in `requestBundler`.
    /// A filled buffer is taken from each partition's queue while the size is computed; once its
    /// contents are written, the buffer is rewound and returned to the queue as a free one.
    void produceRequest_v0(int correlationId, string clientId, in Configuration config, RequestBundler requestBundler) {
        // first pass: grab the filled buffers, count the topics and compute the request size
        size_t topics = 0;
        auto size = 2 + 4 + arrayOverhead;
        for (Topic* t = requestBundler.requestTopicsFront; t; t = t.next) {
            size += stringSize(t.topic) + arrayOverhead + t.partitionsInRequest * (4 + 4);
            for (Partition* part = t.requestPartitionsFront; part; part = part.next) {
                synchronized (part.queue.mutex) {
                    part.buffer = part.queue.getBuffer(BufferType.Filled);
                }
                size += part.buffer.filled;
            }
            ++topics;
        }
        import vibe.core.log; logDebug("produce request size: %d", size);
        request(size, ApiKey.ProduceRequest, 0, correlationId, clientId);
        serialize!short(config.producerRequiredAcks); // RequiredAcks
        serialize!int(config.produceRequestTimeout); // Timeout
        arrayLength(topics);
        // second pass: write the topics, partitions and buffer contents
        for (Topic* t = requestBundler.requestTopicsFront; t; t = t.next) {
            serialize(t.topic);
            arrayLength(t.partitionsInRequest);
            for (Partition* part = t.requestPartitionsFront; part; part = part.next) {
                serialize!int(part.partition);
                serialize!int(cast(int)part.buffer.filled);
                serializeSlice(part.buffer.filledSlice);
                // the data has been handed over, recycle the buffer and notify the queue's condition
                synchronized (part.queue.mutex) {
                    part.buffer.rewind();
                    part.queue.returnBuffer(BufferType.Free, part.buffer);
                    part.queue.condition.notify();
                }
            }
        }
    }

    // version 0
    /// Serializes an offset request described by `params`.
    void offsetRequest_v0(int correlationId, string clientId, OffsetRequestParams_v0 params) {
        auto size = 4 + arrayOverhead;
        foreach (ref t; params.topics) {
            size += stringSize(t.topic) + arrayOverhead + t.partitions.length * (4 + 8 + 4);
        }
        request(size, ApiKey.OffsetRequest, 0, correlationId, clientId);
        serialize(params);
    }
}

/// Parameters of an offset request (version 0). The struct is serialized field by field,
/// so the field order below is the order on the wire.
struct OffsetRequestParams_v0 {
    static struct PartTimeMax {
        int partition;
        long time;
        int maxOffsets;
    }
    static struct Topic {
        string topic;
        PartTimeMax[] partitions;
    }
    int replicaId;
    Topic[] topics;
}