libvisiontransfer  10.8.0
datablockprotocol.cpp
1 /*******************************************************************************
2  * Copyright (c) 2024 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <algorithm>
16 #include <iostream>
17 #include <cstring>
18 
19 #include <iomanip>
20 #include <sstream>
21 
22 #include "visiontransfer/internal/datablockprotocol.h"
23 #include "visiontransfer/exceptions.h"
24 
25 // Network headers
26 #ifdef _WIN32
27 #include <winsock2.h>
28 #undef min
29 #undef max
30 #else
31 #include <arpa/inet.h>
32 #endif
33 
34 #define LOG_DEBUG_DBP(expr)
35 //#define LOG_DEBUG_DBP(expr) std::cerr << "DataBlockProtocol: " << expr << std::endl
36 
37 using namespace std;
38 using namespace visiontransfer;
39 using namespace visiontransfer::internal;
40 
41 namespace visiontransfer {
42 namespace internal {
43 
44 DataBlockProtocol::DataBlockProtocol(bool server, ProtocolType protType, int maxUdpPacketSize)
45  : isServer(server), protType(protType),
46  transferDone(true),
47  overwrittenTransferData{0},
48  overwrittenTransferIndex{-1},
49  overwrittenTransferBlock{-1},
50  transferHeaderData{nullptr},
51  transferHeaderSize{0},
52  totalBytesCompleted{0}, totalTransferSize{0},
53  waitingForMissingSegments(false),
54  totalReceiveSize(0), connectionConfirmed(false),
55  confirmationMessagePending(false), eofMessagePending(false),
56  clientConnectionPending(false), resendMessagePending(false),
57  lastRemoteHostActivity(), lastSentHeartbeat(),
58  lastReceivedHeartbeat(std::chrono::steady_clock::now()),
59  lastReceivedAnything(std::chrono::steady_clock::now()),
60  heartbeatKnockCount(0),
61  heartbeatRepliesQueued(0),
62  extendedConnectionStateProtocol(false),
63  finishedReception(false), droppedReceptions(0),
64  completedReceptions(0), lostSegmentRate(0.0), lostSegmentBytes(0),
65  unprocessedMsgLength(0), headerReceived(false) {
66  // Determine the maximum allowed payload size
67  if(protType == PROTOCOL_TCP) {
68  maxPayloadSize = MAX_TCP_BYTES_TRANSFER - sizeof(SegmentHeaderTCP);
69  minPayloadSize = 0;
70  } else {
71  maxPayloadSize = maxUdpPacketSize - sizeof(SegmentHeaderUDP);
72  minPayloadSize = maxPayloadSize;
73  }
74  zeroStructures();
75  resizeReceiveBuffer();
76  resetReception(false);
77 }
78 void DataBlockProtocol::splitRawOffset(int rawSegmentOffset, int& dataBlockID, int& segmentOffset) {
79  int selector = (rawSegmentOffset >> 28) & 0xf;
80  dataBlockID = selector & 0x7; // Note: 0x8 bit is reserved for now
81  segmentOffset = rawSegmentOffset & 0x0FFFffff;
82 }
83 
84 int DataBlockProtocol::mergeRawOffset(int dataBlockID, int segmentOffset, int reserved_defaults0) {
85  return ((reserved_defaults0 & 1) << 31) | ((dataBlockID & 0x07) << 28) | (segmentOffset & 0x0FFFffff);
86 }
87 
88 void DataBlockProtocol::zeroStructures() {
89  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
90  rawDataArr[i] = nullptr;
91  rawDataArrStrideHackOrig[i] = 0;
92  rawDataArrStrideHackRepl[i] = 0;
93  rawValidBytes[i] = 0;
94  transferOffset[i] = 0;
95  transferSize[i] = 0;
96  }
97  std::memset(overwrittenTransferData, 0, sizeof(overwrittenTransferData));
98  overwrittenTransferIndex = -1;
99  overwrittenTransferBlock = -1;
100  lastTransmittedBlock = -1;
101  receiveOffset = 0;
102  numReceptionBlocks = 0;
103 }
104 
106  transferDone = true;
107  overwrittenTransferIndex = -1;
108  overwrittenTransferBlock = -1;
109  totalBytesCompleted = 0;
110  totalTransferSize = 0;
111  numTransferBlocks = 0;
112  missingTransferSegments.clear();
113 }
114 
115 void DataBlockProtocol::setTransferBytes(int block, long bytes) {
116  if (transferHeaderData == nullptr) {
117  throw ProtocolException("Tried to set data block size before initializing header!");
118  } else if (block >= numTransferBlocks) {
119  throw ProtocolException("Request to set data block size - block index too high!");
120  }
121  transferSize[block] = bytes;
122  HeaderPreamble* hp = reinterpret_cast<HeaderPreamble*>(transferHeaderData);
123  hp->netTransferSizes[block] = htonl(bytes);
124 }
125 
126 void DataBlockProtocol::setTransferHeader(unsigned char* data, int headerSize, int blocks) {
127  if(!transferDone && numTransferBlocks > 0) {
128  throw ProtocolException("Header data set while transfer is active!");
129  } else if(headerSize + 9 > static_cast<int>(sizeof(controlMessageBuffer))) {
130  throw ProtocolException("Transfer header is too large!");
131  } else if(blocks == 0) {
132  throw ProtocolException("Requested transfer of 0 blocks!");
133  }
134 
135  numTransferBlocks = blocks;
136 
137  transferDone = false;
138  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
139  this->transferSize[i] = 0; // must be set via setRawTransferBytes()
140  }
141 
142  int headerBaseOffset = sizeof(HeaderPreamble);
143 
144  transferHeaderData = &data[-headerBaseOffset];
145  HeaderPreamble* ourHeader = reinterpret_cast<HeaderPreamble*>(transferHeaderData);
146 
147  unsigned short netHeaderSize = htons(static_cast<unsigned short>(headerSize));
148  ourHeader->netHeaderSize = netHeaderSize;
149  ourHeader->netTransferSizeDummy = htonl(-1); // clashes on purpose with old recipients
150  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
151  ourHeader->netTransferSizes[i] = 0;
152  }
153 
154  headerSize += headerBaseOffset;
155 
156  if(protType == PROTOCOL_UDP) {
157  // In UDP mode we still need to make this a control message
158  transferHeaderData[headerSize++] = HEADER_MESSAGE;
159  transferHeaderData[headerSize++] = 0xFF;
160  transferHeaderData[headerSize++] = 0xFF;
161  transferHeaderData[headerSize++] = 0xFF;
162  transferHeaderData[headerSize++] = 0xFF;
163  }
164 
165  transferHeaderSize = headerSize;
166 }
167 
168 void DataBlockProtocol::setTransferData(int block, unsigned char* data, int validBytes) {
169  if(transferHeaderSize == 0 || transferHeaderData == nullptr) {
170  throw ProtocolException("The transfer header has not yet been set!");
171  }
172 
173  transferDone = false;
174  rawDataArr[block] = data;
175  transferOffset[block] = 0;
176  overwrittenTransferIndex = -1;
177  overwrittenTransferBlock = -1;
178  rawValidBytes[block] = min(transferSize[block], validBytes);
179  totalBytesCompleted = 0;
180 }
181 
182 void DataBlockProtocol::setTransferValidBytes(int block, int validBytes) {
183  if(validBytes >= transferSize[block]) {
184  rawValidBytes[block] = transferSize[block];
185  } else if(validBytes < static_cast<int>(sizeof(int))) {
186  rawValidBytes[block] = 0;
187  } else {
188  rawValidBytes[block] = validBytes;
189  }
190 }
191 
192 std::string DataBlockProtocol::statusReport() {
193  std::stringstream ss;
194  ss << "DataBlockProtocol, blocks=" << numTransferBlocks << ": ";
195  for (int i=0; i<numTransferBlocks; ++i) {
196  ss << i << ":(len " << transferSize[i] << " ofs " << transferOffset[i] << " rawvalid " << rawValidBytes[i] << ") ";
197  }
198  ss << " total done: " << totalBytesCompleted << "/" << totalTransferSize;
199  return ss.str();
200 }
201 
202 const unsigned char* DataBlockProtocol::getTransferMessage(int& length) {
203  if(transferDone) {
204  // No more data to be transferred
205  length = 0;
206  return nullptr;
207  }
208  for (int i=0; i<numTransferBlocks; ++i) {
209  if (rawValidBytes[i] == 0) {
210  length = 0;
211  return nullptr;
212  }
213  }
214 
215 
216  // For TCP we always send the header first
217  if(protType == PROTOCOL_TCP && transferHeaderData != nullptr) {
218  length = transferHeaderSize;
219  const unsigned char* ret = transferHeaderData;
220  transferHeaderData = nullptr;
221  return ret;
222  }
223 
224  // The transfer buffer might have been altered by the previous transfer
225  // and first needs to be restored
226  restoreTransferBuffer();
227 
228  // Determine which data segment to transfer next
229  int block = -1, offset = -1;
230  getNextTransferSegment(block, offset, length);
231  if(length == 0) {
232  return nullptr;
233  }
234 
235  if(protType == PROTOCOL_UDP) {
236  // For udp, we always append a segment offset
237  overwrittenTransferBlock = block;
238  overwrittenTransferIndex = offset + length;
239  SegmentHeaderUDP* segmentHeader = reinterpret_cast<SegmentHeaderUDP*>(&rawDataArr[block][offset + length]);
240  std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderUDP));
241  segmentHeader->segmentOffset = static_cast<int>(htonl(mergeRawOffset(block, offset)));
242  length += sizeof(SegmentHeaderUDP);
243  lastTransmittedBlock = block;
244  return &rawDataArr[block][offset];
245  } else {
246  // For tcp, we *PRE*pend the segment header consisting of segment offset plus the packet payload size
247  int headerOffset = offset - sizeof(SegmentHeaderTCP);
248 
249  SegmentHeaderTCP* segmentHeader = nullptr;
250  unsigned char* dataPointer = nullptr;
251 
252  if(headerOffset < 0) {
253  // For the first TCP transfer we need to copy the data as we cannot
254  // prepend before the data start
255  static unsigned char tcpBuffer[MAX_TCP_BYTES_TRANSFER];
256  dataPointer = tcpBuffer;
257  segmentHeader = reinterpret_cast<SegmentHeaderTCP*>(tcpBuffer);
258  std::memcpy(&tcpBuffer[sizeof(segmentHeader)], &rawDataArr[block][offset], length);
259  } else {
260  // For subsequent calls we will overwrite the segment header data and
261  // restore it
262  dataPointer = &rawDataArr[block][headerOffset];
263  segmentHeader = reinterpret_cast<SegmentHeaderTCP*>(&rawDataArr[block][headerOffset]);
264  overwrittenTransferBlock = block;
265  overwrittenTransferIndex = headerOffset;
266  std::memcpy(overwrittenTransferData, segmentHeader, sizeof(SegmentHeaderTCP));
267  }
268 
269  segmentHeader->fragmentSize = htonl(length);
270  segmentHeader->segmentOffset = static_cast<int>(htonl(mergeRawOffset(block, offset)));
271  length += sizeof(SegmentHeaderTCP);
272  lastTransmittedBlock = block;
273  return dataPointer;
274  }
275 }
276 
277 void DataBlockProtocol::getNextTransferSegment(int& block, int& offset, int& length) {
278  if(missingTransferSegments.size() == 0) {
279  // Select from block with the most unsent data
280  int sendBlock = 0, amount = 0;
281  for (int i=0; i<numTransferBlocks; ++i) {
282  int avail = std::min(transferSize[i], rawValidBytes[i]);
283  avail -= transferOffset[i];
284  if (avail > amount) {
285  amount = avail;
286  sendBlock = i;
287  }
288  }
289  length = std::min(maxPayloadSize, amount);
290  if(length == 0 || (length < minPayloadSize && rawValidBytes[sendBlock] != transferSize[sendBlock])) {
291  length = 0;
292  return;
293  }
294 
295  block = sendBlock;
296  offset = transferOffset[sendBlock];
297  transferOffset[sendBlock] += length; // for next transfer
298  if (protType == PROTOCOL_UDP) {
299  bool complete = true;
300  for (int i=0; i<numTransferBlocks; ++i) {
301  if (transferOffset[i] < transferSize[i]) {
302  complete = false;
303  break;
304  }
305  }
306  if (complete) {
307  eofMessagePending = true;
308  }
309  }
310  } else {
311  // This is a segment that is re-transmitted due to packet loss
312  splitRawOffset(missingTransferSegments.front().first, block, offset);
313  length = std::min(maxPayloadSize, missingTransferSegments.front().second);
314  LOG_DEBUG_DBP("Re-transmitting: " << offset << " - " << (offset + length));
315 
316  int remaining = missingTransferSegments[0].second - length;
317  if(remaining == 0) {
318  // The segment is competed
319  missingTransferSegments.pop_front();
320  } else {
321  // The segment is only partially complete
322  missingTransferSegments.front().first += length;
323  missingTransferSegments.front().second = remaining;
324  }
325  }
326 }
327 
328 void DataBlockProtocol::restoreTransferBuffer() {
329  if(overwrittenTransferBlock >= 0) {
330  if(protType == PROTOCOL_UDP) {
331  std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderUDP));
332  } else {
333  std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData, sizeof(SegmentHeaderTCP));
334  }
335  }
336  overwrittenTransferIndex = -1;
337  overwrittenTransferBlock = -1;
338 }
339 
341  for (int i=0; i<numTransferBlocks; ++i) {
342  if (transferOffset[i] < transferSize[i]) return false;
343  }
344  return !eofMessagePending;
345 }
346 
348  if(protType == PROTOCOL_TCP) {
349  return MAX_TCP_BYTES_TRANSFER;
350  } else {
351  return MAX_UDP_RECEPTION;
352  }
353 }
354 
355 unsigned char* DataBlockProtocol::getNextReceiveBuffer(int maxLength) {
356  if(receiveOffset + maxLength > (int)receiveBuffer.size()) {
357  receiveBuffer.resize(receiveOffset + maxLength);
358  }
359  return &receiveBuffer[receiveOffset];
360 }
361 
362 void DataBlockProtocol::processReceivedMessage(int length, bool& transferComplete) {
363  transferComplete = false;
364  if(length <= 0) {
365  return; // Nothing received
366  }
367 
368  if(finishedReception) {
369  // First reset for next frame
370  resetReception(false);
371  }
372 
373  // Track last successful reception, UDP client tracks this
374  // (lastRemoteHostActivity also includes sent data)
375  lastReceivedAnything = std::chrono::steady_clock::now();
376 
377  if(protType == PROTOCOL_UDP) {
378  processReceivedUdpMessage(length, transferComplete);
379  } else {
380  processReceivedTcpMessage(length, transferComplete);
381  }
382 
383  transferComplete = finishedReception;
384 }
385 
386 void DataBlockProtocol::processReceivedUdpMessage(int length, bool& transferComplete) {
387  if(length < static_cast<int>(sizeof(int)) ||
388  0 + length > static_cast<int>(receiveBuffer.size())) {
389  throw ProtocolException("Received message size is invalid!");
390  }
391 
392  // Extract the sequence number
393  int rawSegmentOffset = ntohl(*reinterpret_cast<int*>(
394  &receiveBuffer[0 + length - sizeof(int)]));
395  // for holding the offset with blanked-out channel index
396  int dataBlockID, segmentOffset;
397  splitRawOffset(rawSegmentOffset, dataBlockID, segmentOffset);
398 
399  if(rawSegmentOffset == static_cast<int>(0xFFFFFFFF)) {
400  // This is a control packet
401  processControlMessage(length);
402  } else if(headerReceived) {
403  // Correct the length by subtracting the size of the segment offset
404  int realPayloadOffset = 0;
405  int payloadLength = length - sizeof(int);
406 
407  if(segmentOffset != blockReceiveOffsets[dataBlockID]) {
408  // The segment offset doesn't match what we expected. Probably
409  // a packet was dropped
410  if(!waitingForMissingSegments && //receiveOffset > 0 &&
411  segmentOffset > blockReceiveOffsets[dataBlockID]
412  && segmentOffset + payloadLength <= (int)blockReceiveBuffers[dataBlockID].size()) {
413  // We can just ask for a retransmission of this packet
414  LOG_DEBUG_DBP("Missing segment: " << dataBlockID << " size " << payloadLength << " ofs " << segmentOffset
415  << " but blkRecvOfs " << blockReceiveOffsets[dataBlockID]
416  << " (# " << missingReceiveSegments[dataBlockID].size() << ")");
417 
418  MissingReceiveSegment missingSeg;
419  missingSeg.offset = mergeRawOffset(dataBlockID, blockReceiveOffsets[dataBlockID]);
420  missingSeg.length = segmentOffset - blockReceiveOffsets[dataBlockID];
421  missingSeg.isEof = false;
422  lostSegmentBytes += missingSeg.length;
423  missingReceiveSegments[dataBlockID].push_back(missingSeg);
424 
425  // Move the received data to the right place in the buffer
426  memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
427  // Advance block receive offset
428  blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
429  } else {
430  // In this case we cannot recover from the packet loss or
431  // we just didn't get the EOF packet and everything is
432  // actually fine
433  resetReception(blockReceiveOffsets[0] > 0);
434  if(segmentOffset > 0 ) {
435  if(blockReceiveOffsets[dataBlockID] > 0) {
436  LOG_DEBUG_DBP("Resend failed!");
437  }
438  return;
439  } else {
440  LOG_DEBUG_DBP("Missed EOF message!");
441  }
442  }
443  } else {
444  if ((realPayloadOffset+payloadLength) > (int)receiveBuffer.size()) {
445  throw ProtocolException("Received out-of-bound data.");
446  }
447 
448  // append to correct block buffer
449  memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
450  // advance the expected next data offset for this block
451  blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
452  if (waitingForMissingSegments) {
453  // segment extends the currently valid region (suspended once we missed out first segment)
454  if ((missingReceiveSegments[dataBlockID].size() == 1) && (missingReceiveSegments[dataBlockID].front().length <= payloadLength)) {
455  // last gap closed by this segment
456  blockValidSize[dataBlockID] = blockReceiveSize[dataBlockID];
457  } else {
458  blockValidSize[dataBlockID] = segmentOffset + payloadLength;
459  }
460  } else if (missingReceiveSegments[dataBlockID].size() == 0) {
461  blockValidSize[dataBlockID] = segmentOffset + payloadLength;
462  }
463  }
464 
465  if(segmentOffset == 0 && dataBlockID == 0) {
466  // This is the beginning of a new frame
467  lastRemoteHostActivity = std::chrono::steady_clock::now();
468  }
469 
470  // Try to fill missing regions
471  integrateMissingUdpSegments(dataBlockID, segmentOffset, payloadLength);
472  }
473 }
474 
475 void DataBlockProtocol::integrateMissingUdpSegments(int block, int lastSegmentOffset, int lastSegmentSize) {
476  if(waitingForMissingSegments && missingReceiveSegments[block].size() > 0) {
477  // Things get more complicated when re-transmitting dropped packets
478  int checkBlock, checkOffset;
479  MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
480  splitRawOffset(firstSeg.offset, checkBlock, checkOffset);
481  if((lastSegmentOffset != checkOffset) || (block != checkBlock)) {
482  LOG_DEBUG_DBP("Received invalid resend: " << block << " " << lastSegmentOffset);
483  resetReception(true);
484  } else {
485  firstSeg.offset += lastSegmentSize;
486  firstSeg.length -= lastSegmentSize;
487  if(firstSeg.length == 0) {
488  missingReceiveSegments[block].pop_front();
489  }
490 
491  // Check if ALL missing blocks are now handled
492  bool done = true;
493  for (int blk=0; blk<numReceptionBlocks; ++blk) {
494  if(missingReceiveSegments[blk].size() > 0) {
495  done = false;
496  break;
497  }
498  }
499  if (done) {
500  waitingForMissingSegments = false;
501  finishedReception = true;
502  } else if (missingReceiveSegments[block].size() > 0) {
503  // Another lost segment
504  int newBlock, newOffset;
505  MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
506  splitRawOffset(firstSeg.offset, newBlock, newOffset);
507  blockReceiveOffsets[block] = newOffset;
508  }
509  }
510  }
511 }
512 
513 void DataBlockProtocol::processReceivedTcpMessage(int length, bool& transferComplete) {
514  // In TCP mode the header must be the first data item to be transmitted
515  if(!headerReceived) {
516  int totalHeaderSize = parseReceivedHeader(length, 0);
517  if(totalHeaderSize == 0) {
518  // Not yet enough data. Keep on buffering.
519  receiveOffset += length; // append in next recv
520  return;
521  } else {
522  // Header successfully parsed
523  // Move the remaining data to the beginning of the buffer
524  length -= totalHeaderSize;
525  // The rest is the first [part of] buffer segment data
526 
527  if(length == 0) {
528  return; // No more data remaining
529  }
530 
531  int movelength = receiveOffset + length; // also move the old stuff
532  ::memmove(&receiveBuffer[0], &receiveBuffer[totalHeaderSize], movelength);
533  receiveOffset = movelength; // append in next recv
534  }
535  } else {
536  receiveOffset += length; // modified below if complete chunks are present
537  }
538 
539  if (legacyTransfer) {
540  // Legacy TCP transfer: no segment headers, just raw data for block 0, up to the expected size
541  int remainingSize = blockReceiveSize[0] - blockValidSize[0];
542  int availableSize = std::min(receiveOffset, remainingSize);
543  // Update actual target buffer
544  std::memcpy(&blockReceiveBuffers[0][blockReceiveOffsets[0]], &receiveBuffer[0], availableSize);
545  blockReceiveOffsets[0] += availableSize;
546  blockValidSize[0] = blockReceiveOffsets[0];
547  // Extra data, store at buffer start for next reception to append to
548  if (receiveOffset <= remainingSize) {
549  // Start next reception at recv buffer start
550  receiveOffset = 0;
551  } else {
552  // Mark next reception to append to unhandled data remainder
553  std::memmove(&receiveBuffer[0], &receiveBuffer[remainingSize], availableSize - remainingSize);
554  receiveOffset = availableSize - remainingSize;
555  }
556  } else {
557  // Parse the SegmentHeaderTCP (if present) to see if a full fragment is present
558  int ofs = 0;
559  while ((receiveOffset - ofs) >= (int) sizeof(SegmentHeaderTCP)) {
560  SegmentHeaderTCP* header = reinterpret_cast<SegmentHeaderTCP*>(&receiveBuffer[ofs]);
561  int fragsize = ntohl(header->fragmentSize);
562  int rawSegmentOffset = ntohl(header->segmentOffset);
563  int block, offset;
564  splitRawOffset(rawSegmentOffset, block, offset);
565  if (block == 7) { // Block 7 is reserved; control message (the next header), stop moving image data
566  break;
567  }
568  if ((receiveOffset - ofs) >= (fragsize + (int) sizeof(SegmentHeaderTCP))) {
569  // Incorporate fragment
570  // assert here that offset==blockReceiveOffsets[block]
571  if (offset != blockReceiveOffsets[block]) {
572  throw ProtocolException("Received invalid header!");
573  }
574  std::memcpy(&blockReceiveBuffers[block][blockReceiveOffsets[block]], &receiveBuffer[ofs+sizeof(SegmentHeaderTCP)], fragsize);
575  blockReceiveOffsets[block] += fragsize;
576  blockValidSize[block] = blockReceiveOffsets[block];
577  // Advance to next potential chunk
578  ofs += fragsize + sizeof(SegmentHeaderTCP);
579  } else {
580  // Fragment incomplete, will be appended to in next recv (offset increased above)
581  break;
582  }
583  }
584  if (ofs > 0) {
585  // Move start of next unaccounted-for fragment to start of buffer
586  std::memmove(&receiveBuffer[0], &receiveBuffer[ofs], receiveOffset - ofs);
587  receiveOffset -= ofs; // and shift append position accordingly
588  }
589  }
590 
591  // Determine whether all buffers are filled now
592  bool complete = true;
593  for (int i=0; i<numReceptionBlocks; ++i) {
594  if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
595  complete = false;
596  break;
597  }
598  }
599  finishedReception = complete;
600 
601 }
602 
603 int DataBlockProtocol::parseReceivedHeader(int length, int offset) {
604  int headerExtraBytes = 6; // see below
605 
606  if(length < headerExtraBytes) {
607  return 0;
608  }
609 
610  unsigned short headerSize = ntohs(*reinterpret_cast<unsigned short*>(&receiveBuffer[offset]));
611  if (length < (headerExtraBytes + headerSize)) {
612  return 0;
613  }
614  totalReceiveSize = static_cast<int>(ntohl(*reinterpret_cast<unsigned int*>(&receiveBuffer[offset + 2])));
615 
616  if (totalReceiveSize >= 0) { // old-style single block transfer
617  legacyTransfer = true;
618  headerExtraBytes = 6;
619  numReceptionBlocks = 1; // ONE interleaved buffer
620  blockReceiveSize[0] = totalReceiveSize;
621  } else { // marked -1 for new-style multi block transfer
622  legacyTransfer = false;
623  headerExtraBytes = static_cast<int>(sizeof(HeaderPreamble));
624  HeaderPreamble* header = reinterpret_cast<HeaderPreamble*>(&receiveBuffer[offset]);
625  numReceptionBlocks = 0;
626  totalReceiveSize = 0;
627  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
628  int s = ntohl(header->netTransferSizes[i]);
629  if (s > 0) {
630  blockReceiveSize[i] = s;
631  numReceptionBlocks++;
632  totalReceiveSize += s;
633  } else {
634  // first non-positive payload size signals end of blocks
635  //break;
636  }
637  }
638  }
639 
640  if (numReceptionBlocks==0) throw std::runtime_error("Received a transfer with zero blocks");
641  if (numReceptionBlocks > MAX_DATA_BLOCKS) throw std::runtime_error("Received a transfer with too many blocks");
642 
643  if(headerSize + headerExtraBytes > static_cast<int>(receiveBuffer.size())
644  || totalReceiveSize < 0 || headerSize + headerExtraBytes > length ) {
645  throw ProtocolException("Received invalid header!");
646  }
647 
648  headerReceived = true;
649  receivedHeader.assign(receiveBuffer.begin() + offset + headerExtraBytes,
650  receiveBuffer.begin() + offset + headerSize + headerExtraBytes);
651  resizeReceiveBuffer();
652 
653  return headerSize + headerExtraBytes;
654 }
655 
657  numReceptionBlocks = 0;
658  headerReceived = false;
659  for (int blk = 0; blk<MAX_DATA_BLOCKS; ++blk) {
660  missingReceiveSegments[blk].clear();
661  }
662  receivedHeader.clear();
663  waitingForMissingSegments = false;
664  totalReceiveSize = 0;
665  finishedReception = false;
666  lostSegmentBytes = 0;
667  for (int i=0; i<MAX_DATA_BLOCKS; ++i) {
668  blockReceiveOffsets[i] = 0;
669  blockValidSize[i] = 0;
670  }
671  if(dropped) {
672  droppedReceptions++;
673  }
674 }
675 
676 unsigned char* DataBlockProtocol::getReceivedData(int& length) {
677  length = 0;
678  return &receiveBuffer[0];
679 }
680 
681 unsigned char* DataBlockProtocol::getReceivedHeader(int& length) {
682  if(receivedHeader.size() > 0) {
683  length = static_cast<int>(receivedHeader.size());
684  return &receivedHeader[0];
685  } else {
686  return nullptr;
687  }
688 }
689 
690 bool DataBlockProtocol::processControlMessage(int length) {
691  if(length < static_cast<int>(sizeof(int) + 1)) {
692  return false;
693  }
694 
695  int payloadLength = length - sizeof(int) - 1;
696  switch(receiveBuffer[0 + payloadLength]) {
697  case CONFIRM_MESSAGE:
698  // Our connection request has been accepted
699  connectionConfirmed = true;
700  // Disregard heartbeats from repeated acks for protocol upgrade (esp. after reconnection)
701  heartbeatKnockCount = 0;
702  break;
703  case CONNECTION_MESSAGE:
704  // We establish a new connection
705  connectionConfirmed = true;
706  confirmationMessagePending = true;
707  clientConnectionPending = true;
708  extendedConnectionStateProtocol = false;
709 
710  // A connection request is just as good as a heartbeat
711  lastReceivedHeartbeat = std::chrono::steady_clock::now();
712  break;
713  case HEADER_MESSAGE: {
714  if (anyPayloadReceived()) {
715  if (allBlocksDone()) {
716  LOG_DEBUG_DBP("No EOF message received!");
717  } else {
718  LOG_DEBUG_DBP("Received header too late/early!");
719  }
720  resetReception(true);
721  }
722  if(parseReceivedHeader(payloadLength, 0) == 0) {
723  throw ProtocolException("Received header is too short!");
724  }
725  }
726  break;
727  case EOF_MESSAGE:
728  // This is the end of the frame
729  if(anyPayloadReceived()) {
730  parseEofMessage(length);
731  }
732  break;
733  case RESEND_MESSAGE: {
734  // The client requested retransmission of missing packets
735  parseResendMessage(payloadLength);
736  break;
737  }
738  case HEARTBEAT_MESSAGE:
739  {
740  auto now = std::chrono::steady_clock::now();
741  auto elapsedSinceLast = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastReceivedHeartbeat).count();
742  if (elapsedSinceLast < 200) { // about 3 knocks within 0.5 sec
743  heartbeatKnockCount++;
744  if (heartbeatKnockCount >= 3) {
745  if (!extendedConnectionStateProtocol) {
746  // Confirmed extended protocol
747  extendedConnectionStateProtocol = true;
748  if (!isServer) {
749  //std::cout << "Sending back five knocks" << std::endl;
750  // Client replies with a knock sequence as well
751  heartbeatRepliesQueued = 5;
752  }
753  }
754  }
755  } else {
756  heartbeatKnockCount = 0;
757  }
758  // A cyclic heartbeat message
759  lastReceivedHeartbeat = now;
760  if (isServer && (!heartbeatKnockCount)) {
761  // An isolated heartbeat ping from the client.
762  // We send back a 'pong' that UDP clients expect so
763  // they can ascertain when a connection is interrupted
764  heartbeatRepliesQueued = 1;
765  }
766  break;
767  }
768  case DISCONNECTION_MESSAGE:
769  if (isServer) {
770  // Disconnection event in server sent by client: orderly disconnect (instead of heartbeat timeout)
771  connectionConfirmed = false; // => isConnected()==false
772  } else {
773  // Disconnection event in client sent by server: rejected!
774  throw ConnectionClosedException("Device is already connected to another client");
775  }
776  break;
777  default:
778  throw ProtocolException("Received invalid control message!");
779  break;
780  }
781 
782  return true;
783 }
784 
786  if(protType == PROTOCOL_TCP) {
787  // Connection is handled by TCP and not by us
788  return true;
789  } else if(connectionConfirmed) {
790  auto now = std::chrono::steady_clock::now();
791  if (isServer) {
792  // Original server-side client disconnection heuristic
793  return std::chrono::duration_cast<std::chrono::milliseconds>(
794  now - lastReceivedHeartbeat).count()
795  < 2*HEARTBEAT_INTERVAL_MS;
796  } else {
797  // Client counts any data and has multiplier 3: robust wrt. message timing
798  return (std::chrono::duration_cast<std::chrono::milliseconds>(
799  now - lastReceivedAnything).count()
800  < 3*HEARTBEAT_INTERVAL_MS);
801  }
802  } else return false;
803 }
804 
805 const unsigned char* DataBlockProtocol::getNextControlMessage(int& length) {
806  length = 0;
807 
808  if(protType == PROTOCOL_TCP) {
809  // There are no control messages for TCP
810  return nullptr;
811  }
812 
813  if(confirmationMessagePending) {
814  // Send confirmation message
815  confirmationMessagePending = false;
816  controlMessageBuffer[0] = CONFIRM_MESSAGE;
817  length = 1;
818  } else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
819  std::chrono::steady_clock::now() - lastRemoteHostActivity).count() > RECONNECT_TIMEOUT_MS) {
820  // Send a new connection request
821  controlMessageBuffer[0] = CONNECTION_MESSAGE;
822  length = 1;
823 
824  // Also update time stamps
825  lastRemoteHostActivity = lastSentHeartbeat = std::chrono::steady_clock::now();
826  } else if(transferHeaderData != nullptr && isConnected()) {
827  // We need to send a new protocol header
828  length = transferHeaderSize;
829  const unsigned char* ret = transferHeaderData;
830  transferHeaderData = nullptr;
831  return ret;
832  } else if(eofMessagePending) {
833  // Send end of frame message
834  eofMessagePending = false;
835  unsigned int networkOffset = htonl(mergeRawOffset(lastTransmittedBlock, transferSize[lastTransmittedBlock]));
836  memcpy(&controlMessageBuffer[0], &networkOffset, sizeof(int));
837  controlMessageBuffer[sizeof(int)] = EOF_MESSAGE;
838  length = 5;
839  } else if(resendMessagePending) {
840  // Send a re-send request for missing messages
841  resendMessagePending = false;
842  if(!generateResendRequest(length)) {
843  length = 0;
844  return nullptr;
845  }
846  } else if(heartbeatRepliesQueued ||
847  (!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
848  std::chrono::steady_clock::now() - lastSentHeartbeat).count() > HEARTBEAT_INTERVAL_MS)) {
849  // Send a heartbeat message
850  //std::cout << "Queued a heartbeat, " << heartbeatRepliesQueued << std::endl;
851  controlMessageBuffer[0] = HEARTBEAT_MESSAGE;
852  length = 1;
853  lastSentHeartbeat = std::chrono::steady_clock::now();
854  if (heartbeatRepliesQueued > 0) heartbeatRepliesQueued--; // Replied with 'pong' or part of knock sequence
855  } else {
856  return nullptr;
857  }
858 
859  // Mark this message as a control message
860  controlMessageBuffer[length++] = 0xff;
861  controlMessageBuffer[length++] = 0xff;
862  controlMessageBuffer[length++] = 0xff;
863  controlMessageBuffer[length++] = 0xff;
864  return controlMessageBuffer;
865 }
866 
868  if(clientConnectionPending) {
869  clientConnectionPending = false;
870  return true;
871  } else {
872  return false;
873  }
874 }
875 
876 bool DataBlockProtocol::generateResendRequest(int& length) {
877  length = 0;
878  for (int blk = 0; blk < numReceptionBlocks; ++blk) {
879  for(MissingReceiveSegment segment: missingReceiveSegments[blk]) {
880  unsigned int segOffset = htonl(static_cast<unsigned int>(segment.offset));
881  unsigned int segLen = htonl(static_cast<unsigned int>(segment.length));
882 
883  if (sizeof(controlMessageBuffer) < length + 2*sizeof(unsigned int) + 4) {
884  // Too many UDP resend segments for control buffer, dropping the frame!
885  resetReception(true);
886  break;
887  }
888 
889  memcpy(&controlMessageBuffer[length], &segOffset, sizeof(segOffset));
890  length += sizeof(unsigned int);
891  memcpy(&controlMessageBuffer[length], &segLen, sizeof(segLen));
892  length += sizeof(unsigned int);
893 
894  int dbgBlk, dbgOfs;
895  splitRawOffset(segment.offset, dbgBlk, dbgOfs);
896  LOG_DEBUG_DBP("Req missing " << dbgBlk << " " << dbgOfs << " " << segment.length);
897  }
898  }
899 
900  if(length + sizeof(int) + 1 > sizeof(controlMessageBuffer)) {
901  return false;
902  }
903 
904  controlMessageBuffer[length++] = RESEND_MESSAGE;
905 
906  return true;
907 }
908 
909 void DataBlockProtocol::parseResendMessage(int length) {
910  missingTransferSegments.clear();
911 
912  int num = length / (sizeof(unsigned int) + sizeof(unsigned int));
913  int bufferOffset = 0;
914 
915  for(int i=0; i<num; i++) {
916  unsigned int segOffsetNet = *reinterpret_cast<unsigned int*>(&receiveBuffer[bufferOffset]);
917  bufferOffset += sizeof(unsigned int);
918  unsigned int segLenNet = *reinterpret_cast<unsigned int*>(&receiveBuffer[bufferOffset]);
919  bufferOffset += sizeof(unsigned int);
920 
921  int segmentOffsetRaw = static_cast<int>(ntohl(segOffsetNet)); // with block ID
922  int segmentLength = static_cast<int>(ntohl(segLenNet));
923  int dataBlockID, segmentOffset;
924  splitRawOffset(segmentOffsetRaw, dataBlockID, segmentOffset);
925 
926  if(segmentOffset >= 0 && segmentLength > 0 && (segmentOffset + segmentLength) <= rawValidBytes[dataBlockID]) {
927  missingTransferSegments.push_back(std::pair<int, int>(
928  segmentOffsetRaw, segmentLength));
929  }
930 
931  }
932 }
933 
934 void DataBlockProtocol::parseEofMessage(int length) {
935 
936  completedReceptions++;
937  lostSegmentRate = (lostSegmentRate * (completedReceptions-1) + ((double) lostSegmentBytes) / totalReceiveSize) / completedReceptions;
938  LOG_DEBUG_DBP("Lost segment rate: " << lostSegmentRate);
939  if(length >= 4) {
940  // Find all missing segments at the end of blocks
941  for (int i=0; i<numReceptionBlocks; ++i) {
942  if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
943  MissingReceiveSegment missingSeg;
944  missingSeg.offset = mergeRawOffset(i, blockReceiveOffsets[i]);
945  missingSeg.length = blockReceiveSize[i] - blockReceiveOffsets[i];
946  missingSeg.isEof = true;
947  missingReceiveSegments[i].push_back(missingSeg);
948  lostSegmentBytes += missingSeg.length;
949  }
950  }
951  for (int blk=0; blk<numReceptionBlocks; ++blk) {
952  if(missingReceiveSegments[blk].size() > 0) {
953  waitingForMissingSegments = true;
954  resendMessagePending = true;
955  // Initialize all missing block start indices with earliest missing address
956  int mblock, moffset;
957  for (int i=0; i<static_cast<int>(missingReceiveSegments[blk].size()); ++i) {
958  splitRawOffset(missingReceiveSegments[blk][i].offset, mblock, moffset);
959  if (moffset < blockReceiveOffsets[mblock]) {
960  blockReceiveOffsets[mblock] = moffset;
961  }
962  }
963  }
964  }
965  if (!resendMessagePending) {
966  finishedReception = true;
967  }
968  } else {
969  LOG_DEBUG_DBP("EOF message too short, length " << length);
970  }
971 }
972 
973 void DataBlockProtocol::resizeReceiveBuffer() {
974  if(totalReceiveSize < 0) {
975  throw ProtocolException("Received invalid transfer size!");
976  }
977 
978  // We increase the requested size to allow for one
979  // additional network message and the protocol overhead
980  int bufferSize = 2*getMaxReceptionSize()
981  + MAX_OUTSTANDING_BYTES + sizeof(int);
982 
983  // Resize the buffer
984  if(static_cast<int>(receiveBuffer.size()) < bufferSize) {
985  receiveBuffer.resize(bufferSize);
986  }
987 
988  for (int i=0; i<numReceptionBlocks; ++i) {
989  if (static_cast<int>(blockReceiveBuffers[i].size()) < blockReceiveSize[i]) {
990  blockReceiveBuffers[i].resize(blockReceiveSize[i]);
991  }
992  }
993 }
994 
995 // static
996 void DataBlockProtocol::getDisconnectionMessage(const unsigned char* &buf, int &sz) {
997  // A single disconnection message in the correct control message UDP wire format.
998  // A UDP client should send this in its connection teardown to signal to the server
999  // its intent to disconnect, so that the heartbeat timeout can be cut short.
1000  // This is also the only message sent back to a new UDP client that connects to a
1001  // busy server, explicitly rejecting the interfering client.
1002  static const unsigned char DISCONNECTION_MESSAGE_BUFFER[] = { DISCONNECTION_MESSAGE, 0xff, 0xff, 0xff, 0xff };
1003  buf = DISCONNECTION_MESSAGE_BUFFER;
1004  sz = sizeof(DISCONNECTION_MESSAGE_BUFFER);
1005 }
1006 
1007 // static
1008 void DataBlockProtocol::getHeartbeatMessage(const unsigned char* &buf, int &sz) {
1009  // A single heartbeat message in the correct control message UDP wire format.
1010  // The server sends a few on connection to signal the new protocol. This is because
1011  // otherwise control messages are only sent with an image, which may not be
1012  // forthcoming with external triggering.
1013  static const unsigned char HEARTBEAT_MESSAGE_BUFFER[] = { HEARTBEAT_MESSAGE, 0xff, 0xff, 0xff, 0xff };
1014  buf = HEARTBEAT_MESSAGE_BUFFER;
1015  sz = sizeof(HEARTBEAT_MESSAGE_BUFFER);
1016 }
1017 
1018 }} // namespace
1019 
visiontransfer::internal::DataBlockProtocol::setTransferData
void setTransferData(int block, unsigned char *data, int validBytes=0x7FFFFFFF)
Sets the payload data for the next transfer.
Definition: datablockprotocol.cpp:168
visiontransfer::internal::DataBlockProtocol::getMaxReceptionSize
int getMaxReceptionSize() const
Returns the maximum payload size that can be received.
Definition: datablockprotocol.cpp:347
visiontransfer::internal::DataBlockProtocol::transferComplete
bool transferComplete()
Returns true if the current transfer has been completed.
Definition: datablockprotocol.cpp:340
visiontransfer::internal::DataBlockProtocol::resetReception
void resetReception(bool dropped)
Resets the message reception.
Definition: datablockprotocol.cpp:656
visiontransfer::internal::DataBlockProtocol::HeaderPreamble
Definition: datablockprotocol.h:100
visiontransfer::internal::DataBlockProtocol::getNextReceiveBuffer
unsigned char * getNextReceiveBuffer(int maxLength)
Gets a buffer for receiving the next network message.
Definition: datablockprotocol.cpp:355
visiontransfer::internal::DataBlockProtocol::getTransferMessage
const unsigned char * getTransferMessage(int &length)
Gets the next network message for the current transfer.
Definition: datablockprotocol.cpp:202
visiontransfer::internal::DataBlockProtocol::SegmentHeaderTCP
Definition: datablockprotocol.h:108
visiontransfer::internal::DataBlockProtocol::isConnected
bool isConnected() const
Returns true if a remote connection is established.
Definition: datablockprotocol.cpp:785
visiontransfer::internal::DataBlockProtocol::getReceivedData
unsigned char * getReceivedData(int &length)
Returns the data that has been received for the current transfer.
Definition: datablockprotocol.cpp:676
visiontransfer::internal::DataBlockProtocol::setTransferBytes
void setTransferBytes(int block, long bytes)
Sets the per-block transfer size.
Definition: datablockprotocol.cpp:115
visiontransfer::internal::DataBlockProtocol::getReceivedHeader
unsigned char * getReceivedHeader(int &length)
Returns the header data that has been received for the current transfer.
Definition: datablockprotocol.cpp:681
visiontransfer::internal::DataBlockProtocol::newClientConnected
bool newClientConnected()
Returns true if the last network message has established a new connection from a client.
Definition: datablockprotocol.cpp:867
visiontransfer::internal::DataBlockProtocol::processReceivedMessage
void processReceivedMessage(int length, bool &transferComplete)
Handles a received network message.
Definition: datablockprotocol.cpp:362
visiontransfer::ProtocolException
Exception class that is used for all protocol exceptions.
Definition: exceptions.h:37
visiontransfer::ConnectionClosedException
Exception class that is used for a busy server terminating a connection.
Definition: exceptions.h:53
visiontransfer::internal::DataBlockProtocol::resetTransfer
void resetTransfer()
Resets all transfer related internal variables.
Definition: datablockprotocol.cpp:105
visiontransfer::internal::DataBlockProtocol::SegmentHeaderUDP
Definition: datablockprotocol.h:105
visiontransfer::internal::DataBlockProtocol::setTransferValidBytes
void setTransferValidBytes(int block, int validBytes)
Updates the number of valid bytes in a partial transfer.
Definition: datablockprotocol.cpp:182
visiontransfer::internal::DataBlockProtocol::setTransferHeader
void setTransferHeader(unsigned char *data, int headerSize, int blocks)
Sets a user-defined header that shall be transmitted with the next transfer.
Definition: datablockprotocol.cpp:126
visiontransfer::internal::DataBlockProtocol::getNextControlMessage
const unsigned char * getNextControlMessage(int &length)
If a control message is pending to be transmitted, then the message data will be returned by this method.
Definition: datablockprotocol.cpp:805
Allied Vision