23 #include "visiontransfer/imagetransfer.h"
24 #include "visiontransfer/exceptions.h"
25 #include "visiontransfer/internal/datablockprotocol.h"
26 #include "visiontransfer/internal/networking.h"
29 using namespace visiontransfer;
30 using namespace visiontransfer::internal;
32 namespace visiontransfer {
// Private implementation class behind ImageTransfer. The public ImageTransfer
// methods in this file forward their arguments to the members declared here.
// NOTE: this excerpt does not show every line of the declaration (the start of
// the constructor declaration and several data members are not visible).
36 class ImageTransfer::Pimpl {
// Tail of the constructor's parameter list.
39 bool server,
int bufferSize,
int maxUdpPacketSize,
int autoReconnectDelay);
// --- Sending side ---
43 void setRawTransferData(
const ImageSet& metaData,
const std::vector<unsigned char*>& rawData,
44 int firstTileWidth = 0,
int middleTileWidth = 0,
int lastTileWidth = 0);
45 void setRawValidBytes(
const std::vector<int>& validBytes);
46 void setTransferImageSet(
const ImageSet& imageSet);
47 TransferStatus transferData();
// --- Receiving side ---
48 bool receiveImageSet(
ImageSet& imageSet);
49 bool receivePartialImageSet(
ImageSet& imageSet,
int& validRows,
bool& complete);
50 int getNumDroppedFrames()
const;
// --- Connection state ---
51 bool isConnected()
const;
53 std::string getRemoteAddress()
const;
55 void setConnectionStateChangeCallback(std::function<
void(visiontransfer::ConnectionState)> callback);
56 void establishConnection();
57 void setAutoReconnect(
int secondsBetweenRetries);
59 std::string statusReport();
// Locked by the receive-path methods (receivePartialImageSet,
// receiveNetworkData) and, together with sendMutex, by tryAccept/disconnect.
69 std::recursive_mutex receiveMutex;
// Locked by the send-path methods and by the const accessors
// (isConnected, getRemoteAddress).
70 std::recursive_mutex sendMutex;
// Listening socket created by initTcpServer().
74 SOCKET tcpServerSocket;
// Address of the current peer; zeroed when there is none.
75 sockaddr_in remoteAddress;
// Result of Networking::resolveAddress(); released with freeaddrinfo() in
// the destructor.
76 addrinfo* addressInfo;
// Delay used by disconnect() for client-side reconnects; values <= 0
// disable reconnecting there.
78 int tcpReconnectSecondsBetweenRetries;
// Last connection state that was recorded (and reported via the callback).
79 bool knownConnectedState;
83 std::unique_ptr<ImageProtocol> protocol;
// Message currently being sent by transferData(); nullptr when none is pending.
88 const unsigned char* currentMsg;
// Optional user callback; checked for emptiness before each invocation.
91 std::function<void(visiontransfer::ConnectionState)> connectionStateChangeCallback;
// --- Internal helpers ---
94 void setSocketOptions();
97 void initTcpServer(
const addrinfo* addressInfo);
98 void initTcpClient(
const addrinfo* addressInfo);
99 void initUdp(
const addrinfo* addressInfo);
102 bool receiveNetworkData(
bool block);
// destAddrUdp optionally overrides the stored remoteAddress as the
// destination of a sendto() call.
105 bool sendNetworkMessage(
const unsigned char* msg,
int length, sockaddr_in* destAddrUdp=
nullptr);
106 void sendPendingControlMessages();
108 bool selectSocket(
bool read,
bool wait);
109 bool isTcpClientClosed(SOCKET sock);
// Constructs an ImageTransfer for an address/service pair. All arguments are
// forwarded unchanged to a newly allocated Pimpl, which does the actual work.
// (Part of the parameter list is not visible in this excerpt.)
114 ImageTransfer::ImageTransfer(
const char* address,
const char* service,
116 int autoReconnectDelay):
117 pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize,
118 autoReconnectDelay)) {
// Constructor overload that takes a `device` object (the start of the
// signature is not visible in this excerpt). It builds the Pimpl from the
// device's IP address and network protocol, the fixed service string "7681",
// and server=false, i.e. this side acts as the client.
123 int autoReconnectDelay):
124 pimpl(new Pimpl(device.getIpAddress().c_str(),
"7681", static_cast<
ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
125 false, bufferSize, maxUdpPacketSize, autoReconnectDelay)) {
// Destructor of the public ImageTransfer class; its body is not visible in
// this excerpt.
129 ImageTransfer::~ImageTransfer() {
// Public setRawTransferData(): forwards all arguments unchanged to the Pimpl
// method of the same name. (The start of the signature is not visible here.)
134 int firstTileWidth,
int middleTileWidth,
int lastTileWidth) {
135 pimpl->setRawTransferData(metaData, rawData, firstTileWidth, middleTileWidth, lastTileWidth);
// Bodies of the remaining public ImageTransfer methods. Their signatures are
// not visible in this excerpt; each visible statement delegates to the Pimpl.
139 pimpl->setRawValidBytes(validBytes);
143 pimpl->setTransferImageSet(imageSet);
147 return pimpl->transferData();
151 return pimpl->receiveImageSet(imageSet);
155 return pimpl->receivePartialImageSet(imageSet, validRows, complete);
159 return pimpl->getNumDroppedFrames();
163 return pimpl->isConnected();
// Sets the reconnect delay to 0, which disables the client-side reconnect in
// Pimpl::disconnect(). NOTE(review): presumably part of the public
// disconnect() method — the enclosing function is not visible; confirm.
168 pimpl->setAutoReconnect(0);
173 return pimpl->getRemoteAddress();
177 return pimpl->tryAccept();
181 pimpl->setConnectionStateChangeCallback(callback);
// Sets the delay between reconnection attempts by forwarding to the Pimpl.
// @param secondsBetweenRetries  Delay in seconds; Pimpl::disconnect() only
//        attempts a reconnect when this is greater than zero.
184 void ImageTransfer::setAutoReconnect(
int secondsBetweenRetries) {
185 pimpl->setAutoReconnect(secondsBetweenRetries);
// Constructs the implementation object: stores the configuration, puts both
// sockets and the send state into their "nothing open / nothing pending"
// values, resolves the address and establishes the connection.
// (Part of the parameter list is not visible in this excerpt.)
189 ImageTransfer::Pimpl::Pimpl(
const char* address,
const char* service,
191 bufferSize,
int maxUdpPacketSize,
int autoReconnectDelay)
192 : protType(protType), isServer(server), bufferSize(bufferSize),
193 maxUdpPacketSize(maxUdpPacketSize),
194 clientSocket(INVALID_SOCKET), tcpServerSocket(INVALID_SOCKET),
195 tcpReconnectSecondsBetweenRetries(autoReconnectDelay),
196 knownConnectedState(false), gotAnyData(false),
197 currentMsgLen(0), currentMsgOffset(0), currentMsg(nullptr) {
199 Networking::initNetworking();
// Ignore SIGPIPE so that a write to a closed connection is reported as an
// error (EPIPE is handled in sendNetworkMessage) instead of raising the signal.
202 signal(SIGPIPE, SIG_IGN);
// A zeroed remoteAddress (sin_family != AF_INET) means "no peer known yet".
205 memset(&remoteAddress, 0,
sizeof(remoteAddress));
// Special handling for a null or empty address; the body of this branch is
// not visible in this excerpt.
208 if(address ==
nullptr ||
string(address) ==
"") {
// addressInfo is not set in the initializer list; it is first assigned here.
212 addressInfo = Networking::resolveAddress(address, service);
213 establishConnection();
// Opens the socket(s) for the resolved address and, on the visible path,
// records the connected state and notifies the registered callback.
// The conditions selecting between the three init routines are not visible in
// this excerpt (presumably protocol type and isServer — confirm).
216 void ImageTransfer::Pimpl::establishConnection() {
220 initUdp(addressInfo);
222 initTcpServer(addressInfo);
224 initTcpClient(addressInfo);
230 knownConnectedState =
true;
231 if (connectionStateChangeCallback) {
// The callback runs on its own detached thread rather than on the caller's.
// NOTE(review): the lambda captures by reference and reads a member of this
// object; if the Pimpl is destroyed before the thread runs, that access
// dangles. Nothing visible here prevents that — confirm.
232 std::thread([&](){connectionStateChangeCallback(visiontransfer::ConnectionState::CONNECTED);}).detach();
// Destructor: disconnects if still connected, then closes any socket that is
// still open and releases the resolved address list.
236 ImageTransfer::Pimpl::~Pimpl() {
239 if (isConnected()) disconnect();
241 if(clientSocket != INVALID_SOCKET) {
242 Networking::closeSocket(clientSocket);
244 if(tcpServerSocket != INVALID_SOCKET) {
245 Networking::closeSocket(tcpServerSocket);
// addressInfo is assigned from Networking::resolveAddress() in the constructor.
247 if(addressInfo !=
nullptr) {
248 freeaddrinfo(addressInfo);
// Connects a TCP client socket to the given address and remembers that address
// as the remote peer.
// @param addressInfo  Resolved address to connect to.
253 void ImageTransfer::Pimpl::initTcpClient(
const addrinfo* addressInfo) {
255 clientSocket = Networking::connectTcpSocket(addressInfo);
// NOTE(review): copies sizeof(sockaddr_in) bytes regardless of ai_addrlen or
// ai_family, so this assumes an IPv4 address — confirm against resolveAddress().
256 memcpy(&remoteAddress, addressInfo->ai_addr,
sizeof(remoteAddress));
// Creates the listening TCP socket: opens it, enables address reuse, binds it
// to the given address, makes it non-blocking and starts listening.
// @param addressInfo  Resolved local address to bind to.
262 void ImageTransfer::Pimpl::initTcpServer(
const addrinfo* addressInfo) {
266 tcpServerSocket = ::socket(addressInfo->ai_family, addressInfo->ai_socktype,
267 addressInfo->ai_protocol);
268 if (tcpServerSocket == INVALID_SOCKET) {
// The exception object is built here; the statement that throws it is not
// visible in this excerpt.
269 TransferException ex(
"Error opening socket: " + Networking::getLastErrorString());
274 Networking::enableReuseAddress(tcpServerSocket,
true);
277 Networking::bindSocket(tcpServerSocket, addressInfo);
// No client is connected until tryAccept() succeeds.
278 clientSocket = INVALID_SOCKET;
// Non-blocking listening socket.
281 Networking::setSocketBlocking(tcpServerSocket,
false);
// Backlog of 1. NOTE(review): the return value of listen() is not checked in
// the visible code.
284 listen(tcpServerSocket, 1);
// Creates the UDP socket used for both sending and receiving.
// @param addressInfo  Resolved address: bound to when acting as server,
//        otherwise (on the visible path) stored as the remote address.
287 void ImageTransfer::Pimpl::initUdp(
const addrinfo* addressInfo) {
// The socket is always created as IPv4 (AF_INET), independent of addressInfo.
290 clientSocket = socket(AF_INET, SOCK_DGRAM, 0);
291 if(clientSocket == INVALID_SOCKET) {
// The exception object is built here; the statement that throws it is not
// visible in this excerpt.
292 TransferException ex(
"Error creating receive socket: " + Networking::getLastErrorString());
297 Networking::enableReuseAddress(clientSocket,
true);
// Only a server with a resolved address binds the socket.
300 if(isServer && addressInfo !=
nullptr) {
301 Networking::bindSocket(clientSocket, addressInfo);
// The branch condition guarding this copy is not visible here (presumably the
// client case — confirm).
305 memcpy(&remoteAddress, addressInfo->ai_addr,
sizeof(remoteAddress));
// Tries to accept a pending connection on the listening TCP socket and, if one
// exists, makes it the active client connection.
// @return Not fully visible in this excerpt; presumably true when a new client
//         was accepted — confirm.
312 bool ImageTransfer::Pimpl::tryAccept() {
318 sockaddr_in newRemoteAddress;
319 memset(&newRemoteAddress, 0,
sizeof(newRemoteAddress));
320 SOCKET newSocket = Networking::acceptConnection(tcpServerSocket, newRemoteAddress);
321 if(newSocket == INVALID_SOCKET) {
// Both mutexes are taken before the connection state is swapped, so neither a
// sender nor a receiver runs concurrently with the change.
327 unique_lock<recursive_mutex> recvLock(receiveMutex);
328 unique_lock<recursive_mutex> sendLock(sendMutex);
// A client connection already exists. What exactly happens with the two
// sockets in this branch is not fully visible in this excerpt.
330 if(clientSocket != INVALID_SOCKET) {
335 Networking::closeSocket(newSocket);
338 memcpy(&remoteAddress, &newRemoteAddress,
sizeof(remoteAddress));
339 clientSocket = newSocket;
// Start the new connection with clean protocol state and no half-sent message.
345 protocol->resetTransfer();
346 protocol->resetReception();
347 currentMsg =
nullptr;
349 knownConnectedState =
true;
// Returns the remote peer as a string of the form "<ip>:<port>".
// @return Dotted IPv4 address followed by ':' and the port value.
354 std::string ImageTransfer::Pimpl::getRemoteAddress()
const {
// sendMutex is a non-mutable member, hence the const_cast to lock it from a
// const method.
355 unique_lock<recursive_mutex> lock(
const_cast<recursive_mutex&
>(sendMutex));
// remoteAddress is zeroed when no peer is known, so sin_family != AF_INET
// means "no remote address". The result returned in that case is not visible
// in this excerpt.
357 if(remoteAddress.sin_family != AF_INET) {
// NOTE(review): sin_port is kept in network byte order by the socket API and
// is formatted here without ntohs(); on little-endian hosts the printed number
// would be byte-swapped. Confirm whether this is intended.
362 snprintf(strPort,
sizeof(strPort),
":%d", remoteAddress.sin_port);
364 return string(inet_ntoa(remoteAddress.sin_addr)) + strPort;
// Configures the client socket: requests bufferSize for both the receive and
// the send buffer, sets a socket timeout and switches to blocking mode.
367 void ImageTransfer::Pimpl::setSocketOptions() {
// NOTE(review): the setsockopt() return values are not checked in the visible
// code, so a rejected buffer size goes unnoticed.
370 setsockopt(clientSocket, SOL_SOCKET, SO_RCVBUF,
reinterpret_cast<char*
>(&bufferSize),
sizeof(bufferSize));
371 setsockopt(clientSocket, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char*
>(&bufferSize),
sizeof(bufferSize));
// Timeout value 500 — presumably milliseconds; confirm against
// Networking::setSocketTimeout().
374 Networking::setSocketTimeout(clientSocket, 500);
375 Networking::setSocketBlocking(clientSocket,
true);
// Hands raw transfer data to the protocol layer as the next data to send.
// All parameters are passed through unchanged to
// ImageProtocol::setRawTransferData().
378 void ImageTransfer::Pimpl::setRawTransferData(
const ImageSet& metaData,
379 const std::vector<unsigned char*>& rawDataVec,
int firstTileWidth,
int middleTileWidth,
int lastTileWidth) {
380 unique_lock<recursive_mutex> sendLock(sendMutex);
381 protocol->setRawTransferData(metaData, rawDataVec, firstTileWidth, middleTileWidth, lastTileWidth);
// Drop any message still pending from an earlier transfer; transferData()
// fetches a fresh one when currentMsg is nullptr.
382 currentMsg =
nullptr;
// Forwards the valid byte counts to the protocol layer while holding the send
// mutex.
// @param validBytes  Passed through unchanged to
//        ImageProtocol::setRawValidBytes().
385 void ImageTransfer::Pimpl::setRawValidBytes(
const std::vector<int>& validBytes) {
386 unique_lock<recursive_mutex> sendLock(sendMutex);
387 protocol->setRawValidBytes(validBytes);
// Sets the image set that is to be transferred next.
// @param imageSet  Passed through unchanged to
//        ImageProtocol::setTransferImageSet().
390 void ImageTransfer::Pimpl::setTransferImageSet(
const ImageSet& imageSet) {
391 unique_lock<recursive_mutex> sendLock(sendMutex);
392 protocol->setTransferImageSet(imageSet);
// Drop any message still pending from an earlier transfer; transferData()
// fetches a fresh one when currentMsg is nullptr.
393 currentMsg =
nullptr;
// Body of the send routine (the function header is not visible in this
// excerpt; presumably Pimpl::transferData() — confirm). It pulls messages from
// the protocol and writes them to the socket until the protocol has no more
// messages or a send does not complete.
// Visible return values: NOT_CONNECTED, NO_VALID_DATA, ALL_TRANSFERRED,
// PARTIAL_TRANSFER.
397 unique_lock<recursive_mutex> lock(sendMutex);
// Non-blocking receive pass before sending; the guarding condition is not
// visible in this excerpt.
402 receiveNetworkData(
false);
403 }
else if (isServer) {
// Non-consuming probe of whether the TCP client has closed its side.
406 bool disconnected = isTcpClientClosed(clientSocket);
417 return NOT_CONNECTED;
// TCP_CORK is set before the send loop and again after it (see below); the
// flag values used are not visible in this excerpt.
424 setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (
char *) &flag,
sizeof(
int));
// No message in flight: fetch the next one from the protocol.
429 if(currentMsg ==
nullptr) {
430 currentMsgOffset = 0;
431 currentMsg = protocol->getTransferMessage(currentMsgLen);
433 if(currentMsg ==
nullptr) {
434 if(protocol->transferComplete()) {
435 return ALL_TRANSFERRED;
437 return NO_VALID_DATA;
443 bool wouldBlock =
false;
444 bool dataTransferred = (currentMsg !=
nullptr);
445 while(currentMsg !=
nullptr) {
// Only the part of the message that has not been written yet.
446 int writing = (int)(currentMsgLen - currentMsgOffset);
// Send from the current offset so that a partially written message resumes
// where the previous attempt stopped (sendNetworkMessage advances
// currentMsgOffset on a short write).
448 if(sendNetworkMessage(&currentMsg[currentMsgOffset], writing)) {
// Message fully sent: move on to the next one.
450 currentMsgOffset = 0;
451 currentMsg = protocol->getTransferMessage(currentMsgLen);
463 setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (
char *) &flag,
sizeof(
int));
// TCP_NODELAY is set twice in a row; the flag values are not visible here
// (presumably toggled on and off to flush queued data — confirm).
467 setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
469 setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (
char *) &flag,
sizeof(
int));
// Non-blocking receive pass after sending.
475 receiveNetworkData(
false);
478 if(protocol->transferComplete()) {
479 return ALL_TRANSFERRED;
480 }
else if(wouldBlock) {
483 return PARTIAL_TRANSFER;
// Receives an image set by repeatedly calling receivePartialImageSet() and
// measuring the elapsed time since the call started.
// @param imageSet  Output: the received image set.
// @return Not fully visible in this excerpt; presumably true once a complete
//         image set was received — confirm.
487 bool ImageTransfer::Pimpl::receiveImageSet(
ImageSet& imageSet) {
489 bool complete =
false;
// steady_clock is monotonic, so the timeout below is unaffected by wall-clock
// adjustments.
491 std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
493 if(!receivePartialImageSet(imageSet, validRows, complete)) {
// Elapsed time in milliseconds since the call started.
497 unsigned int time =
static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::milliseconds>(
498 std::chrono::steady_clock::now() - startTime).count());
// Still incomplete after more than 100 ms; the action taken is not visible in
// this excerpt (presumably the call gives up — confirm).
499 if(time > 100 && !complete) {
// Reads network data until the protocol reports received images or a read
// yields nothing, then returns what has been received so far.
// @param imageSet   Output: filled by the protocol.
// @param validRows  Output: set by the protocol.
// @param complete   Output: set by the protocol.
// @return Result of ImageProtocol::getPartiallyReceivedImageSet().
507 bool ImageTransfer::Pimpl::receivePartialImageSet(
ImageSet& imageSet,
508 int& validRows,
bool& complete) {
509 unique_lock<recursive_mutex> lock(receiveMutex);
// The loop stops as soon as images are available or receiveNetworkData()
// returns false. The loop body and the value of `block` are not visible here.
513 while(!protocol->imagesReceived() && receiveNetworkData(block)) {
518 return protocol->getPartiallyReceivedImageSet(imageSet, validRows, complete);
// Reads one chunk of data from the socket into the protocol's receive buffer
// and lets the protocol process it. It also tracks connection state changes,
// sends pending control messages and handles UDP senders other than the
// current peer.
// @param block  true: wait for the receive mutex; false: only try to lock it
//               and poll the socket without waiting.
// @return true if more than zero bytes were received (final visible return).
521 bool ImageTransfer::Pimpl::receiveNetworkData(
bool block) {
// Blocking callers wait for the mutex; non-blocking callers use try_to_lock,
// so the lock may not be owned afterwards (checked further down).
522 unique_lock<recursive_mutex> lock = block ?
523 unique_lock<recursive_mutex>(receiveMutex) : unique_lock<recursive_mutex>(receiveMutex, std::try_to_lock);
525 if(clientSocket == INVALID_SOCKET) {
// Detect a change of the protocol-level connection state and report it.
532 bool newConnectedState = protocol->isConnected();
533 if (newConnectedState != knownConnectedState) {
534 knownConnectedState = newConnectedState;
535 if (connectionStateChangeCallback) {
// The callback runs on a detached thread. newConnectedState is captured by
// value, everything else by reference.
// NOTE(review): the by-reference part reads a member of this object and would
// dangle if the Pimpl is destroyed before the thread runs — confirm.
536 std::thread([&, newConnectedState](){ connectionStateChangeCallback(newConnectedState ? visiontransfer::ConnectionState::CONNECTED : visiontransfer::ConnectionState::DISCONNECTED); }).detach();
538 if (!newConnectedState) {
546 sendPendingControlMessages();
// The try_to_lock above failed: another thread holds the receive mutex.
548 if(!lock.owns_lock()) {
// Non-blocking mode: check the socket for readability without waiting.
554 if(!block && !selectSocket(
true,
false)) {
559 char* buffer =
reinterpret_cast<char*
>(protocol->getNextReceiveBuffer(maxLength));
562 sockaddr_in fromAddress;
563 socklen_t fromSize =
sizeof(fromAddress);
// recvfrom() also yields the sender address, which is compared against the
// current peer below.
565 int bytesReceived = recvfrom(clientSocket, buffer, maxLength,
566 0,
reinterpret_cast<sockaddr*
>(&fromAddress), &fromSize);
// Read the error code immediately after the call.
568 auto err = Networking::getErrno();
572 if ((!isServer) && (!gotAnyData)) {
577 }
// These error codes are excluded from the condition and so do not lead to the
// exception below.
else if(bytesReceived < 0 && err != EWOULDBLOCK && err != EINTR &&
578 err != ETIMEDOUT && err != WSA_IO_PENDING && err != WSAECONNRESET) {
// The statement that throws this exception is not visible in this excerpt.
579 TransferException ex(
"Error reading from socket: " + Networking::getErrorString(err));
581 }
else if(bytesReceived > 0) {
// A sender counts as new when its IP or port differs from the stored peer and
// a peer is stored at all (port != 0).
585 ((fromAddress.sin_addr.s_addr!=remoteAddress.sin_addr.s_addr) || (fromAddress.sin_port!=remoteAddress.sin_port)) &&
586 (remoteAddress.sin_port != 0)
588 if (isServer && newSender) {
590 if (protocol->isConnected()) {
// Answer the other sender with a disconnection message, addressed explicitly
// to that sender rather than to the stored peer.
594 const unsigned char* disconnectionMsg;
595 int disconnectionMsgLen;
596 DataBlockProtocol::getDisconnectionMessage(disconnectionMsg, disconnectionMsgLen);
597 if (disconnectionMsgLen > 0) {
598 sendNetworkMessage(disconnectionMsg, disconnectionMsgLen, &fromAddress);
603 protocol->processReceivedMessage(bytesReceived);
604 if(protocol->newClientConnected()) {
// Adopt the sender as the new peer.
606 memcpy(&remoteAddress, &fromAddress,
sizeof(remoteAddress));
611 const unsigned char* heartbeatMsg;
613 DataBlockProtocol::getHeartbeatMessage(heartbeatMsg, heartbeatMsgLen);
614 if (heartbeatMsgLen > 0) {
// The heartbeat is sent five times (presumably as redundancy against UDP
// packet loss — confirm).
616 for (
int i=0; i<5; ++i) {
618 sendNetworkMessage(heartbeatMsg, heartbeatMsgLen, &fromAddress);
// The protocol reports no connection but a peer is still stored: forget it.
625 if (!protocol->isConnected() && (remoteAddress.sin_port != 0)) {
628 memset(&remoteAddress, 0,
sizeof(remoteAddress));
633 return bytesReceived > 0;
// Tears down the current connection: optionally sends a disconnection message
// to the peer, reports the state change, closes the client socket and clears
// the stored peer address. A client with a positive reconnect delay then tries
// to re-establish the connection.
636 void ImageTransfer::Pimpl::disconnect() {
// Both mutexes are locked so neither a sender nor a receiver uses the socket
// while it is being closed.
638 unique_lock<recursive_mutex> recvLock(receiveMutex);
639 unique_lock<recursive_mutex> sendLock(sendMutex);
641 if(clientSocket != INVALID_SOCKET) {
// The goodbye message is only sent if the protocol supports the extended
// connection state protocol.
643 if (protocol->supportsExtendedConnectionStateProtocol()) {
647 const unsigned char* disconnectionMsg;
648 int disconnectionMsgLen;
649 DataBlockProtocol::getDisconnectionMessage(disconnectionMsg, disconnectionMsgLen);
650 if (disconnectionMsgLen > 0) {
651 sendNetworkMessage(disconnectionMsg, disconnectionMsgLen, &remoteAddress);
660 knownConnectedState =
false;
// Unlike the CONNECTED notifications elsewhere in this file, this callback is
// called directly on the calling thread, not on a detached thread.
661 if (connectionStateChangeCallback) connectionStateChangeCallback(visiontransfer::ConnectionState::DISCONNECTED);
664 Networking::closeSocket(clientSocket);
665 memset(&remoteAddress, 0,
sizeof(remoteAddress));
// Auto-reconnect applies to clients only and needs a positive retry delay.
668 if ((!isServer) && (tcpReconnectSecondsBetweenRetries > 0)) {
671 establishConnection();
// Wait between retries. The surrounding retry/exception structure is not
// visible in this excerpt.
677 std::this_thread::sleep_for(std::chrono::seconds(tcpReconnectSecondsBetweenRetries));
682 memset(&remoteAddress, 0,
sizeof(remoteAddress));
// Reports whether the transfer is currently considered connected.
// @return On the visible path, the last recorded connection state. Other
//         return paths may exist in lines not shown in this excerpt.
685 bool ImageTransfer::Pimpl::isConnected()
const {
// sendMutex is a non-mutable member, hence the const_cast to lock it from a
// const method.
686 unique_lock<recursive_mutex> lock(
const_cast<recursive_mutex&
>(sendMutex));
692 return knownConnectedState;
// Writes one message to the network, using sendto() with an explicit
// destination address or send() on the connected socket.
// @param msg          Start of the bytes to send.
// @param length       Number of bytes to send.
// @param destAddrUdp  Optional destination for sendto(); the stored
//                     remoteAddress is used otherwise.
// @return Not fully visible in this excerpt; transferData() treats a true
//         result as "message completely sent".
695 bool ImageTransfer::Pimpl::sendNetworkMessage(
const unsigned char* msg,
int length, sockaddr_in* destAddrUdp) {
698 sockaddr_in* destAddr;
// sendto() path. The branch condition is not visible (presumably the UDP
// protocol — confirm). The lock appears to cover only the reads of the shared
// address and socket.
701 unique_lock<recursive_mutex> lock(sendMutex);
// Explicit destination given by the caller ...
704 destAddr = destAddrUdp;
// ... or the stored peer address.
707 destAddr = &remoteAddress;
709 destSocket = clientSocket;
// remoteAddress is zeroed when no peer is known, so a family other than
// AF_INET means there is no valid destination.
712 if(destAddr->sin_family != AF_INET) {
716 written = sendto(destSocket,
reinterpret_cast<const char*
>(msg), length, 0,
717 reinterpret_cast<sockaddr*
>(destAddr),
sizeof(*destAddr));
// send() path on the connected socket (presumably TCP — confirm).
721 unique_lock<recursive_mutex> lock(sendMutex);
722 destSocket = clientSocket;
724 written = send(destSocket,
reinterpret_cast<const char*
>(msg), length, 0);
// Read the error code immediately after the call.
727 auto sendError = Networking::getErrno();
// Temporary conditions; how they are reported is not visible in this excerpt.
730 if(sendError == EAGAIN || sendError == EWOULDBLOCK || sendError == ETIMEDOUT) {
733 }
// EPIPE gets its own branch; the handling is not visible in this excerpt.
else if(sendError == EPIPE) {
// The statement that throws this exception is not visible in this excerpt.
738 TransferException ex(
"Error sending network packet: " + Networking::getErrorString(sendError));
741 }
else if(written != length) {
// Short write: remember how far the message got so that the next attempt
// continues from this offset.
747 currentMsgOffset += written;
// Fetches the next control message from the protocol and sends it to the
// current peer. The surrounding loop and scope structure are not visible in
// this excerpt.
755 void ImageTransfer::Pimpl::sendPendingControlMessages() {
756 const unsigned char* controlMsgData =
nullptr;
757 int controlMsgLen = 0;
760 unique_lock<recursive_mutex> lock(sendMutex);
// remoteAddress is zeroed when no peer is known; this check covers that case.
761 if(remoteAddress.sin_family != AF_INET) {
765 controlMsgData = protocol->getNextControlMessage(controlMsgLen);
// nullptr means the protocol has no control message to send.
767 if(controlMsgData !=
nullptr) {
768 sendNetworkMessage(controlMsgData, controlMsgLen);
// Returns the number of dropped frames as reported by the protocol layer.
// @return Value of ImageProtocol::getNumDroppedFrames().
775 int ImageTransfer::Pimpl::getNumDroppedFrames()
const {
776 return protocol->getNumDroppedFrames();
// Probes the given socket to find out whether the peer closed the connection.
// @param sock  Socket to probe.
// @return Derived from the recv() result; the evaluation is not visible in
//         this excerpt.
779 bool ImageTransfer::Pimpl::isTcpClientClosed(SOCKET sock) {
// MSG_PEEK leaves any pending data in the socket queue and MSG_DONTWAIT keeps
// the call from blocking, so this probe does not disturb normal reception.
781 auto ret = recv(sock, &x, 1, MSG_DONTWAIT | MSG_PEEK);
// Checks whether the socket is ready for reading or writing.
// @param read  true: check for readability; false: check for writability.
// @param wait  In the poll() variant: wait up to 100 ms if true, do not wait
//              if false.
// @return Not fully visible in this excerpt; presumably true when the socket
//         is ready — confirm.
785 bool ImageTransfer::Pimpl::selectSocket(
bool read,
bool wait) {
788 unique_lock<recursive_mutex> lock(sendMutex);
// select() variant: the descriptor set is passed as either the read set or the
// write set, depending on `read`. A result <= 0 means timeout or error.
// NOTE(review): this and the poll() variant below are presumably alternative
// platform-specific code paths — the selecting preprocessor lines are not
// visible.
803 if(select(((
int)sock)+1, (read ? &fds :
nullptr), (!read ? &fds :
nullptr),
nullptr, &tv) <= 0) {
809 constexpr
int timeoutMillisec = 100;
// poll() variant: a timeout of 0 returns immediately.
813 if (poll(&pfd, 1, wait ? timeoutMillisec: 0) <= 0) {
// Returns a status report string by forwarding to the Pimpl.
822 std::string ImageTransfer::statusReport() {
823 return pimpl->statusReport();
// Returns the status report string produced by the protocol layer.
825 std::string ImageTransfer::Pimpl::statusReport() {
826 return protocol->statusReport();
// Registers the callback that is invoked on connection state changes.
// @param callback  Stored by copy; an empty std::function disables the
//        notifications, since every call site checks it before invoking.
// NOTE(review): the assignment is not guarded by a mutex in the visible code,
// while other threads may read the callback — confirm.
829 void ImageTransfer::Pimpl::setConnectionStateChangeCallback(std::function<
void(visiontransfer::ConnectionState)> callback) {
830 connectionStateChangeCallback = callback;
// Stores the delay used between reconnection attempts.
// @param secondsBetweenRetries  Delay in seconds; disconnect() only attempts a
//        reconnect when this is greater than zero.
833 void ImageTransfer::Pimpl::setAutoReconnect(
int secondsBetweenRetries) {
834 tcpReconnectSecondsBetweenRetries = secondsBetweenRetries;