1 module kafkad.protocol.deserializer; 2 3 import core.stdc.string; 4 import vibe.core.net; 5 import kafkad.protocol.common; 6 import kafkad.protocol.metadata; 7 import kafkad.exception; 8 9 // read data up to ChunkSize and then deserialize 10 11 struct Deserializer { 12 private { 13 ubyte* chunk, p, end; 14 TCPConnection stream; 15 size_t remaining; // bytes remaining in current message 16 size_t chunkSize; 17 } 18 19 this(TCPConnection stream, size_t chunkSize) { 20 chunk = cast(ubyte*)enforce(GC.malloc(chunkSize, GC.BlkAttr.NO_SCAN)); 21 p = chunk; 22 end = chunk; 23 this.stream = stream; 24 this.chunkSize = chunkSize; 25 } 26 27 // must be called before each message (needed for correct chunk processing) 28 void beginMessage(size_t size) { 29 remaining = size; 30 } 31 32 void endMessage() { 33 if (remaining) 34 skipBytes(remaining); 35 } 36 37 // todo: test 38 void skipBytes(size_t size) { 39 auto tail = end - p; 40 if (size <= tail) { 41 p += size; 42 } else { 43 size -= tail; 44 end = chunk + size % chunkSize; 45 p = end; 46 while (size) { 47 auto toRead = min(size, chunkSize); 48 stream.read(chunk[0 .. toRead]).rethrow!StreamException("Deserializer.skipBytes() failed"); 49 size -= toRead; 50 } 51 } 52 } 53 54 void read() { 55 assert(remaining); 56 auto tail = end - p; 57 if (tail && tail < chunkSize) 58 core.stdc..string.memmove(chunk, p, tail); 59 // read up to remaining if it's smaller than chunk size, it will prevent blocking if read buffer is empty 60 // consider: reading up to vibe's leastSize(); 61 auto toRead = min(chunkSize, tail + remaining); 62 stream.read(chunk[tail .. toRead]).rethrow!StreamException("Deserializer.read() failed"); 63 p = chunk; 64 end = chunk + toRead; 65 remaining -= toRead - tail; 66 } 67 68 void check(size_t needed) { 69 if (end - p < needed) 70 read(); 71 } 72 73 void getMessage(out int size, out int correlationId) { 74 remaining = 8; 75 check(8); 76 deserialize(size); 77 deserialize(correlationId); 78 size -= 4; // subtract correlationId overhead 79 } 80 81 // returns chunk slice up to remaining bytes 82 ubyte[] getChunk(size_t needed) { 83 needed = min(needed, chunkSize); 84 check(needed); 85 auto slice = p[0 .. needed]; 86 p += needed; 87 return slice; 88 } 89 90 void deserialize(out byte s) { 91 check(1); 92 s = *p++; 93 } 94 95 void deserialize(T)(out T s) 96 if (is(T == short) || is(T == int) || is(T == long)) 97 { 98 check(T.sizeof); 99 auto pt = cast(T*)p; 100 s = *pt++; 101 p = cast(ubyte*)pt; 102 version (LittleEndian) 103 s = swapEndian(s); 104 } 105 106 void deserialize(T)(out T s) 107 if (is(T == enum)) 108 { 109 OriginalType!T v; 110 deserialize(v); 111 s = cast(T)v; 112 } 113 114 void deserializeSlice(ubyte[] s) { 115 auto tail = end - p; // amount available in the chunk 116 if (s.length <= tail) { 117 core.stdc..string.memcpy(s.ptr, p, s.length); 118 p += s.length; 119 } else { 120 core.stdc..string.memcpy(s.ptr, p, tail); 121 p += tail; 122 s = s[tail .. $]; 123 if (s.length >= chunkSize) { 124 stream.read(s).rethrow!StreamException("Deserializer.deserializeSlice() failed"); 125 remaining -= s.length; 126 } else { 127 read(); 128 assert(end - p >= s.length); 129 core.stdc..string.memcpy(s.ptr, p, s.length); 130 p += s.length; 131 } 132 } 133 } 134 135 void deserializeBytes(T)(out ubyte[] s) { 136 T len; 137 deserialize(len); 138 assert(len >= 0); 139 s = new ubyte[len]; 140 deserializeSlice(s); 141 } 142 143 void deserialize(out string s) { 144 ubyte[] b; 145 deserializeBytes!short(b); 146 s = cast(string)b; 147 } 148 149 void deserialize(out ubyte[] s) { 150 deserializeBytes!int(s); 151 } 152 153 void deserialize(T)(out T[] s) 154 if (!is(T == ubyte) && !is(T == char)) 155 { 156 check(4); 157 int len; 158 deserialize(len); 159 assert(len >= 0); 160 s = new T[len]; 161 foreach (ref a; s) 162 deserialize(a); 163 } 164 165 void deserialize(T)(out T s) 166 if (is(T == struct)) 167 { 168 alias Names = FieldNameTuple!T; 169 foreach (N; Names) 170 deserialize(__traits(getMember, s, N)); 171 } 172 173 auto metadataResponse_v0() { 174 Metadata r; 175 deserialize(r); 176 return r; 177 } 178 179 auto offsetResponse_v0() { 180 OffsetResponse_v0 r; 181 deserialize(r); 182 return r; 183 } 184 } 185 186 struct OffsetResponse_v0 { 187 static struct PartitionOffsets { 188 int partition; 189 short errorCode; 190 long[] offsets; 191 } 192 static struct Topic { 193 string topic; 194 PartitionOffsets[] partitions; 195 } 196 Topic[] topics; 197 }