module kafkad.connection;

import kafkad.client;
import kafkad.protocol;
import kafkad.exception;
import kafkad.bundler;
import kafkad.queue;
import kafkad.utils.lists;
import vibe.core.core;
import vibe.core.net;
import vibe.core.task;
import vibe.core.sync;
import vibe.core.log;
import std.algorithm : min;     // used by fetcherMain/pusherMain
import std.exception : enforce; // used by receiverMain/getStartingOffset
import std.format;
import core.time;
import std.concurrency;

package:

enum RequestType { Metadata, Fetch, Produce, Offset }

struct Request {
    RequestType type;
    Tid tid;

    Request* next;
}

class BrokerConnection {
    private {
        Client m_client;
        TCPConnection m_conn;
        Serializer m_ser;
        Deserializer m_des;
        TaskMutex m_mutex;
        RequestBundler m_consumerRequestBundler;
        RequestBundler m_producerRequestBundler;
        FreeList!Request m_requests;
        Task m_fetcherTask, m_pusherTask, m_receiverTask;
        ubyte[] m_topicNameBuffer;
    }

    int id = -1;

    @property NetworkAddress addr() {
        return m_conn.remoteAddress.rethrow!ConnectionException("Could not get connection's remote address");
    }

    @property RequestBundler consumerRequestBundler() { return m_consumerRequestBundler; }
    @property RequestBundler producerRequestBundler() { return m_producerRequestBundler; }

    this(Client client, TCPConnection conn) {
        m_client = client;
        m_conn = conn;
        m_ser = Serializer(conn, client.config.serializerChunkSize);
        m_des = Deserializer(conn, client.config.deserializerChunkSize);
        m_mutex = new TaskMutex();
        m_consumerRequestBundler = new RequestBundler();
        m_producerRequestBundler = new RequestBundler();
        m_topicNameBuffer = new ubyte[short.max];
        m_fetcherTask = runTask(&fetcherMain);
        m_pusherTask = runTask(&pusherMain);
        m_receiverTask = runTask(&receiverMain);
    }

    void fetcherMain() {
        int size, correlationId;
        bool gotFirstRequest = false;
        MonoTime startTime;
        try {
            // send requests
            for (;;) {
                synchronized (m_consumerRequestBundler.mutex) {
                    auto minRequests = min(m_client.config.fetcherBundleMinRequests, m_consumerRequestBundler.queueCount);
                    if (!gotFirstRequest) {
                        // wait for the first fetch request
                        while (!m_consumerRequestBundler.requestTopicsFront) {
                            m_consumerRequestBundler.readyCondition.wait();
                        }
                        if (m_consumerRequestBundler.requestsCollected < minRequests) {
                            gotFirstRequest = true;
                            // start the timer
                            startTime = MonoTime.currTime;
                            // wait for more requests
                            continue;
                        }
                    } else {
                        // wait up to the configured wait time or until the configured request count is reached
                        while (m_consumerRequestBundler.requestsCollected < minRequests) {
                            Duration elapsedTime = MonoTime.currTime - startTime;
                            if (elapsedTime >= m_client.config.fetcherBundleMaxWaitTime.msecs)
                                break; // timeout reached
                            Duration remaining = m_client.config.fetcherBundleMaxWaitTime.msecs - elapsedTime;
                            if (!m_consumerRequestBundler.readyCondition.wait(remaining))
                                break; // timeout reached
                        }
                        gotFirstRequest = false;
                    }

                    synchronized (m_mutex) {
                        m_ser.fetchRequest_v0(0, m_client.clientId, m_client.config, m_consumerRequestBundler);
                        m_ser.flush();

                        // add a request entry for each bundled fetch
                        auto req = m_requests.getNodeToFill();
                        req.type = RequestType.Fetch;
                        m_requests.pushFilledNode(req);
                    }

                    m_consumerRequestBundler.clearRequestLists();
                }
            }
        } catch (StreamException) {
            // stream error, typically connection loss
            m_pusherTask.interrupt();
            m_receiverTask.interrupt();
            m_client.connectionLost(this);
        } catch (InterruptException) {
            // do nothing
            logDebugV("FETCHER INT");
        }
    }

    void pusherMain() {
        int size, correlationId;
        bool gotFirstRequest = false;
        MonoTime startTime;
        try {
            // send requests
            for (;;) {
                synchronized (m_producerRequestBundler.mutex) {
                    auto minRequests = min(m_client.config.pusherBundleMinRequests, m_producerRequestBundler.queueCount);
                    if (!gotFirstRequest) {
                        // wait for the first produce request
                        while (!m_producerRequestBundler.requestTopicsFront) {
                            m_producerRequestBundler.readyCondition.wait();
                        }
                        if (m_producerRequestBundler.requestsCollected < minRequests) {
                            gotFirstRequest = true;
                            // start the timer
                            startTime = MonoTime.currTime;
                            // wait for more requests
                            continue;
                        }
                    } else {
                        // wait up to the configured wait time or until the configured request count is reached
                        while (m_producerRequestBundler.requestsCollected < minRequests) {
                            Duration elapsedTime = MonoTime.currTime - startTime;
                            if (elapsedTime >= m_client.config.pusherBundleMaxWaitTime.msecs)
                                break; // timeout reached
                            Duration remaining = m_client.config.pusherBundleMaxWaitTime.msecs - elapsedTime;
                            if (!m_producerRequestBundler.readyCondition.wait(remaining))
                                break; // timeout reached
                        }
                        gotFirstRequest = false;
                    }

                    synchronized (m_mutex) {
                        m_ser.produceRequest_v0(0, m_client.clientId, m_client.config, m_producerRequestBundler);
                        m_ser.flush();

                        // add a request entry for each bundled produce
                        auto req = m_requests.getNodeToFill();
                        req.type = RequestType.Produce;
                        m_requests.pushFilledNode(req);
                    }

                    m_producerRequestBundler.clearRequestLists();
                }
            }
        } catch (StreamException) {
            // stream error, typically connection loss
            m_fetcherTask.interrupt();
            m_receiverTask.interrupt();
            m_client.connectionLost(this);
        } catch (InterruptException) {
            // do nothing
            logDebugV("PUSHER INT");
        }
    }

    void receiverMain() {
        try {
            int size, correlationId;
            for (;;) {
                m_des.getMessage(size, correlationId);
                m_des.beginMessage(size);
                scope (success)
                    m_des.endMessage();

                // responses always arrive in send order on a single TCP connection,
                // so we rely on that order rather than on the correlationId:
                // requests are pushed to the request queue when they are serialized
                // and are popped here in the order they were sent
                Request req = void;
                synchronized (m_mutex) {
                    assert(!m_requests.empty);
                    auto node = m_requests.getNodeToProcess();
                    req = *node;
                    m_requests.returnProcessedNode(node);
                }

                switch (req.type) {
                case RequestType.Metadata:
                    Metadata metadata = m_des.metadataResponse_v0();
                    send(req.tid, cast(immutable)metadata);
                    break;
                case RequestType.Offset:
                    OffsetResponse_v0 resp = m_des.offsetResponse_v0();
                    send(req.tid, cast(immutable)resp);
                    break;
                case RequestType.Fetch:
                    // parse the fetch response, move returned messages to the correct queues,
                    // and handle partition errors if needed
                    int numtopics;
                    m_des.deserialize(numtopics);
                    assert(numtopics > 0);
                    foreach (nt; 0 .. numtopics) {
                        string topic;
                        int numpartitions;
                        short topicNameLen;
                        m_des.deserialize(topicNameLen);

                        ubyte[] topicSlice = m_topicNameBuffer[0 .. topicNameLen];
                        m_des.deserializeSlice(topicSlice);
                        topic = cast(string)topicSlice;
                        m_des.deserialize(numpartitions);
                        assert(numpartitions > 0);

                        synchronized (m_consumerRequestBundler.mutex) {
                            Topic* queueTopic = m_consumerRequestBundler.findTopic(topic);

                            foreach (np; 0 .. numpartitions) {
                                static struct FetchPartitionInfo {
                                    int partition;
                                    short errorCode;
                                    long endOffset;
                                    int messageSetSize;
                                }
                                FetchPartitionInfo fpi;
                                m_des.deserialize(fpi);

                                Partition* queuePartition = null;
                                if (queueTopic)
                                    queuePartition = queueTopic.findPartition(fpi.partition);

                                if (!queuePartition) {
                                    // skip the partition
                                    m_des.skipBytes(fpi.messageSetSize);
                                    continue;
                                }

                                Queue queue = queuePartition.queue;

                                switch (cast(ApiError)fpi.errorCode) {
                                case ApiError.NoError: break;
                                case ApiError.UnknownTopicOrPartition:
                                case ApiError.LeaderNotAvailable:
                                case ApiError.NotLeaderForPartition:
                                    // We need to refresh the metadata, get the new connection and
                                    // retry the request. To do so, we remove the consumer from this
                                    // connection and add it to the client's brokerlessConsumers list.
                                    // The client will do the rest.
                                    m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
                                    m_client.addToBrokerless(queue, true);
                                    m_des.skipBytes(fpi.messageSetSize);
                                    continue;
                                case ApiError.OffsetOutOfRange:
                                    m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
                                    m_client.removeConsumer(cast(Consumer)queue.worker);
                                    queue.worker.throwException(new OffsetOutOfRangeException(format(
                                        "Offset %d is out of range for topic %s, partition %d",
                                        queue.offset, queueTopic.topic, queuePartition.partition)));
                                    m_des.skipBytes(fpi.messageSetSize);
                                    continue;
                                default: throw new ProtocolException(format("Unexpected fetch response error: %d", fpi.errorCode));
                                }

                                if (fpi.messageSetSize > m_client.config.consumerMaxBytes) {
                                    m_consumerRequestBundler.removeQueue(queueTopic, queuePartition);
                                    queue.worker.throwException(new ProtocolException("MessageSet is too big to fit into a buffer"));
                                    m_des.skipBytes(fpi.messageSetSize);
                                    continue;
                                }

                                if (fpi.messageSetSize < 26) {
                                    // the message set is smaller than the minimum message size, so make another request
                                    m_consumerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
                                    m_des.skipBytes(fpi.messageSetSize);
                                    continue;
                                }

                                QueueBuffer* qbuf;

                                synchronized (queue.mutex)
                                    qbuf = queue.getBuffer(BufferType.Free);

                                // copy the message set to the buffer
                                m_des.deserializeSlice(qbuf.buffer[0 .. fpi.messageSetSize]);
                                qbuf.p = qbuf.buffer;
                                qbuf.messageSetSize = fpi.messageSetSize;
                                qbuf.requestedOffset = queue.offset;

                                // find the next offset to fetch
                                long nextOffset = qbuf.findNextOffset();
                                enforce(nextOffset > 0);

                                synchronized (queue.mutex) {
                                    queue.offset = nextOffset;
                                    queue.returnBuffer(BufferType.Filled, qbuf);
                                    queue.condition.notify();
                                    // queue.requestPending is always true here
                                    if (queue.hasBuffer(BufferType.Free))
                                        m_consumerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
                                    else
                                        queue.requestPending = false;
                                }
                            }
                        }
                    }
                    break;
                case RequestType.Produce:
                    int numtopics;
                    m_des.deserialize(numtopics);
                    assert(numtopics > 0);
                    foreach (nt; 0 .. numtopics) {
                        string topic;
                        int numpartitions;
                        short topicNameLen;
                        m_des.deserialize(topicNameLen);

                        ubyte[] topicSlice = m_topicNameBuffer[0 .. topicNameLen];
                        m_des.deserializeSlice(topicSlice);
                        topic = cast(string)topicSlice;
                        m_des.deserialize(numpartitions);
                        assert(numpartitions > 0);

                        synchronized (m_producerRequestBundler.mutex) {
                            Topic* queueTopic = m_producerRequestBundler.findTopic(topic);

                            foreach (np; 0 .. numpartitions) {
                                static struct ProducePartitionInfo {
                                    int partition;
                                    short errorCode;
                                    long offset;
                                }
                                ProducePartitionInfo ppi;
                                m_des.deserialize(ppi);

                                Partition* queuePartition = null;
                                if (queueTopic)
                                    queuePartition = queueTopic.findPartition(ppi.partition);

                                // an unknown partition is a bug in debug builds; when asserts are
                                // compiled out it is skipped defensively
                                assert(queuePartition);
                                if (!queuePartition) {
                                    // skip the partition
                                    continue;
                                }

                                Queue queue = queuePartition.queue;

                                switch (cast(ApiError)ppi.errorCode) {
                                case ApiError.NoError: break;
                                case ApiError.UnknownTopicOrPartition:
                                case ApiError.LeaderNotAvailable:
                                case ApiError.NotLeaderForPartition:
                                    // We need to refresh the metadata, get the new connection and
                                    // retry the request. To do so, we remove the producer from this
                                    // connection and add it to the client's brokerlessWorkers list.
                                    // The client will do the rest.
                                    m_producerRequestBundler.removeQueue(queueTopic, queuePartition);
                                    m_client.addToBrokerless(queue, true);
                                    continue;
                                default: throw new ProtocolException(format("Unexpected produce response error: %d", ppi.errorCode));
                                }

                                synchronized (queue.mutex) {
                                    //queue.returnBuffer(BufferType.Filled);
                                    // queue.requestPending is always true here
                                    if (queue.hasBuffer(BufferType.Filled))
                                        m_producerRequestBundler.queueHasReadyBuffers(queueTopic, queuePartition);
                                    else
                                        queue.requestPending = false;
                                }
                            }
                        }
                    }
                    break;
                default: assert(0); // FIXME
                }
            }
        }
        catch (StreamException) {
            // stream error, typically connection loss
            m_fetcherTask.interrupt();
            m_pusherTask.interrupt();
            m_client.connectionLost(this);
        } catch (InterruptException) {
            // do nothing
            logDebugV("RECEIVER INT");
        }
    }

    Metadata getMetadata(string[] topics) {
        synchronized (m_mutex) {
            m_ser.metadataRequest_v0(0, m_client.clientId, topics);
            m_ser.flush();

            auto req = m_requests.getNodeToFill();
            req.type = RequestType.Metadata;
            req.tid = thisTid;
            m_requests.pushFilledNode(req);
        }
        Metadata ret;
        receive((immutable Metadata v) { ret = cast()v; });
        return ret;
    }

    long getStartingOffset(string topic, int partition, long offset) {
        assert(offset == -1 || offset == -2); // -1 = latest, -2 = earliest (Kafka offset API special values)
        OffsetRequestParams_v0.PartTimeMax[1] p;
        p[0].partition = partition;
        p[0].time = offset;
        p[0].maxOffsets = 1;
        OffsetRequestParams_v0.Topic[1] t;
        t[0].topic = topic;
        t[0].partitions = p;
        OffsetRequestParams_v0 params;
        params.replicaId = id;
        params.topics = t;
        synchronized (m_mutex) {
            m_ser.offsetRequest_v0(0, m_client.clientId, params);
            m_ser.flush();

            auto req = m_requests.getNodeToFill();
            req.type = RequestType.Offset;
            req.tid = thisTid;
            m_requests.pushFilledNode(req);
        }
        long return_offset;
        receive(
            (immutable OffsetResponse_v0 resp) {
                enforce(resp.topics.length == 1);
                enforce(resp.topics[0].partitions.length == 1);
                import std.format;
                enforce(resp.topics[0].partitions[0].errorCode == 0,
                    format("Could not get starting offset for topic %s and partition %d", topic, partition));
                enforce(resp.topics[0].partitions[0].offsets.length == 1);
                return_offset = resp.topics[0].partitions[0].offsets[0];
            });

        return return_offset;
    }
}
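
// Illustrative sketch (not part of the original module, compiled out via version (none)):
// it shows the FIFO request/response pairing that receiverMain relies on instead of
// matching correlationIds. It assumes FreeList!Request is default-constructible and that
// getNodeToFill/pushFilledNode/getNodeToProcess/returnProcessedNode behave as used above.
version (none) unittest {
    FreeList!Request requests;

    // sender side (fetcher, pusher, getMetadata, getStartingOffset): queue one entry
    // per request right after it is serialized to the socket
    auto sent = requests.getNodeToFill();
    sent.type = RequestType.Fetch;
    requests.pushFilledNode(sent);

    // receiver side: on a single TCP connection the broker answers in send order,
    // so the oldest queued entry always describes the next response on the wire
    auto node = requests.getNodeToProcess();
    assert(node.type == RequestType.Fetch);
    requests.returnProcessedNode(node);
}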