22 #include "visiontransfer/internal/datablockprotocol.h"
23 #include "visiontransfer/exceptions.h"
31 #include <arpa/inet.h>
34 #define LOG_DEBUG_DBP(expr)
38 using namespace visiontransfer;
39 using namespace visiontransfer::internal;
41 namespace visiontransfer {
// Sets up the protocol state for one endpoint.
//   server           - stored in isServer.
//   protType         - stored in protType; selects the payload-size computation below.
//   maxUdpPacketSize - packet size from which the SegmentHeaderUDP size is subtracted.
// NOTE(review): this excerpt omits some source lines (e.g. branch and closing-brace
// lines), so the comments describe only the statements that are visible here.
44 DataBlockProtocol::DataBlockProtocol(
bool server, ProtocolType protType,
int maxUdpPacketSize)
45 : isServer(server), protType(protType),
// -1 in the overwritten* members means "no bytes are currently saved".
47 overwrittenTransferData{0},
48 overwrittenTransferIndex{-1},
49 overwrittenTransferBlock{-1},
50 transferHeaderData{
nullptr},
51 transferHeaderSize{0},
52 totalBytesCompleted{0}, totalTransferSize{0},
53 waitingForMissingSegments(
false),
54 totalReceiveSize(0), connectionConfirmed(
false),
55 confirmationMessagePending(
false), eofMessagePending(
false),
56 clientConnectionPending(
false), resendMessagePending(
false),
57 lastRemoteHostActivity(), lastSentHeartbeat(),
// Both "last received" timestamps start at construction time.
58 lastReceivedHeartbeat(std::chrono::steady_clock::now()),
59 lastReceivedAnything(std::chrono::steady_clock::now()),
60 heartbeatKnockCount(0),
61 heartbeatRepliesQueued(0),
62 extendedConnectionStateProtocol(
false),
63 finishedReception(
false), droppedReceptions(0),
64 completedReceptions(0), lostSegmentRate(0.0), lostSegmentBytes(0),
65 unprocessedMsgLength(0), headerReceived(
false) {
// TCP: payload capacity is the TCP transfer limit minus the TCP segment header.
67 if(protType == PROTOCOL_TCP) {
68 maxPayloadSize = MAX_TCP_BYTES_TRANSFER -
sizeof(SegmentHeaderTCP);
// Presumably the non-TCP branch (its `else` line is not visible): the packet size
// minus the UDP segment header serves as both maximum and minimum payload size.
71 maxPayloadSize = maxUdpPacketSize -
sizeof(SegmentHeaderUDP);
72 minPayloadSize = maxPayloadSize;
// Size the receive buffer, then clear the reception state.
75 resizeReceiveBuffer();
76 resetReception(
false);
78 void DataBlockProtocol::splitRawOffset(
int rawSegmentOffset,
int& dataBlockID,
int& segmentOffset) {
79 int selector = (rawSegmentOffset >> 28) & 0xf;
80 dataBlockID = selector & 0x7;
81 segmentOffset = rawSegmentOffset & 0x0FFFffff;
84 int DataBlockProtocol::mergeRawOffset(
int dataBlockID,
int segmentOffset,
int reserved_defaults0) {
85 return ((reserved_defaults0 & 1) << 31) | ((dataBlockID & 0x07) << 28) | (segmentOffset & 0x0FFFffff);
// Resets the per-block transfer arrays and the saved-bytes bookkeeping to a
// neutral state. NOTE(review): some source lines of this function are not part of
// this excerpt, so further members may be reset as well.
88 void DataBlockProtocol::zeroStructures() {
// Clear every per-block slot, not only those of the current transfer.
89 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
90 rawDataArr[i] =
nullptr;
91 rawDataArrStrideHackOrig[i] = 0;
92 rawDataArrStrideHackRepl[i] = 0;
94 transferOffset[i] = 0;
// Forget any bytes that were saved before being overwritten by a segment header.
97 std::memset(overwrittenTransferData, 0,
sizeof(overwrittenTransferData));
98 overwrittenTransferIndex = -1;
99 overwrittenTransferBlock = -1;
100 lastTransmittedBlock = -1;
102 numReceptionBlocks = 0;
// Fragment: the enclosing function's header is not part of this excerpt
// (presumably the routine that resets an outgoing transfer - TODO confirm).
// Visible behavior: drops the saved-bytes markers, zeroes the byte counters and the
// block count, and discards all queued resend requests.
107 overwrittenTransferIndex = -1;
108 overwrittenTransferBlock = -1;
109 totalBytesCompleted = 0;
110 totalTransferSize = 0;
111 numTransferBlocks = 0;
112 missingTransferSegments.clear();
// Fragment: the enclosing function's header and the declaration of `hp` are not
// part of this excerpt. Visible behavior: records the byte count of one data block
// both locally and in the outgoing header structure.
// Throws ProtocolException if no transfer header has been set yet, or if `block`
// is not below numTransferBlocks.
116 if (transferHeaderData ==
nullptr) {
117 throw ProtocolException(
"Tried to set data block size before initializing header!");
118 }
else if (block >= numTransferBlocks) {
119 throw ProtocolException(
"Request to set data block size - block index too high!");
121 transferSize[block] = bytes;
// The copy inside the header is kept in network byte order.
123 hp->netTransferSizes[block] = htonl(bytes);
// Fragment: the enclosing function's header, the bodies of the three precondition
// branches, and the declarations of `headerBaseOffset` and `ourHeader` are not part
// of this excerpt. Visible behavior: registers a new transfer header and fills in
// the preamble fields that precede the caller's header bytes.
// Preconditions tested: a previous transfer is still in progress; the header plus
// 9 bytes does not fit into controlMessageBuffer; zero blocks were requested.
127 if(!transferDone && numTransferBlocks > 0) {
129 }
else if(headerSize + 9 >
static_cast<int>(
sizeof(controlMessageBuffer))) {
131 }
else if(blocks == 0) {
135 numTransferBlocks = blocks;
137 transferDone =
false;
138 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
139 this->transferSize[i] = 0;
// The preamble lives in the bytes *before* `data` (negative index), so the caller's
// buffer presumably reserves headerBaseOffset bytes in front - TODO confirm.
144 transferHeaderData = &data[-headerBaseOffset];
147 unsigned short netHeaderSize = htons(
static_cast<unsigned short>(headerSize));
148 ourHeader->netHeaderSize = netHeaderSize;
// Dummy total size of -1; block sizes start at zero and are filled in separately.
149 ourHeader->netTransferSizeDummy = htonl(-1);
150 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
151 ourHeader->netTransferSizes[i] = 0;
154 headerSize += headerBaseOffset;
// UDP only: append the message type byte followed by four 0xFF bytes.
156 if(protType == PROTOCOL_UDP) {
158 transferHeaderData[headerSize++] = HEADER_MESSAGE;
159 transferHeaderData[headerSize++] = 0xFF;
160 transferHeaderData[headerSize++] = 0xFF;
161 transferHeaderData[headerSize++] = 0xFF;
162 transferHeaderData[headerSize++] = 0xFF;
165 transferHeaderSize = headerSize;
// Fragment: the enclosing function's header and the body of the guard below are
// not part of this excerpt. Visible behavior: attaches the payload pointer for one
// block and restarts sending for it.
// The guard tests whether a transfer header has been set.
169 if(transferHeaderSize == 0 || transferHeaderData ==
nullptr) {
173 transferDone =
false;
174 rawDataArr[block] = data;
175 transferOffset[block] = 0;
176 overwrittenTransferIndex = -1;
177 overwrittenTransferBlock = -1;
// Never mark more bytes valid than the block's declared size.
178 rawValidBytes[block] = min(transferSize[block], validBytes);
179 totalBytesCompleted = 0;
// Fragment: the enclosing function's header is not part of this excerpt.
// Visible behavior: updates how many bytes of `block` may be sent.
//   - at or above the block size: clamp to the block size;
//   - below sizeof(int): treat as nothing valid yet;
//   - the last assignment is presumably the remaining (else) case - its branch line
//     is not visible.
183 if(validBytes >= transferSize[block]) {
184 rawValidBytes[block] = transferSize[block];
185 }
else if(validBytes <
static_cast<int>(
sizeof(
int))) {
186 rawValidBytes[block] = 0;
188 rawValidBytes[block] = validBytes;
// Builds a human-readable summary of the outgoing transfer: the number of blocks,
// then for each block its size, current send offset and valid byte count, and
// finally the completed/total byte counters.
// NOTE(review): the return statement is not part of this excerpt; presumably the
// accumulated stream contents are returned.
192 std::string DataBlockProtocol::statusReport() {
193 std::stringstream ss;
194 ss <<
"DataBlockProtocol, blocks=" << numTransferBlocks <<
": ";
195 for (
int i=0; i<numTransferBlocks; ++i) {
196 ss << i <<
":(len " << transferSize[i] <<
" ofs " << transferOffset[i] <<
" rawvalid " << rawValidBytes[i] <<
") ";
198 ss <<
" total done: " << totalBytesCompleted <<
"/" << totalTransferSize;
// Fragment: the enclosing function's header, several branch/return lines and the
// declarations of `segmentHeader` and `headerOffset` are not part of this excerpt.
// Visible behavior: produces the next outgoing message - the pending header first
// (TCP), otherwise the next payload segment with a segment header written in place.
208 for (
int i=0; i<numTransferBlocks; ++i) {
209 if (rawValidBytes[i] == 0) {
// TCP: a pending transfer header is handed out once and then cleared.
217 if(protType == PROTOCOL_TCP && transferHeaderData !=
nullptr) {
218 length = transferHeaderSize;
219 const unsigned char* ret = transferHeaderData;
220 transferHeaderData =
nullptr;
// Undo the in-place header written for the previous segment before picking the next.
226 restoreTransferBuffer();
229 int block = -1, offset = -1;
230 getNextTransferSegment(block, offset, length);
// UDP: the segment header is written directly behind the payload (offset + length);
// the bytes it overwrites are saved so that they can be restored later.
235 if(protType == PROTOCOL_UDP) {
237 overwrittenTransferBlock = block;
238 overwrittenTransferIndex = offset + length;
240 std::memcpy(overwrittenTransferData, segmentHeader,
sizeof(
SegmentHeaderUDP));
241 segmentHeader->segmentOffset =
static_cast<int>(htonl(mergeRawOffset(block, offset)));
243 lastTransmittedBlock = block;
244 return &rawDataArr[block][offset];
250 unsigned char* dataPointer =
nullptr;
// No room in front of the payload for a header: assemble the message in a separate
// buffer instead.
252 if(headerOffset < 0) {
// NOTE(review): function-local static buffer - shared by all instances and calls.
255 static unsigned char tcpBuffer[MAX_TCP_BYTES_TRANSFER];
256 dataPointer = tcpBuffer;
// NOTE(review): `segmentHeader` is assigned a SegmentHeaderTCP* further below, so
// sizeof(segmentHeader) looks like the size of a pointer rather than
// sizeof(SegmentHeaderTCP) - verify that this is intended.
258 std::memcpy(&tcpBuffer[
sizeof(segmentHeader)], &rawDataArr[block][offset], length);
// Otherwise the header is placed in the payload buffer at headerOffset, saving the
// bytes it overwrites.
262 dataPointer = &rawDataArr[block][headerOffset];
263 segmentHeader =
reinterpret_cast<SegmentHeaderTCP*
>(&rawDataArr[block][headerOffset]);
264 overwrittenTransferBlock = block;
265 overwrittenTransferIndex = headerOffset;
266 std::memcpy(overwrittenTransferData, segmentHeader,
sizeof(
SegmentHeaderTCP));
// Header fields are stored in network byte order.
269 segmentHeader->fragmentSize = htonl(length);
270 segmentHeader->segmentOffset =
static_cast<int>(htonl(mergeRawOffset(block, offset)));
272 lastTransmittedBlock = block;
// Chooses the next segment to send and reports it through the out-parameters.
//   block  - receives the data block index.
//   offset - receives the byte offset inside that block.
//   length - receives the number of bytes to send.
// Regular data is sent while no resend requests are queued; otherwise the front
// resend request is served first.
// NOTE(review): several source lines of this function are not part of this excerpt.
277 void DataBlockProtocol::getNextTransferSegment(
int& block,
int& offset,
int& length) {
278 if(missingTransferSegments.size() == 0) {
// Look for the block with the most bytes that are valid but not yet sent.
280 int sendBlock = 0, amount = 0;
281 for (
int i=0; i<numTransferBlocks; ++i) {
282 int avail = std::min(transferSize[i], rawValidBytes[i]);
283 avail -= transferOffset[i];
284 if (avail > amount) {
289 length = std::min(maxPayloadSize, amount);
// Nothing to send, or a short segment while the block is not yet fully valid.
290 if(length == 0 || (length < minPayloadSize && rawValidBytes[sendBlock] != transferSize[sendBlock])) {
296 offset = transferOffset[sendBlock];
297 transferOffset[sendBlock] += length;
// UDP: check whether every block has now been sent completely.
298 if (protType == PROTOCOL_UDP) {
299 bool complete =
true;
300 for (
int i=0; i<numTransferBlocks; ++i) {
301 if (transferOffset[i] < transferSize[i]) {
307 eofMessagePending =
true;
// Resend path: serve at most maxPayloadSize bytes of the front request.
312 splitRawOffset(missingTransferSegments.front().first, block, offset);
313 length = std::min(maxPayloadSize, missingTransferSegments.front().second);
314 LOG_DEBUG_DBP(
"Re-transmitting: " << offset <<
" - " << (offset + length));
316 int remaining = missingTransferSegments[0].second - length;
// The front request is either removed or advanced by the bytes just served
// (the condition selecting between the two is not visible here).
319 missingTransferSegments.pop_front();
322 missingTransferSegments.front().first += length;
323 missingTransferSegments.front().second = remaining;
328 void DataBlockProtocol::restoreTransferBuffer() {
329 if(overwrittenTransferBlock >= 0) {
330 if(protType == PROTOCOL_UDP) {
331 std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData,
sizeof(SegmentHeaderUDP));
333 std::memcpy(&rawDataArr[overwrittenTransferBlock][overwrittenTransferIndex], overwrittenTransferData,
sizeof(SegmentHeaderTCP));
336 overwrittenTransferIndex = -1;
337 overwrittenTransferBlock = -1;
// Fragment: the enclosing function's header is not part of this excerpt.
// Visible behavior: returns false while any block still has unsent bytes; otherwise
// returns true only if no EOF message is still pending.
341 for (
int i=0; i<numTransferBlocks; ++i) {
342 if (transferOffset[i] < transferSize[i])
return false;
344 return !eofMessagePending;
// Fragment: the enclosing function's header is not part of this excerpt.
// Visible behavior: returns a protocol-dependent size constant - the TCP transfer
// limit for TCP, otherwise MAX_UDP_RECEPTION.
348 if(protType == PROTOCOL_TCP) {
349 return MAX_TCP_BYTES_TRANSFER;
351 return MAX_UDP_RECEPTION;
// Fragment: the enclosing function's header is not part of this excerpt.
// Visible behavior: grows receiveBuffer if maxLength more bytes would not fit behind
// receiveOffset, then returns the address of the write position at receiveOffset.
// The returned pointer is only valid until receiveBuffer is resized again.
356 if(receiveOffset + maxLength > (
int)receiveBuffer.size()) {
357 receiveBuffer.resize(receiveOffset + maxLength);
359 return &receiveBuffer[receiveOffset];
// Fragment: the enclosing function's header and the branch bodies are not part of
// this excerpt. Visible behavior: tests whether the previous reception has finished,
// records the time of this reception, and then branches on the protocol type.
368 if(finishedReception) {
375 lastReceivedAnything = std::chrono::steady_clock::now();
377 if(protType == PROTOCOL_UDP) {
// Handles one received UDP datagram that sits at the start of receiveBuffer.
//   length           - number of received bytes, including the trailing 4-byte
//                      raw segment offset.
//   transferComplete - out-parameter; the lines assigning it are not visible here.
// NOTE(review): many source lines of this function are not part of this excerpt;
// the comments describe only the visible statements.
386 void DataBlockProtocol::processReceivedUdpMessage(
int length,
bool& transferComplete) {
// Reject datagrams shorter than the trailing offset or larger than the buffer.
387 if(length <
static_cast<int>(
sizeof(
int)) ||
388 0 + length >
static_cast<int>(receiveBuffer.size())) {
// The last sizeof(int) bytes carry the raw segment offset in network byte order.
393 int rawSegmentOffset = ntohl(*
reinterpret_cast<int*
>(
394 &receiveBuffer[0 + length -
sizeof(
int)]));
396 int dataBlockID, segmentOffset;
397 splitRawOffset(rawSegmentOffset, dataBlockID, segmentOffset);
// An all-ones offset marks a control message rather than payload.
399 if(rawSegmentOffset ==
static_cast<int>(0xFFFFFFFF)) {
401 processControlMessage(length);
402 }
else if(headerReceived) {
404 int realPayloadOffset = 0;
405 int payloadLength = length -
sizeof(int);
// The segment does not start where the block's reception currently stands.
407 if(segmentOffset != blockReceiveOffsets[dataBlockID]) {
// A gap ahead of the expected offset that still fits the block buffer: remember
// the skipped range as missing and store the data that did arrive.
410 if(!waitingForMissingSegments &&
411 segmentOffset > blockReceiveOffsets[dataBlockID]
412 && segmentOffset + payloadLength <= (
int)blockReceiveBuffers[dataBlockID].size()) {
414 LOG_DEBUG_DBP(
"Missing segment: " << dataBlockID <<
" size " << payloadLength <<
" ofs " << segmentOffset
415 <<
" but blkRecvOfs " << blockReceiveOffsets[dataBlockID]
416 <<
" (# " << missingReceiveSegments[dataBlockID].size() <<
")");
418 MissingReceiveSegment missingSeg;
419 missingSeg.offset = mergeRawOffset(dataBlockID, blockReceiveOffsets[dataBlockID]);
420 missingSeg.length = segmentOffset - blockReceiveOffsets[dataBlockID];
421 missingSeg.isEof =
false;
422 lostSegmentBytes += missingSeg.length;
423 missingReceiveSegments[dataBlockID].push_back(missingSeg);
426 memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
428 blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
434 if(segmentOffset > 0 ) {
435 if(blockReceiveOffsets[dataBlockID] > 0) {
436 LOG_DEBUG_DBP(
"Resend failed!");
440 LOG_DEBUG_DBP(
"Missed EOF message!");
// This check bounds the *source* range inside receiveBuffer.
444 if ((realPayloadOffset+payloadLength) > (
int)receiveBuffer.size()) {
// NOTE(review): no bounds check of the destination block buffer is visible in this
// excerpt before this copy - confirm that one exists in the omitted lines.
449 memcpy(&blockReceiveBuffers[dataBlockID][segmentOffset], &receiveBuffer[0 + realPayloadOffset], payloadLength);
451 blockReceiveOffsets[dataBlockID] = segmentOffset + payloadLength;
// Update how much of the block counts as valid.
452 if (waitingForMissingSegments) {
// The only outstanding gap is covered by this segment: the whole block is valid.
454 if ((missingReceiveSegments[dataBlockID].size() == 1) && (missingReceiveSegments[dataBlockID].front().length <= payloadLength)) {
456 blockValidSize[dataBlockID] = blockReceiveSize[dataBlockID];
458 blockValidSize[dataBlockID] = segmentOffset + payloadLength;
460 }
else if (missingReceiveSegments[dataBlockID].size() == 0) {
461 blockValidSize[dataBlockID] = segmentOffset + payloadLength;
// The very first segment of block 0 refreshes the remote activity timestamp.
465 if(segmentOffset == 0 && dataBlockID == 0) {
467 lastRemoteHostActivity = std::chrono::steady_clock::now();
471 integrateMissingUdpSegments(dataBlockID, segmentOffset, payloadLength);
// Accounts for a segment that arrived while resends are outstanding.
//   block             - data block the segment belongs to.
//   lastSegmentOffset - byte offset of the segment inside the block.
//   lastSegmentSize   - payload size of the segment.
// Does nothing visible unless waiting for missing segments and `block` has at least
// one missing range.
// NOTE(review): some source lines of this function are not part of this excerpt.
475 void DataBlockProtocol::integrateMissingUdpSegments(
int block,
int lastSegmentOffset,
int lastSegmentSize) {
476 if(waitingForMissingSegments && missingReceiveSegments[block].size() > 0) {
// The segment is compared against the start of the first missing range.
478 int checkBlock, checkOffset;
479 MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
480 splitRawOffset(firstSeg.offset, checkBlock, checkOffset);
481 if((lastSegmentOffset != checkOffset) || (block != checkBlock)) {
482 LOG_DEBUG_DBP(
"Received invalid resend: " << block <<
" " << lastSegmentOffset);
// Shrink the missing range from the front; drop it once it is fully covered.
485 firstSeg.offset += lastSegmentSize;
486 firstSeg.length -= lastSegmentSize;
487 if(firstSeg.length == 0) {
488 missingReceiveSegments[block].pop_front();
// Scan all blocks for remaining gaps.
493 for (
int blk=0; blk<numReceptionBlocks; ++blk) {
494 if(missingReceiveSegments[blk].size() > 0) {
// Presumably reached when no gaps remain (the condition line is not visible).
500 waitingForMissingSegments =
false;
501 finishedReception =
true;
502 }
else if (missingReceiveSegments[block].size() > 0) {
// Continue reception of this block at the start of its next missing range.
504 int newBlock, newOffset;
505 MissingReceiveSegment& firstSeg = missingReceiveSegments[block].front();
506 splitRawOffset(firstSeg.offset, newBlock, newOffset);
507 blockReceiveOffsets[block] = newOffset;
// Handles newly received TCP bytes that were appended to receiveBuffer.
//   length           - number of bytes just received.
//   transferComplete - out-parameter; the lines assigning it are not visible here.
// First parses the transfer header if none has been received yet, then consumes the
// payload either as one legacy stream or as a sequence of SegmentHeaderTCP-prefixed
// fragments.
// NOTE(review): many source lines of this function (including the declaration of
// `ofs`, `block` and `offset`) are not part of this excerpt.
513 void DataBlockProtocol::processReceivedTcpMessage(
int length,
bool& transferComplete) {
515 if(!headerReceived) {
516 int totalHeaderSize = parseReceivedHeader(length, 0);
// 0 means that the header is not complete yet: keep the bytes and wait for more.
517 if(totalHeaderSize == 0) {
519 receiveOffset += length;
524 length -= totalHeaderSize;
// Drop the header bytes by moving the remaining data to the buffer start.
531 int movelength = receiveOffset + length;
532 ::memmove(&receiveBuffer[0], &receiveBuffer[totalHeaderSize], movelength);
533 receiveOffset = movelength;
536 receiveOffset += length;
539 if (legacyTransfer) {
// Legacy mode: a single block, copied straight from the stream.
541 int remainingSize = blockReceiveSize[0] - blockValidSize[0];
542 int availableSize = std::min(receiveOffset, remainingSize);
544 std::memcpy(&blockReceiveBuffers[0][blockReceiveOffsets[0]], &receiveBuffer[0], availableSize);
545 blockReceiveOffsets[0] += availableSize;
546 blockValidSize[0] = blockReceiveOffsets[0];
548 if (receiveOffset <= remainingSize) {
// NOTE(review): availableSize is min(receiveOffset, remainingSize), so
// availableSize - remainingSize is never positive; if this is the branch for
// receiveOffset > remainingSize, leftover bytes appear to be discarded
// (receiveOffset - remainingSize may have been intended) - verify.
553 std::memmove(&receiveBuffer[0], &receiveBuffer[remainingSize], availableSize - remainingSize);
554 receiveOffset = availableSize - remainingSize;
// Fragment mode: consume as many complete fragments as the buffer holds.
559 while ((receiveOffset - ofs) >= (
int)
sizeof(SegmentHeaderTCP)) {
560 SegmentHeaderTCP* header =
reinterpret_cast<SegmentHeaderTCP*
>(&receiveBuffer[ofs]);
561 int fragsize = ntohl(header->fragmentSize);
562 int rawSegmentOffset = ntohl(header->segmentOffset);
564 splitRawOffset(rawSegmentOffset, block, offset);
// Only proceed once the whole fragment (header + payload) has arrived.
568 if ((receiveOffset - ofs) >= (fragsize + (
int)
sizeof(SegmentHeaderTCP))) {
571 if (offset != blockReceiveOffsets[block]) {
// NOTE(review): fragsize comes from the wire; no check against the destination
// block buffer size is visible in this excerpt - confirm one exists.
574 std::memcpy(&blockReceiveBuffers[block][blockReceiveOffsets[block]], &receiveBuffer[ofs+
sizeof(SegmentHeaderTCP)], fragsize);
575 blockReceiveOffsets[block] += fragsize;
576 blockValidSize[block] = blockReceiveOffsets[block];
578 ofs += fragsize +
sizeof(SegmentHeaderTCP);
// Keep any incomplete trailing fragment at the start of the buffer.
586 std::memmove(&receiveBuffer[0], &receiveBuffer[ofs], receiveOffset - ofs);
587 receiveOffset -= ofs;
// Reception is finished once every block has reached its announced size.
592 bool complete =
true;
593 for (
int i=0; i<numReceptionBlocks; ++i) {
594 if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
599 finishedReception = complete;
// Parses a transfer header located at receiveBuffer[offset].
//   length - number of bytes available for the header.
//   offset - start position of the header inside receiveBuffer.
// Returns the total number of header bytes (header size plus preamble) on the
// visible success path. Callers in this file treat a return value of 0 as "no
// header parsed"; the corresponding return lines are not part of this excerpt.
// Throws std::runtime_error if the announced block count is zero or too large.
603 int DataBlockProtocol::parseReceivedHeader(
int length,
int offset) {
// Minimum preamble: 2 bytes header size + 4 bytes total size.
604 int headerExtraBytes = 6;
606 if(length < headerExtraBytes) {
610 unsigned short headerSize = ntohs(*
reinterpret_cast<unsigned short*
>(&receiveBuffer[offset]));
611 if (length < (headerExtraBytes + headerSize)) {
614 totalReceiveSize =
static_cast<int>(ntohl(*
reinterpret_cast<unsigned int*
>(&receiveBuffer[offset + 2])));
// A non-negative total size selects the legacy single-block format.
616 if (totalReceiveSize >= 0) {
617 legacyTransfer =
true;
618 headerExtraBytes = 6;
619 numReceptionBlocks = 1;
620 blockReceiveSize[0] = totalReceiveSize;
// Otherwise the full HeaderPreamble with per-block sizes is used and the total is
// recomputed as the sum of the block sizes.
622 legacyTransfer =
false;
623 headerExtraBytes =
static_cast<int>(
sizeof(HeaderPreamble));
624 HeaderPreamble* header =
reinterpret_cast<HeaderPreamble*
>(&receiveBuffer[offset]);
625 numReceptionBlocks = 0;
626 totalReceiveSize = 0;
627 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
628 int s = ntohl(header->netTransferSizes[i]);
630 blockReceiveSize[i] = s;
631 numReceptionBlocks++;
632 totalReceiveSize += s;
640 if (numReceptionBlocks==0)
throw std::runtime_error(
"Received a transfer with zero blocks");
641 if (numReceptionBlocks > MAX_DATA_BLOCKS)
throw std::runtime_error(
"Received a transfer with too many blocks");
// Final sanity checks on the announced sizes.
643 if(headerSize + headerExtraBytes >
static_cast<int>(receiveBuffer.size())
644 || totalReceiveSize < 0 || headerSize + headerExtraBytes > length ) {
// Keep a copy of the application header bytes (those after the preamble) and
// enlarge the buffers for the announced sizes.
648 headerReceived =
true;
649 receivedHeader.assign(receiveBuffer.begin() + offset + headerExtraBytes,
650 receiveBuffer.begin() + offset + headerSize + headerExtraBytes);
651 resizeReceiveBuffer();
653 return headerSize + headerExtraBytes;
// Fragment: the enclosing function's header is not part of this excerpt
// (presumably the reception reset called from the constructor - TODO confirm).
// Visible behavior: forgets the current reception - block count, header, missing
// ranges, flags, the lost-byte counter and all per-block offsets are cleared.
657 numReceptionBlocks = 0;
658 headerReceived =
false;
659 for (
int blk = 0; blk<MAX_DATA_BLOCKS; ++blk) {
660 missingReceiveSegments[blk].clear();
662 receivedHeader.clear();
663 waitingForMissingSegments =
false;
664 totalReceiveSize = 0;
665 finishedReception =
false;
666 lostSegmentBytes = 0;
667 for (
int i=0; i<MAX_DATA_BLOCKS; ++i) {
668 blockReceiveOffsets[i] = 0;
669 blockValidSize[i] = 0;
// Fragment: the enclosing function's header is not part of this excerpt.
// Visible behavior: returns the address of the first byte of receiveBuffer.
678 return &receiveBuffer[0];
// Fragment: the enclosing function's header and the empty-header path are not part
// of this excerpt. Visible behavior: if a header has been stored, reports its size
// through `length` and returns a pointer to its first byte.
682 if(receivedHeader.size() > 0) {
683 length =
static_cast<int>(receivedHeader.size());
684 return &receivedHeader[0];
// Dispatches a received control message that sits at the start of receiveBuffer.
//   length - total message length; the last sizeof(int) bytes are the control marker
//            and the byte in front of them is the message type.
// Return values are not visible in this excerpt.
// NOTE(review): several source lines (case labels, breaks, returns) are not part of
// this excerpt; the comments describe only the visible statements.
690 bool DataBlockProtocol::processControlMessage(
int length) {
// Must at least contain the marker plus the type byte.
691 if(length <
static_cast<int>(
sizeof(
int) + 1)) {
695 int payloadLength = length -
sizeof(int) - 1;
696 switch(receiveBuffer[0 + payloadLength]) {
697 case CONFIRM_MESSAGE:
699 connectionConfirmed =
true;
701 heartbeatKnockCount = 0;
// Connection request: confirm it and flag a new client; the extended connection
// state protocol is switched off again.
703 case CONNECTION_MESSAGE:
705 connectionConfirmed =
true;
706 confirmationMessagePending =
true;
707 clientConnectionPending =
true;
708 extendedConnectionStateProtocol =
false;
711 lastReceivedHeartbeat = std::chrono::steady_clock::now();
// Header while payload of a previous transfer has already been received.
713 case HEADER_MESSAGE: {
714 if (anyPayloadReceived()) {
715 if (allBlocksDone()) {
716 LOG_DEBUG_DBP(
"No EOF message received!");
718 LOG_DEBUG_DBP(
"Received header too late/early!");
722 if(parseReceivedHeader(payloadLength, 0) == 0) {
// Presumably the EOF case (its label is not visible): only meaningful once payload
// has been received.
729 if(anyPayloadReceived()) {
730 parseEofMessage(length);
733 case RESEND_MESSAGE: {
735 parseResendMessage(payloadLength);
738 case HEARTBEAT_MESSAGE:
// Heartbeats arriving less than 200 ms apart count as "knocks"; three of them enable
// the extended connection state protocol and queue five heartbeat replies.
740 auto now = std::chrono::steady_clock::now();
741 auto elapsedSinceLast = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastReceivedHeartbeat).count();
742 if (elapsedSinceLast < 200) {
743 heartbeatKnockCount++;
744 if (heartbeatKnockCount >= 3) {
745 if (!extendedConnectionStateProtocol) {
747 extendedConnectionStateProtocol =
true;
751 heartbeatRepliesQueued = 5;
756 heartbeatKnockCount = 0;
759 lastReceivedHeartbeat = now;
// A server outside a knock sequence queues a single reply.
760 if (isServer && (!heartbeatKnockCount)) {
764 heartbeatRepliesQueued = 1;
768 case DISCONNECTION_MESSAGE:
771 connectionConfirmed =
false;
// Fragment: the enclosing function's header and some branch lines are not part of
// this excerpt. Visible behavior: for a confirmed non-TCP connection, liveness is
// decided by timeouts. Two tests are visible (the condition choosing between them
// is not): time since the last heartbeat below 2 heartbeat intervals, or time since
// anything was received below 3 heartbeat intervals.
786 if(protType == PROTOCOL_TCP) {
789 }
else if(connectionConfirmed) {
790 auto now = std::chrono::steady_clock::now();
793 return std::chrono::duration_cast<std::chrono::milliseconds>(
794 now - lastReceivedHeartbeat).count()
795 < 2*HEARTBEAT_INTERVAL_MS;
798 return (std::chrono::duration_cast<std::chrono::milliseconds>(
799 now - lastReceivedAnything).count()
800 < 3*HEARTBEAT_INTERVAL_MS);
// Fragment: the enclosing function's header and several length/return lines are not
// part of this excerpt. Visible behavior: chooses the next pending control message
// in this priority order: confirmation, (client) connection request after a
// reconnect timeout, pending transfer header, EOF, resend request, heartbeat.
// Messages built in controlMessageBuffer get four 0xff bytes appended.
808 if(protType == PROTOCOL_TCP) {
813 if(confirmationMessagePending) {
815 confirmationMessagePending =
false;
816 controlMessageBuffer[0] = CONFIRM_MESSAGE;
818 }
else if(!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
819 std::chrono::steady_clock::now() - lastRemoteHostActivity).count() > RECONNECT_TIMEOUT_MS) {
821 controlMessageBuffer[0] = CONNECTION_MESSAGE;
// Restart both timers so that the request is not repeated immediately.
825 lastRemoteHostActivity = lastSentHeartbeat = std::chrono::steady_clock::now();
826 }
else if(transferHeaderData !=
nullptr &&
isConnected()) {
// The pending transfer header is handed out once and then cleared.
828 length = transferHeaderSize;
829 const unsigned char* ret = transferHeaderData;
830 transferHeaderData =
nullptr;
832 }
else if(eofMessagePending) {
// EOF carries the end offset of the last transmitted block in network byte order.
834 eofMessagePending =
false;
835 unsigned int networkOffset = htonl(mergeRawOffset(lastTransmittedBlock, transferSize[lastTransmittedBlock]));
836 memcpy(&controlMessageBuffer[0], &networkOffset,
sizeof(
int));
837 controlMessageBuffer[
sizeof(int)] = EOF_MESSAGE;
839 }
else if(resendMessagePending) {
841 resendMessagePending =
false;
842 if(!generateResendRequest(length)) {
846 }
else if(heartbeatRepliesQueued ||
847 (!isServer && std::chrono::duration_cast<std::chrono::milliseconds>(
848 std::chrono::steady_clock::now() - lastSentHeartbeat).count() > HEARTBEAT_INTERVAL_MS)) {
// Heartbeat: either a queued reply, or the client's periodic one.
851 controlMessageBuffer[0] = HEARTBEAT_MESSAGE;
853 lastSentHeartbeat = std::chrono::steady_clock::now();
854 if (heartbeatRepliesQueued > 0) heartbeatRepliesQueued--;
// Trailing control marker: four 0xff bytes.
860 controlMessageBuffer[length++] = 0xff;
861 controlMessageBuffer[length++] = 0xff;
862 controlMessageBuffer[length++] = 0xff;
863 controlMessageBuffer[length++] = 0xff;
864 return controlMessageBuffer;
// Fragment: the enclosing function's header and return lines are not part of this
// excerpt. Visible behavior: the pending-client flag is cleared when it is found set
// (read-once semantics).
868 if(clientConnectionPending) {
869 clientConnectionPending =
false;
// Serializes all currently missing receive ranges into controlMessageBuffer as a
// resend request.
//   length - in/out: write position inside controlMessageBuffer; advanced past every
//            byte written (its initialization is not visible in this excerpt).
// Return values are not visible here; the caller in this file treats `false` as
// "no request generated".
// Layout per range: 4-byte raw offset, 4-byte length, both in network byte order,
// followed at the end by the RESEND_MESSAGE type byte.
876 bool DataBlockProtocol::generateResendRequest(
int& length) {
878 for (
int blk = 0; blk < numReceptionBlocks; ++blk) {
879 for(MissingReceiveSegment segment: missingReceiveSegments[blk]) {
880 unsigned int segOffset = htonl(
static_cast<unsigned int>(segment.offset));
881 unsigned int segLen = htonl(
static_cast<unsigned int>(segment.length));
// Check that this entry (8 bytes) plus 4 further bytes still fits.
883 if (
sizeof(controlMessageBuffer) < length + 2*
sizeof(
unsigned int) + 4) {
889 memcpy(&controlMessageBuffer[length], &segOffset,
sizeof(segOffset));
890 length +=
sizeof(
unsigned int);
891 memcpy(&controlMessageBuffer[length], &segLen,
sizeof(segLen));
892 length +=
sizeof(
unsigned int);
895 splitRawOffset(segment.offset, dbgBlk, dbgOfs);
896 LOG_DEBUG_DBP(
"Req missing " << dbgBlk <<
" " << dbgOfs <<
" " << segment.length);
// Make sure that the type byte and the 4-byte marker still fit.
900 if(length +
sizeof(
int) + 1 >
sizeof(controlMessageBuffer)) {
904 controlMessageBuffer[length++] = RESEND_MESSAGE;
// Parses a resend request from the start of receiveBuffer and rebuilds the queue of
// segments to retransmit.
//   length - number of payload bytes; interpreted as a sequence of 8-byte entries
//            (4-byte raw offset + 4-byte length, network byte order).
// Any previously queued resend requests are discarded first.
909 void DataBlockProtocol::parseResendMessage(
int length) {
910 missingTransferSegments.clear();
912 int num = length / (
sizeof(
unsigned int) +
sizeof(
unsigned int));
913 int bufferOffset = 0;
915 for(
int i=0; i<num; i++) {
916 unsigned int segOffsetNet = *
reinterpret_cast<unsigned int*
>(&receiveBuffer[bufferOffset]);
917 bufferOffset +=
sizeof(
unsigned int);
918 unsigned int segLenNet = *
reinterpret_cast<unsigned int*
>(&receiveBuffer[bufferOffset]);
919 bufferOffset +=
sizeof(
unsigned int);
921 int segmentOffsetRaw =
static_cast<int>(ntohl(segOffsetNet));
922 int segmentLength =
static_cast<int>(ntohl(segLenNet));
923 int dataBlockID, segmentOffset;
924 splitRawOffset(segmentOffsetRaw, dataBlockID, segmentOffset);
// Only accept requests that lie entirely inside the valid data of the block.
926 if(segmentOffset >= 0 && segmentLength > 0 && (segmentOffset + segmentLength) <= rawValidBytes[dataBlockID]) {
927 missingTransferSegments.push_back(std::pair<int, int>(
928 segmentOffsetRaw, segmentLength));
// Handles an end-of-transfer message: updates the loss statistics, records the
// unreceived tail of every block as missing, and either requests resends or marks
// the reception as finished.
//   length - message length; used in the visible code only for the log message of
//            the too-short case (the corresponding check is not visible here).
// NOTE(review): some source lines (including the declarations of `mblock` and
// `moffset`) are not part of this excerpt.
934 void DataBlockProtocol::parseEofMessage(
int length) {
// Running average of the lost-byte ratio over all completed receptions.
// NOTE(review): divides by totalReceiveSize - confirm that it cannot be 0 here.
936 completedReceptions++;
937 lostSegmentRate = (lostSegmentRate * (completedReceptions-1) + ((
double) lostSegmentBytes) / totalReceiveSize) / completedReceptions;
938 LOG_DEBUG_DBP(
"Lost segment rate: " << lostSegmentRate);
// Everything between the current receive offset and the block end is missing.
941 for (
int i=0; i<numReceptionBlocks; ++i) {
942 if (blockReceiveOffsets[i] < blockReceiveSize[i]) {
943 MissingReceiveSegment missingSeg;
944 missingSeg.offset = mergeRawOffset(i, blockReceiveOffsets[i]);
945 missingSeg.length = blockReceiveSize[i] - blockReceiveOffsets[i];
946 missingSeg.isEof =
true;
947 missingReceiveSegments[i].push_back(missingSeg);
948 lostSegmentBytes += missingSeg.length;
// Any block with gaps triggers a resend request; its receive offset is moved back to
// the earliest missing offset.
951 for (
int blk=0; blk<numReceptionBlocks; ++blk) {
952 if(missingReceiveSegments[blk].size() > 0) {
953 waitingForMissingSegments =
true;
954 resendMessagePending =
true;
957 for (
int i=0; i<static_cast<int>(missingReceiveSegments[blk].size()); ++i) {
958 splitRawOffset(missingReceiveSegments[blk][i].offset, mblock, moffset);
959 if (moffset < blockReceiveOffsets[mblock]) {
960 blockReceiveOffsets[mblock] = moffset;
// Nothing missing: the reception is complete.
965 if (!resendMessagePending) {
966 finishedReception =
true;
969 LOG_DEBUG_DBP(
"EOF message too short, length " << length);
// Makes sure that receiveBuffer and the per-block buffers are large enough for the
// announced reception sizes. Buffers only ever grow here.
// NOTE(review): the body of the negative-size guard and the start of the bufferSize
// expression are not part of this excerpt.
973 void DataBlockProtocol::resizeReceiveBuffer() {
974 if(totalReceiveSize < 0) {
// Tail of the bufferSize computation; its first term is not visible here.
981 + MAX_OUTSTANDING_BYTES +
sizeof(int);
984 if(
static_cast<int>(receiveBuffer.size()) < bufferSize) {
985 receiveBuffer.resize(bufferSize);
// Each block buffer must hold the announced size of its block.
988 for (
int i=0; i<numReceptionBlocks; ++i) {
989 if (
static_cast<int>(blockReceiveBuffers[i].size()) < blockReceiveSize[i]) {
990 blockReceiveBuffers[i].resize(blockReceiveSize[i]);
996 void DataBlockProtocol::getDisconnectionMessage(
const unsigned char* &buf,
int &sz) {
1002 static const unsigned char DISCONNECTION_MESSAGE_BUFFER[] = { DISCONNECTION_MESSAGE, 0xff, 0xff, 0xff, 0xff };
1003 buf = DISCONNECTION_MESSAGE_BUFFER;
1004 sz =
sizeof(DISCONNECTION_MESSAGE_BUFFER);
1008 void DataBlockProtocol::getHeartbeatMessage(
const unsigned char* &buf,
int &sz) {
1013 static const unsigned char HEARTBEAT_MESSAGE_BUFFER[] = { HEARTBEAT_MESSAGE, 0xff, 0xff, 0xff, 0xff };
1014 buf = HEARTBEAT_MESSAGE_BUFFER;
1015 sz =
sizeof(HEARTBEAT_MESSAGE_BUFFER);