15 #ifndef VISIONTRANSFER_DATABLOCKPROTOCOL_H
16 #define VISIONTRANSFER_DATABLOCKPROTOCOL_H
24 #include "visiontransfer/internal/alignedallocator.h"
25 #include "visiontransfer/exceptions.h"
27 namespace visiontransfer {
46 class DataBlockProtocol {
// Compile-time size limits of the protocol.
// Number of data block slots; used in this class as the length of every
// per-block array.
54 static const int MAX_DATA_BLOCKS = 8;
// 0xFFFF = 65535. NOTE(review): presumably the largest TCP transfer chunk in bytes; confirm in the implementation.
57 static const int MAX_TCP_BYTES_TRANSFER = 0xFFFF;
// 0x4000 = 16384. NOTE(review): presumably the largest UDP reception size in bytes; confirm in the implementation.
58 static const int MAX_UDP_RECEPTION = 0x4000;
// Twice MAX_TCP_BYTES_TRANSFER; this is the size of the unprocessedMsgPart array.
59 static const int MAX_OUTSTANDING_BYTES = 2*MAX_TCP_BYTES_TRANSFER;
// Fixed-size record holding a header size, one placeholder size field and
// one transfer size per data block slot.
// NOTE(review): the "net" prefix suggests values are stored in network byte order; confirm against the code that fills this struct.
64 struct HeaderPreamble {
65 uint16_t netHeaderSize;
// NOTE(review): "Dummy" suggests a placeholder kept for layout compatibility; confirm.
66 int32_t netTransferSizeDummy;
// One entry per data block slot (MAX_DATA_BLOCKS entries).
67 uint32_t netTransferSizes[MAX_DATA_BLOCKS];
// Segment header consisting of a single 32-bit segment offset.
// NOTE(review): per the name this is the UDP variant; byte order is not visible here.
69 struct SegmentHeaderUDP {
70 uint32_t segmentOffset;
// Segment header consisting of a 32-bit fragment size followed by a 32-bit
// segment offset. sizeof(SegmentHeaderTCP) also determines the size of the
// overwrittenTransferData member.
// NOTE(review): per the name this is the TCP variant; byte order is not visible here.
72 struct SegmentHeaderTCP {
73 uint32_t fragmentSize;
74 uint32_t segmentOffset;
93 return protType == PROTOCOL_UDP ?
sizeof(int) : 0;
/// Declaration of the setter for the payload of one transfer block.
/// \param block      Index of the block. NOTE(review): presumably in [0, MAX_DATA_BLOCKS); confirm in the definition.
/// \param data       Pointer to the block's bytes; ownership and required lifetime are not visible from this declaration.
/// \param validBytes Defaults to 0x7FFFFFFF. NOTE(review): presumably meaning "all bytes are valid"; confirm in the definition.
146 void setTransferData(
int block,
unsigned char* data,
int validBytes = 0x7FFFFFFF);
233 return droppedReceptions;
265 unsigned char* getBlockReceiveBuffer(
int block) {
266 if (block >= numReceptionBlocks) {
267 throw ProtocolException(
"Tried to get receive buffer beyond initialized block range");
269 return &blockReceiveBuffers[block][0];
271 int getBlockValidSize(
int block) {
272 if (block >= numReceptionBlocks) {
273 throw ProtocolException(
"Tried to get valid buffer index beyond initialized block range");
275 return blockValidSize[block];
277 bool isBlockDone(
int block) {
278 if (block >= numReceptionBlocks) {
279 throw ProtocolException(
"Tried to get completion status of uninitialized block");
281 return blockValidSize[block] >= blockReceiveSize[block];
/// Walks the reception blocks in index order (0 .. numReceptionBlocks-1),
/// queries isBlockDone() for each, and returns false at the first block
/// that is not done.
// NOTE(review): the end of this function is outside the visible excerpt; presumably it returns true once the loop finishes. Confirm.
283 bool allBlocksDone() {
284 for (
int i=0; i<numReceptionBlocks; ++i) {
285 if (!isBlockDone(i))
return false;
/// Walks the reception blocks in index order (0 .. numReceptionBlocks-1)
/// and returns true at the first block whose blockReceiveOffsets entry is
/// greater than zero.
// NOTE(review): the end of this function is outside the visible excerpt; presumably it returns false once the loop finishes. Confirm.
289 bool anyPayloadReceived() {
290 for (
int i=0; i<numReceptionBlocks; ++i) {
291 if (blockReceiveOffsets[i] > 0)
return true;
/// Declaration only; returns a std::string.
// NOTE(review): presumably a human-readable summary of the protocol state; the content is defined in the implementation file.
296 std::string statusReport();
298 bool wasHeaderReceived()
const {
299 return headerReceived;
/// Static declaration with two output parameters.
/// \param buf Set by the callee to point at read-only bytes. NOTE(review): presumably the disconnection message; storage lifetime is not visible here.
/// \param sz  Set by the callee. NOTE(review): presumably the message length in bytes; confirm.
303 static void getDisconnectionMessage(
const unsigned char* &buf,
int &sz);
/// Static declaration with two output parameters.
/// \param buf Set by the callee to point at read-only bytes. NOTE(review): presumably the heartbeat message; storage lifetime is not visible here.
/// \param sz  Set by the callee. NOTE(review): presumably the message length in bytes; confirm.
306 static void getHeartbeatMessage(
const unsigned char* &buf,
int &sz);
308 bool supportsExtendedConnectionStateProtocol()
const {
309 return extendedConnectionStateProtocol;
// Element type of the per-block missingReceiveSegments queues.
// NOTE(review): only one field is visible in this excerpt; the struct likely has further members.
316 struct MissingReceiveSegment {
// Four raw bytes. NOTE(review): presumably a copy of the data following the gap; confirm in the implementation.
320 unsigned char subsequentData[4];
// Timing constants; the _MS suffix indicates milliseconds.
323 static constexpr
int HEARTBEAT_INTERVAL_MS = 1000;
324 static constexpr
int RECONNECT_TIMEOUT_MS = 2000;
// One-byte message type identifiers, numbered consecutively from 0x01 to 0x07.
// NOTE(review): where the type byte sits inside a control message is defined in the implementation, not here.
326 static constexpr
unsigned char CONNECTION_MESSAGE = 0x01;
327 static constexpr
unsigned char CONFIRM_MESSAGE = 0x02;
328 static constexpr
unsigned char HEADER_MESSAGE = 0x03;
329 static constexpr
unsigned char RESEND_MESSAGE = 0x04;
330 static constexpr
unsigned char EOF_MESSAGE = 0x05;
331 static constexpr
unsigned char HEARTBEAT_MESSAGE = 0x06;
332 static constexpr
unsigned char DISCONNECTION_MESSAGE = 0x07;
// Protocol variant of this instance; compared against PROTOCOL_UDP elsewhere in this class.
335 ProtocolType protType;
// Outgoing-transfer state. Arrays sized MAX_DATA_BLOCKS hold one entry per data block slot.
// NOTE(review): the notes below are inferred from member names only; confirm against the implementation.
// Presumably the caller-provided payload pointer for each block.
341 unsigned char* rawDataArr[MAX_DATA_BLOCKS];
// NOTE(review): purpose of the two "StrideHack" arrays is not visible in this header.
342 int rawDataArrStrideHackOrig[MAX_DATA_BLOCKS];
343 int rawDataArrStrideHackRepl[MAX_DATA_BLOCKS];
344 int rawValidBytes[MAX_DATA_BLOCKS];
345 int transferOffset[MAX_DATA_BLOCKS];
346 int transferSize[MAX_DATA_BLOCKS];
// Buffer with exactly sizeof(SegmentHeaderTCP) bytes, plus an index and block number.
// NOTE(review): presumably a backup of payload bytes that were temporarily overwritten; see restoreTransferBuffer(). Confirm.
347 char overwrittenTransferData[
sizeof(SegmentHeaderTCP)];
348 int overwrittenTransferIndex;
349 int overwrittenTransferBlock;
// Raw pointer and size. NOTE(review): ownership of the pointed-to header data is not visible here.
350 unsigned char* transferHeaderData;
351 int transferHeaderSize;
352 int totalBytesCompleted;
353 int totalTransferSize;
354 int numTransferBlocks;
355 int lastTransmittedBlock;
// One queue of MissingReceiveSegment records per data block slot.
358 std::deque<MissingReceiveSegment> missingReceiveSegments[MAX_DATA_BLOCKS];
// Queue of int pairs. NOTE(review): presumably (offset, length) of segments the peer asked to have resent; confirm.
359 std::deque<std::pair<int, int> > missingTransferSegments;
360 bool waitingForMissingSegments;
361 int totalReceiveSize;
// Fixed buffer of 1024 * 16 = 16384 bytes. NOTE(review): presumably scratch space for control messages; confirm.
363 unsigned char controlMessageBuffer[1024 * 16];
// Connection-state flags.
// NOTE(review): the "...Pending" flags presumably mark messages or actions that are still outstanding; confirm in the implementation.
366 bool connectionConfirmed;
367 bool confirmationMessagePending;
368 bool eofMessagePending;
369 bool clientConnectionPending;
370 bool resendMessagePending;
// Timestamps taken from std::chrono::steady_clock.
371 std::chrono::steady_clock::time_point lastRemoteHostActivity;
372 std::chrono::steady_clock::time_point lastSentHeartbeat;
373 std::chrono::steady_clock::time_point lastReceivedHeartbeat;
374 std::chrono::steady_clock::time_point lastReceivedAnything;
// Heartbeat counters. NOTE(review): exact semantics are not visible in this header.
375 int heartbeatKnockCount;
376 int heartbeatRepliesQueued;
// Value returned by supportsExtendedConnectionStateProtocol().
379 bool extendedConnectionStateProtocol;
// Reception state.
// Byte buffers allocated through AlignedAllocator; blockReceiveBuffers has one buffer per data block slot.
382 std::vector<unsigned char, AlignedAllocator<unsigned char> > receiveBuffer;
383 std::vector<unsigned char, AlignedAllocator<unsigned char> > blockReceiveBuffers[MAX_DATA_BLOCKS];
// Per-block bookkeeping: anyPayloadReceived() tests blockReceiveOffsets > 0, and
// isBlockDone() compares blockValidSize against blockReceiveSize.
384 int blockReceiveOffsets[MAX_DATA_BLOCKS];
385 int blockReceiveSize[MAX_DATA_BLOCKS];
386 int blockValidSize[MAX_DATA_BLOCKS];
387 std::vector<unsigned char> receivedHeader;
388 bool finishedReception;
// Statistics counters. NOTE(review): units and update points are not visible in this header.
389 int droppedReceptions;
390 int completedReceptions;
391 double lostSegmentRate;
392 int lostSegmentBytes;
// Fixed buffer of MAX_OUTSTANDING_BYTES bytes with an accompanying length.
// NOTE(review): presumably holds a partially received message until the rest arrives; confirm.
393 unsigned char unprocessedMsgPart[MAX_OUTSTANDING_BYTES];
394 int unprocessedMsgLength;
// Number of blocks in use for reception; upper bound for the block index in the per-block accessors.
397 int numReceptionBlocks;
// Internal helper declarations; their definitions live outside this header excerpt.
// NOTE(review): the descriptions below are inferred from names and signatures only; confirm against the implementation.
// Takes input bytes, and reports a length and an error flag through the reference parameters.
400 const unsigned char* extractPayload(
const unsigned char* data,
int& length,
bool& error);
// Presumably handles a control message of the given length.
401 bool processControlMessage(
int length);
402 void restoreTransferBuffer();
// Reports a length through the reference parameter; the bool result presumably signals whether a request was produced.
403 bool generateResendRequest(
int& length);
// Reports block, offset and length through the reference parameters.
404 void getNextTransferSegment(
int& block,
int& offset,
int& length);
405 void parseResendMessage(
int length);
406 void parseEofMessage(
int length);
407 void integrateMissingUdpSegments(
int block,
int lastSegmentOffset,
int lastSegmentSize);
// Further internal helper declarations; their definitions live outside this header excerpt.
// NOTE(review): the descriptions below are inferred from names and signatures only; confirm against the implementation.
410 void resizeReceiveBuffer();
// Meaning of the int result is not visible from this declaration.
411 int parseReceivedHeader(
int length,
int offset);
412 void zeroStructures();
// Splits one raw offset value into a data block ID and a segment offset (output via reference parameters).
413 void splitRawOffset(
int rawSegmentOffset,
int& dataBlockID,
int& segmentOffset);
// Counterpart of splitRawOffset: combines a data block ID and a segment offset into one int.
// The reserved parameter defaults to 0.
414 int mergeRawOffset(
int dataBlockID,
int segmentOffset,
int reserved=0);