1 module kafkad.protocol.deserializer;
2
3 import core.stdc.string;
4 import vibe.core.net;
5 import kafkad.protocol.common;
6 import kafkad.protocol.metadata;
7 import kafkad.exception;
8
// Reads data from the stream in chunks of up to chunkSize bytes, then deserializes values from the buffered chunk.
10
/**
 * Chunked deserializer for big-endian protocol messages read from a TCP stream.
 *
 * Bytes are pulled from `stream` into a fixed-size buffer (`chunk`) and values
 * are decoded out of that buffer. `p` points at the next unconsumed byte and
 * `end` one past the last buffered byte.
 *
 * Invariant: `remaining` counts the bytes of the current message that have NOT
 * yet been pulled from the stream. Bytes sitting between `p` and `end` are
 * already excluded from it.
 */
struct Deserializer {
    private {
        ubyte* chunk, p, end;
        TCPConnection stream;
        size_t remaining; // bytes of the current message not yet read from the stream
        size_t chunkSize;
    }

    /**
     * Allocates the chunk buffer and binds the deserializer to a stream.
     *
     * Params:
     *   stream    = connection to read from
     *   chunkSize = size of the internal buffer, in bytes
     */
    this(TCPConnection stream, size_t chunkSize) {
        // NO_SCAN: the buffer is allocated as a block the GC does not scan for pointers
        chunk = cast(ubyte*)enforce(GC.malloc(chunkSize, GC.BlkAttr.NO_SCAN));
        p = chunk;
        end = chunk;
        this.stream = stream;
        this.chunkSize = chunkSize;
    }

    /// Must be called before each message (needed for correct chunk processing).
    /// `size` is the number of message bytes that are still on the stream.
    void beginMessage(size_t size) {
        remaining = size;
    }

    /// Discards whatever is left of the current message: both the bytes already
    /// buffered and the bytes that are still on the stream.
    void endMessage() {
        // Buffered bytes are not part of `remaining`, so both have to be skipped
        // to leave the stream positioned at the start of the next message.
        immutable size_t unread = cast(size_t)(end - p) + remaining;
        if (unread)
            skipBytes(unread);
    }

    /**
     * Skips `size` bytes of the current message.
     *
     * Buffered bytes are consumed first; anything beyond them is read from the
     * stream into the chunk buffer and thrown away.
     */
    void skipBytes(size_t size) {
        immutable size_t tail = end - p;
        if (size <= tail) {
            p += size;
            return;
        }
        size -= tail;
        // the buffer is fully consumed (and about to be used as scratch space)
        p = end;
        while (size) {
            auto toRead = min(size, chunkSize);
            stream.read(chunk[0 .. toRead]).rethrow!StreamException("Deserializer.skipBytes() failed");
            size -= toRead;
            // keep the stream accounting in sync; never underflow if the caller over-skips
            remaining -= min(toRead, remaining);
        }
    }

    /// Refills the buffer: moves unconsumed bytes to the front and reads more
    /// message bytes from the stream behind them.
    void read() {
        assert(remaining);
        auto tail = end - p;
        if (tail && tail < chunkSize)
            core.stdc.string.memmove(chunk, p, tail);
        // read up to remaining if it's smaller than chunk size, it will prevent blocking if read buffer is empty
        // consider: reading up to vibe's leastSize();
        auto toRead = min(chunkSize, tail + remaining);
        stream.read(chunk[tail .. toRead]).rethrow!StreamException("Deserializer.read() failed");
        p = chunk;
        end = chunk + toRead;
        remaining -= toRead - tail;
    }

    /// Triggers a single buffer refill when fewer than `needed` bytes are buffered.
    void check(size_t needed) {
        if (end - p < needed)
            read();
    }

    /**
     * Reads the 8-byte message header.
     *
     * Params:
     *   size          = receives the message size minus the 4 bytes of the correlation id
     *   correlationId = receives the correlation id
     */
    void getMessage(out int size, out int correlationId) {
        remaining = 8;
        check(8);
        deserialize(size);
        deserialize(correlationId);
        size -= 4; // subtract correlationId overhead
    }

    /// Returns a slice of the chunk buffer of `needed` bytes, capped at chunkSize.
    /// The slice aliases the internal buffer.
    ubyte[] getChunk(size_t needed) {
        needed = min(needed, chunkSize);
        check(needed);
        auto slice = p[0 .. needed];
        p += needed;
        return slice;
    }

    /// Reads a single byte.
    void deserialize(out byte s) {
        check(1);
        s = *p++;
    }

    /// Reads a big-endian short, int or long.
    void deserialize(T)(out T s)
        if (is(T == short) || is(T == int) || is(T == long))
    {
        check(T.sizeof);
        // memcpy instead of dereferencing a cast pointer: `p` is not guaranteed
        // to be aligned for T
        core.stdc.string.memcpy(&s, p, T.sizeof);
        p += T.sizeof;
        version (LittleEndian)
            s = swapEndian(s);
    }

    /// Reads an enum through its underlying type.
    void deserialize(T)(out T s)
        if (is(T == enum))
    {
        OriginalType!T v;
        deserialize(v);
        s = cast(T)v;
    }

    /**
     * Fills `s` completely with the next `s.length` bytes of the message.
     *
     * Buffered bytes are copied first. A remainder of at least chunkSize bytes
     * is read straight from the stream into `s`; a smaller remainder goes
     * through a buffer refill.
     */
    void deserializeSlice(ubyte[] s) {
        auto tail = end - p; // amount available in the chunk
        if (s.length <= tail) {
            core.stdc.string.memcpy(s.ptr, p, s.length);
            p += s.length;
        } else {
            core.stdc.string.memcpy(s.ptr, p, tail);
            p += tail;
            s = s[tail .. $];
            if (s.length >= chunkSize) {
                stream.read(s).rethrow!StreamException("Deserializer.deserializeSlice() failed");
                remaining -= s.length;
            } else {
                read();
                assert(end - p >= s.length);
                core.stdc.string.memcpy(s.ptr, p, s.length);
                p += s.length;
            }
        }
    }

    /// Reads a length prefix of type `T` followed by that many bytes.
    /// A negative length (the Kafka wire encoding of null) yields a null slice.
    void deserializeBytes(T)(out ubyte[] s) {
        T len;
        deserialize(len);
        if (len < 0)
            return; // `out` has already reset s to null
        s = new ubyte[len];
        deserializeSlice(s);
    }

    /// Reads a string with a 16-bit length prefix.
    void deserialize(out string s) {
        ubyte[] b;
        deserializeBytes!short(b);
        s = cast(string)b;
    }

    /// Reads a byte array with a 32-bit length prefix.
    void deserialize(out ubyte[] s) {
        deserializeBytes!int(s);
    }

    /// Reads an array: a 32-bit element count followed by the elements.
    /// A negative count yields a null array.
    void deserialize(T)(out T[] s)
        if (!is(T == ubyte) && !is(T == char))
    {
        int len;
        deserialize(len); // performs its own check(4)
        if (len < 0)
            return; // `out` has already reset s to null
        s = new T[len];
        foreach (ref a; s)
            deserialize(a);
    }

    /// Reads a struct field by field, in declaration order.
    void deserialize(T)(out T s)
        if (is(T == struct))
    {
        alias Names = FieldNameTuple!T;
        foreach (N; Names)
            deserialize(__traits(getMember, s, N));
    }

    /// Deserializes a `Metadata` value from the current message.
    auto metadataResponse_v0() {
        Metadata r;
        deserialize(r);
        return r;
    }

    /// Deserializes an `OffsetResponse_v0` value from the current message.
    auto offsetResponse_v0() {
        OffsetResponse_v0 r;
        deserialize(r);
        return r;
    }
}
185
/// Offset response (v0) layout. Fields are kept in this exact order because
/// the struct deserializer reads them in declaration order.
struct OffsetResponse_v0 {
    /// Offsets reported for one partition.
    static struct PartitionOffsets {
        int partition;
        short errorCode;
        long[] offsets;
    }

    /// All partition entries reported for one topic.
    static struct Topic {
        string topic;
        PartitionOffsets[] partitions;
    }

    /// One entry per topic in the response.
    Topic[] topics;
}