1 module kafkad.protocol.deserializer;
2 
3 import core.stdc.string;
4 import vibe.core.net;
5 import kafkad.protocol.common;
6 import kafkad.protocol.metadata;
7 import kafkad.exception;
8 
9 // read data up to ChunkSize and then deserialize
10 
struct Deserializer {
    private {
        ubyte* chunk, p, end;   // buffer start, read cursor, end of valid buffered data
        TCPConnection stream;
        size_t remaining;       // message bytes not yet pulled from the stream
        size_t chunkSize;
    }

    this(TCPConnection stream, size_t chunkSize) {
        chunk = cast(ubyte*)enforce(GC.malloc(chunkSize, GC.BlkAttr.NO_SCAN));
        p = chunk;
        end = chunk;
        this.stream = stream;
        this.chunkSize = chunkSize;
    }

    // must be called before each message (needed for correct chunk processing)
    void beginMessage(size_t size) {
        remaining = size;
    }

    // discards what is left of the current message: both unconsumed bytes
    // already buffered in the chunk and bytes still unread on the stream
    // (read() never buffers past the message boundary, so end - p is all ours)
    void endMessage() {
        auto leftover = (end - p) + remaining;
        if (leftover)
            skipBytes(leftover);
    }

    // skips size bytes, consuming buffered data first and then reading
    // (and discarding) directly from the stream
    void skipBytes(size_t size) {
        auto tail = end - p;
        if (size <= tail) {
            // fully satisfied from the buffer
            p += size;
        } else {
            size -= tail;
            // bytes skipped straight off the stream must be subtracted from
            // `remaining` (as deserializeSlice's direct-read path does),
            // otherwise it would overstate what is left of the message
            remaining -= min(size, remaining);
            p = end = chunk; // buffer is now empty
            while (size) {
                auto toRead = min(size, chunkSize);
                stream.read(chunk[0 .. toRead]).rethrow!StreamException("Deserializer.skipBytes() failed");
                size -= toRead;
            }
        }
    }

    // refills the chunk: moves the unconsumed tail to the front, then reads
    // more of the current message from the stream (never past its end)
    void read() {
        assert(remaining);
        auto tail = end - p;
        if (tail && tail < chunkSize)
            memmove(chunk, p, tail);
        // read up to remaining if it's smaller than chunk size, it will prevent blocking if read buffer is empty
        // consider: reading up to vibe's leastSize();
        auto toRead = min(chunkSize, tail + remaining);
        stream.read(chunk[tail .. toRead]).rethrow!StreamException("Deserializer.read() failed");
        p = chunk;
        end = chunk + toRead;
        remaining -= toRead - tail;
    }

    // ensures at least `needed` bytes are buffered (needed must be <= chunkSize)
    void check(size_t needed) {
        if (end - p < needed)
            read();
    }

    // reads a response header: 4-byte size followed by 4-byte correlation id;
    // the returned size covers the payload after the correlation id
    void getMessage(out int size, out int correlationId) {
        remaining = 8; // header is framed separately from the message payload
        check(8);
        deserialize(size);
        deserialize(correlationId);
        size -= 4; // subtract correlationId overhead
    }

    // returns a slice of the internal chunk, at most chunkSize bytes long;
    // the slice is only valid until the next read()/check()
    ubyte[] getChunk(size_t needed) {
        needed = min(needed, chunkSize);
        check(needed);
        auto slice = p[0 .. needed];
        p += needed;
        return slice;
    }

    void deserialize(out byte s) {
        check(1);
        s = *p++;
    }

    // big-endian (network order) integer, per the Kafka wire format
    void deserialize(T)(out T s)
        if (is(T == short) || is(T == int) || is(T == long))
    {
        check(T.sizeof);
        auto pt = cast(T*)p;
        s = *pt++;
        p = cast(ubyte*)pt;
        version (LittleEndian)
            s = swapEndian(s);
    }

    void deserialize(T)(out T s)
        if (is(T == enum))
    {
        OriginalType!T v;
        deserialize(v);
        s = cast(T)v;
    }

    // fills s from the buffer, falling back to direct stream reads for
    // payloads that do not fit in the chunk
    void deserializeSlice(ubyte[] s) {
        auto tail = end - p; // amount available in the chunk
        if (s.length <= tail) {
            memcpy(s.ptr, p, s.length);
            p += s.length;
        } else {
            memcpy(s.ptr, p, tail);
            p += tail;
            s = s[tail .. $];
            if (s.length >= chunkSize) {
                // too big for the chunk: read straight into the destination
                stream.read(s).rethrow!StreamException("Deserializer.deserializeSlice() failed");
                remaining -= s.length;
            } else {
                read();
                assert(end - p >= s.length);
                memcpy(s.ptr, p, s.length);
                p += s.length;
            }
        }
    }

    // length-prefixed byte sequence; a negative length (-1) encodes a null
    // value in the Kafka protocol and leaves s as null
    void deserializeBytes(T)(out ubyte[] s) {
        T len;
        deserialize(len);
        if (len < 0)
            return;
        s = new ubyte[len];
        deserializeSlice(s);
    }

    // Kafka STRING: 2-byte length prefix
    void deserialize(out string s) {
        ubyte[] b;
        deserializeBytes!short(b);
        s = cast(string)b;
    }

    // Kafka BYTES: 4-byte length prefix
    void deserialize(out ubyte[] s) {
        deserializeBytes!int(s);
    }

    // Kafka ARRAY: 4-byte element count, then each element in sequence;
    // a negative count (-1) encodes a null array and leaves s as null
    void deserialize(T)(out T[] s)
        if (!is(T == ubyte) && !is(T == char))
    {
        int len;
        deserialize(len);
        if (len < 0)
            return;
        s = new T[len];
        foreach (ref a; s)
            deserialize(a);
    }

    // structs are deserialized field by field, in declaration order
    void deserialize(T)(out T s)
        if (is(T == struct))
    {
        alias Names = FieldNameTuple!T;
        foreach (N; Names)
            deserialize(__traits(getMember, s, N));
    }

    auto metadataResponse_v0() {
        Metadata r;
        deserialize(r);
        return r;
    }

    auto offsetResponse_v0() {
        OffsetResponse_v0 r;
        deserialize(r);
        return r;
    }
}
185 
// Response payload for the Kafka Offset API (v0). Deserialized reflectively
// by Deserializer.deserialize!(T == struct), which reads fields in declaration
// order — do not rename or reorder fields without changing the wire format.
struct OffsetResponse_v0 {
    static struct PartitionOffsets {
        int partition;       // partition id
        short errorCode;     // per-partition Kafka error code (0 = no error)
        long[] offsets;      // offsets returned for this partition
    }
    static struct Topic {
        string topic;                     // topic name
        PartitionOffsets[] partitions;    // one entry per requested partition
    }
    Topic[] topics;
}