libvisiontransfer  10.8.0
imagetransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2024 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <cstdio>
16 #include <iostream>
17 #include <cstring>
18 #include <memory>
19 #include <string>
20 #include <vector>
21 #include <mutex>
22 #include <thread>
23 #include "visiontransfer/imagetransfer.h"
24 #include "visiontransfer/exceptions.h"
25 #include "visiontransfer/internal/datablockprotocol.h"
26 #include "visiontransfer/internal/networking.h"
27 
28 using namespace std;
29 using namespace visiontransfer;
30 using namespace visiontransfer::internal;
31 
32 namespace visiontransfer {
33 
34 /*************** Pimpl class containing all private members ***********/
35 
36 class ImageTransfer::Pimpl {
37 public:
38  Pimpl(const char* address, const char* service, ImageProtocol::ProtocolType protType,
39  bool server, int bufferSize, int maxUdpPacketSize, int autoReconnectDelay);
40  ~Pimpl();
41 
42  // Redeclaration of public members
43  void setRawTransferData(const ImageSet& metaData, const std::vector<unsigned char*>& rawData,
44  int firstTileWidth = 0, int middleTileWidth = 0, int lastTileWidth = 0);
45  void setRawValidBytes(const std::vector<int>& validBytes);
46  void setTransferImageSet(const ImageSet& imageSet);
47  TransferStatus transferData();
48  bool receiveImageSet(ImageSet& imageSet);
49  bool receivePartialImageSet(ImageSet& imageSet, int& validRows, bool& complete);
50  int getNumDroppedFrames() const;
51  bool isConnected() const;
52  void disconnect();
53  std::string getRemoteAddress() const;
54  bool tryAccept();
55  void setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback);
56  void establishConnection();
57  void setAutoReconnect(int secondsBetweenRetries);
58 
59  std::string statusReport();
60 
61 private:
62  // Configuration parameters
64  bool isServer;
65  int bufferSize;
66  int maxUdpPacketSize;
67 
68  // Thread synchronization
69  std::recursive_mutex receiveMutex;
70  std::recursive_mutex sendMutex;
71 
72  // Transfer related members
73  SOCKET clientSocket;
74  SOCKET tcpServerSocket;
75  sockaddr_in remoteAddress;
76  addrinfo* addressInfo;
77 
78  int tcpReconnectSecondsBetweenRetries;
79  bool knownConnectedState; // see Pimpl::isConnected() for info
80  bool gotAnyData; // to disambiguate 'connection refused'
81 
82  // Object for encoding and decoding the network protocol
83  std::unique_ptr<ImageProtocol> protocol;
84 
85  // Outstanding network message that still has to be transferred
86  int currentMsgLen;
87  int currentMsgOffset;
88  const unsigned char* currentMsg;
89 
90  // User callback for connection state changes
91  std::function<void(visiontransfer::ConnectionState)> connectionStateChangeCallback;
92 
93  // Socket configuration
94  void setSocketOptions();
95 
96  // Network socket initialization
97  void initTcpServer(const addrinfo* addressInfo);
98  void initTcpClient(const addrinfo* addressInfo);
99  void initUdp(const addrinfo* addressInfo);
100 
101  // Data reception
102  bool receiveNetworkData(bool block);
103 
104  // Data transmission
105  bool sendNetworkMessage(const unsigned char* msg, int length, sockaddr_in* destAddrUdp=nullptr);
106  void sendPendingControlMessages();
107 
108  bool selectSocket(bool read, bool wait);
109  bool isTcpClientClosed(SOCKET sock);
110 };
111 
112 /******************** Stubs for all public members ********************/
113 
114 ImageTransfer::ImageTransfer(const char* address, const char* service,
115  ImageProtocol::ProtocolType protType, bool server, int bufferSize, int maxUdpPacketSize,
116  int autoReconnectDelay):
117  pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize,
118  autoReconnectDelay)) {
119  // All initialization in the pimpl class
120 }
121 
122 ImageTransfer::ImageTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize,
123  int autoReconnectDelay):
124  pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
125  false, bufferSize, maxUdpPacketSize, autoReconnectDelay)) {
126  // All initialization in the pimpl class
127 }
128 
129 ImageTransfer::~ImageTransfer() {
130  delete pimpl;
131 }
132 
133 void ImageTransfer::setRawTransferData(const ImageSet& metaData, const std::vector<unsigned char*>& rawData,
134  int firstTileWidth, int middleTileWidth, int lastTileWidth) {
135  pimpl->setRawTransferData(metaData, rawData, firstTileWidth, middleTileWidth, lastTileWidth);
136 }
137 
138 void ImageTransfer::setRawValidBytes(const std::vector<int>& validBytes) {
139  pimpl->setRawValidBytes(validBytes);
140 }
141 
143  pimpl->setTransferImageSet(imageSet);
144 }
145 
147  return pimpl->transferData();
148 }
149 
151  return pimpl->receiveImageSet(imageSet);
152 }
153 
154 bool ImageTransfer::receivePartialImageSet(ImageSet& imageSet, int& validRows, bool& complete) {
155  return pimpl->receivePartialImageSet(imageSet, validRows, complete);
156 }
157 
159  return pimpl->getNumDroppedFrames();
160 }
161 
163  return pimpl->isConnected();
164 }
165 
167  // User-requested disconnect: disable reconnection first
168  pimpl->setAutoReconnect(0);
169  pimpl->disconnect();
170 }
171 
172 std::string ImageTransfer::getRemoteAddress() const {
173  return pimpl->getRemoteAddress();
174 }
175 
177  return pimpl->tryAccept();
178 }
179 
180 void ImageTransfer::setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback) {
181  pimpl->setConnectionStateChangeCallback(callback);
182 }
183 
184 void ImageTransfer::setAutoReconnect(int secondsBetweenRetries) {
185  pimpl->setAutoReconnect(secondsBetweenRetries);
186 }
187 
188 /******************** Implementation in pimpl class *******************/
189 ImageTransfer::Pimpl::Pimpl(const char* address, const char* service,
190  ImageProtocol::ProtocolType protType, bool server, int
191  bufferSize, int maxUdpPacketSize, int autoReconnectDelay)
192  : protType(protType), isServer(server), bufferSize(bufferSize),
193  maxUdpPacketSize(maxUdpPacketSize),
194  clientSocket(INVALID_SOCKET), tcpServerSocket(INVALID_SOCKET),
195  tcpReconnectSecondsBetweenRetries(autoReconnectDelay),
196  knownConnectedState(false), gotAnyData(false),
197  currentMsgLen(0), currentMsgOffset(0), currentMsg(nullptr) {
198 
199  Networking::initNetworking();
200 #ifndef _WIN32
201  // We don't want to be interrupted by the pipe signal
202  signal(SIGPIPE, SIG_IGN);
203 #endif
204 
205  memset(&remoteAddress, 0, sizeof(remoteAddress));
206 
207  // If address is null we use the any address
208  if(address == nullptr || string(address) == "") {
209  address = "0.0.0.0";
210  }
211 
212  addressInfo = Networking::resolveAddress(address, service);
213  establishConnection();
214 }
215 
216 void ImageTransfer::Pimpl::establishConnection() {
217 
218  try {
219  if(protType == ImageProtocol::PROTOCOL_UDP) {
220  initUdp(addressInfo);
221  } else if(protType == ImageProtocol::PROTOCOL_TCP && isServer) {
222  initTcpServer(addressInfo);
223  } else {
224  initTcpClient(addressInfo);
225  }
226  } catch(...) {
227  throw;
228  }
229 
230  knownConnectedState = true;
231  if (connectionStateChangeCallback) {
232  std::thread([&](){connectionStateChangeCallback(visiontransfer::ConnectionState::CONNECTED);}).detach();
233  }
234 }
235 
236 ImageTransfer::Pimpl::~Pimpl() {
237 
238  setAutoReconnect(0);
239  if (isConnected()) disconnect();
240 
241  if(clientSocket != INVALID_SOCKET) {
242  Networking::closeSocket(clientSocket);
243  }
244  if(tcpServerSocket != INVALID_SOCKET) {
245  Networking::closeSocket(tcpServerSocket);
246  }
247  if(addressInfo != nullptr) {
248  freeaddrinfo(addressInfo);
249  }
250 
251 }
252 
253 void ImageTransfer::Pimpl::initTcpClient(const addrinfo* addressInfo) {
254  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_TCP));
255  clientSocket = Networking::connectTcpSocket(addressInfo);
256  memcpy(&remoteAddress, addressInfo->ai_addr, sizeof(remoteAddress));
257 
258  // Set special socket options
259  setSocketOptions();
260 }
261 
262 void ImageTransfer::Pimpl::initTcpServer(const addrinfo* addressInfo) {
263  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_TCP));
264 
265  // Create socket
266  tcpServerSocket = ::socket(addressInfo->ai_family, addressInfo->ai_socktype,
267  addressInfo->ai_protocol);
268  if (tcpServerSocket == INVALID_SOCKET) {
269  TransferException ex("Error opening socket: " + Networking::getLastErrorString());
270  throw ex;
271  }
272 
273  // Enable reuse address
274  Networking::enableReuseAddress(tcpServerSocket, true);
275 
276  // Open a server port
277  Networking::bindSocket(tcpServerSocket, addressInfo);
278  clientSocket = INVALID_SOCKET;
279 
280  // Make the server socket non-blocking
281  Networking::setSocketBlocking(tcpServerSocket, false);
282 
283  // Listen on port
284  listen(tcpServerSocket, 1);
285 }
286 
287 void ImageTransfer::Pimpl::initUdp(const addrinfo* addressInfo) {
288  protocol.reset(new ImageProtocol(isServer, ImageProtocol::PROTOCOL_UDP, maxUdpPacketSize));
289  // Create sockets
290  clientSocket = socket(AF_INET, SOCK_DGRAM, 0);
291  if(clientSocket == INVALID_SOCKET) {
292  TransferException ex("Error creating receive socket: " + Networking::getLastErrorString());
293  throw ex;
294  }
295 
296  // Enable reuse address
297  Networking::enableReuseAddress(clientSocket, true);
298 
299  // Bind socket to port
300  if(isServer && addressInfo != nullptr) {
301  Networking::bindSocket(clientSocket, addressInfo);
302  }
303 
304  if(!isServer) {
305  memcpy(&remoteAddress, addressInfo->ai_addr, sizeof(remoteAddress));
306  }
307 
308  // Set special socket options
309  setSocketOptions();
310 }
311 
312 bool ImageTransfer::Pimpl::tryAccept() {
313  if(protType != ImageProtocol::PROTOCOL_TCP || ! isServer) {
314  throw TransferException("Connections can only be accepted in tcp server mode");
315  }
316 
317  // Accept one connection
318  sockaddr_in newRemoteAddress;
319  memset(&newRemoteAddress, 0, sizeof(newRemoteAddress));
320  SOCKET newSocket = Networking::acceptConnection(tcpServerSocket, newRemoteAddress);
321  if(newSocket == INVALID_SOCKET) {
322  // No connection
323  return false;
324  }
325 
326  // For a new connection we require locks
327  unique_lock<recursive_mutex> recvLock(receiveMutex);
328  unique_lock<recursive_mutex> sendLock(sendMutex);
329 
330  if(clientSocket != INVALID_SOCKET) {
331  // More robust TCP behavior: reject new connection.
332  // (We had to accept first so we can close now.)
333  // Remote client will detect that we closed immediately without sending data.
334  //std::cerr << "DEBUG- Rejecting new TCP connection, we are busy already" << std::endl;
335  Networking::closeSocket(newSocket);
336  return false;
337  }
338  memcpy(&remoteAddress, &newRemoteAddress, sizeof(remoteAddress));
339  clientSocket = newSocket;
340 
341  // Set special socket options
342  setSocketOptions();
343 
344  // Reset connection data
345  protocol->resetTransfer();
346  protocol->resetReception();
347  currentMsg = nullptr;
348 
349  knownConnectedState = true;
350 
351  return true;
352 }
353 
354 std::string ImageTransfer::Pimpl::getRemoteAddress() const {
355  unique_lock<recursive_mutex> lock(const_cast<recursive_mutex&>(sendMutex)); // either mutex will work
356 
357  if(remoteAddress.sin_family != AF_INET) {
358  return "";
359  }
360 
361  char strPort[11];
362  snprintf(strPort, sizeof(strPort), ":%d", remoteAddress.sin_port);
363 
364  return string(inet_ntoa(remoteAddress.sin_addr)) + strPort;
365 }
366 
367 void ImageTransfer::Pimpl::setSocketOptions() {
368  // Set the socket buffer sizes
369  if(bufferSize > 0) {
370  setsockopt(clientSocket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&bufferSize), sizeof(bufferSize));
371  setsockopt(clientSocket, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&bufferSize), sizeof(bufferSize));
372  }
373 
374  Networking::setSocketTimeout(clientSocket, 500);
375  Networking::setSocketBlocking(clientSocket, true);
376 }
377 
378 void ImageTransfer::Pimpl::setRawTransferData(const ImageSet& metaData,
379  const std::vector<unsigned char*>& rawDataVec, int firstTileWidth, int middleTileWidth, int lastTileWidth) {
380  unique_lock<recursive_mutex> sendLock(sendMutex);
381  protocol->setRawTransferData(metaData, rawDataVec, firstTileWidth, middleTileWidth, lastTileWidth);
382  currentMsg = nullptr;
383 }
384 
385 void ImageTransfer::Pimpl::setRawValidBytes(const std::vector<int>& validBytes) {
386  unique_lock<recursive_mutex> sendLock(sendMutex);
387  protocol->setRawValidBytes(validBytes);
388 }
389 
390 void ImageTransfer::Pimpl::setTransferImageSet(const ImageSet& imageSet) {
391  unique_lock<recursive_mutex> sendLock(sendMutex);
392  protocol->setTransferImageSet(imageSet);
393  currentMsg = nullptr;
394 }
395 
396 ImageTransfer::TransferStatus ImageTransfer::Pimpl::transferData() {
397  unique_lock<recursive_mutex> lock(sendMutex);
398 
399  // First receive data in case a control message arrives
400  if(protType == ImageProtocol::PROTOCOL_UDP) {
401  // This also handles the UDP 'disconnection' tracking
402  receiveNetworkData(false);
403  } else if (isServer) {
404  if (isConnected()) {
405  // Test if TCP pipe closed remotely (even when we have nothing to send)
406  bool disconnected = isTcpClientClosed(clientSocket);
407  if (disconnected) {
408  // The connection has been closed
409  disconnect();
410  }
411  }
412  }
413 
414  if(!isConnected()) {
415  // Cannot send while (temporarily) disconnected
416  // Note: TCP server mode is currently not auto-reconnecting
417  return NOT_CONNECTED;
418  }
419 
420 #ifndef _WIN32
421  // Cork TCP to prevent sending of small packets
422  if(protType == ImageProtocol::PROTOCOL_TCP) {
423  int flag = 1;
424  setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (char *) &flag, sizeof(int));
425  }
426 #endif
427 
428  // Get first message to transfer
429  if(currentMsg == nullptr) {
430  currentMsgOffset = 0;
431  currentMsg = protocol->getTransferMessage(currentMsgLen);
432 
433  if(currentMsg == nullptr) {
434  if(protocol->transferComplete()) {
435  return ALL_TRANSFERRED;
436  } else {
437  return NO_VALID_DATA;
438  }
439  }
440  }
441 
442  // Try transferring messages
443  bool wouldBlock = false;
444  bool dataTransferred = (currentMsg != nullptr);
445  while(currentMsg != nullptr) {
446  int writing = (int)(currentMsgLen - currentMsgOffset);
447 
448  if(sendNetworkMessage(&currentMsg[currentMsgOffset], writing)) {
449  // Get next message
450  currentMsgOffset = 0;
451  currentMsg = protocol->getTransferMessage(currentMsgLen);
452  } else {
453  // The operation would block
454  wouldBlock = true;
455  break;
456  }
457  }
458 
459  if(dataTransferred && protType == ImageProtocol::PROTOCOL_TCP && protocol->transferComplete()) {
460 #ifndef _WIN32
461  // Uncork - sends the assembled messages
462  int flag = 0;
463  setsockopt(clientSocket, IPPROTO_TCP, TCP_CORK, (char *) &flag, sizeof(int));
464 #else
465  // Force a flush for TCP by turning the nagle algorithm off and on
466  int flag = 1;
467  setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
468  flag = 0;
469  setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
470 #endif
471  }
472 
473  // Also check for control messages at the end
474  if(protType == ImageProtocol::PROTOCOL_UDP) {
475  receiveNetworkData(false);
476  }
477 
478  if(protocol->transferComplete()) {
479  return ALL_TRANSFERRED;
480  } else if(wouldBlock) {
481  return WOULD_BLOCK;
482  } else {
483  return PARTIAL_TRANSFER;
484  }
485 }
486 
487 bool ImageTransfer::Pimpl::receiveImageSet(ImageSet& imageSet) {
488  int validRows = 0;
489  bool complete = false;
490 
491  std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
492  while(!complete) {
493  if(!receivePartialImageSet(imageSet, validRows, complete)) {
494  return false;
495  }
496 
497  unsigned int time = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::milliseconds>(
498  std::chrono::steady_clock::now() - startTime).count());
499  if(time > 100 && !complete) {
500  return false;
501  }
502  }
503 
504  return true;
505 }
506 
507 bool ImageTransfer::Pimpl::receivePartialImageSet(ImageSet& imageSet,
508  int& validRows, bool& complete) {
509  unique_lock<recursive_mutex> lock(receiveMutex);
510 
511  // Try to receive further image data if needed
512  bool block = true;
513  while(!protocol->imagesReceived() && receiveNetworkData(block)) {
514  block = false;
515  }
516 
517  // Get received image
518  return protocol->getPartiallyReceivedImageSet(imageSet, validRows, complete);
519 }
520 
521 bool ImageTransfer::Pimpl::receiveNetworkData(bool block) {
522  unique_lock<recursive_mutex> lock = block ?
523  unique_lock<recursive_mutex>(receiveMutex) : unique_lock<recursive_mutex>(receiveMutex, std::try_to_lock);
524 
525  if(clientSocket == INVALID_SOCKET) {
526  return false; // Not connected
527  }
528 
529  if (protType == ImageProtocol::PROTOCOL_UDP) {
530  // UDP-only: Track and signal connection state by checking protocol
531  // (TCP uses socket-level disconnect/reconnect events instead)
532  bool newConnectedState = protocol->isConnected();
533  if (newConnectedState != knownConnectedState) {
534  knownConnectedState = newConnectedState;
535  if (connectionStateChangeCallback) {
536  std::thread([&, newConnectedState](){ connectionStateChangeCallback(newConnectedState ? visiontransfer::ConnectionState::CONNECTED : visiontransfer::ConnectionState::DISCONNECTED); }).detach();
537  }
538  if (!newConnectedState) {
539  // Newly disconnected, abort
540  return false;
541  }
542  }
543  }
544 
545  // First send control messages if necessary
546  sendPendingControlMessages();
547 
548  if(!lock.owns_lock()) {
549  // Waiting for the lock would block this call
550  return false;
551  }
552 
553  // Test if the socket has data available
554  if(!block && !selectSocket(true, false)) {
555  return false;
556  }
557 
558  int maxLength = 0;
559  char* buffer = reinterpret_cast<char*>(protocol->getNextReceiveBuffer(maxLength));
560 
561  // Receive data
562  sockaddr_in fromAddress;
563  socklen_t fromSize = sizeof(fromAddress);
564 
565  int bytesReceived = recvfrom(clientSocket, buffer, maxLength,
566  0, reinterpret_cast<sockaddr*>(&fromAddress), &fromSize);
567 
568  auto err = Networking::getErrno();
569  if(bytesReceived == 0 || (protType == ImageProtocol::PROTOCOL_TCP && bytesReceived < 0 && err == WSAECONNRESET)) {
570  // Connection closed
571  disconnect();
572  if ((!isServer) && (!gotAnyData)) {
573  // TCP client connection was refused by the device because it had another connected client
574  setAutoReconnect(0);
575  throw ConnectionClosedException("Device is already connected to another client");
576  }
577  } else if(bytesReceived < 0 && err != EWOULDBLOCK && err != EINTR &&
578  err != ETIMEDOUT && err != WSA_IO_PENDING && err != WSAECONNRESET) {
579  TransferException ex("Error reading from socket: " + Networking::getErrorString(err));
580  throw ex;
581  } else if(bytesReceived > 0) {
582  // Check whether this reception is from an unexpected new sender (for UDP server)
583  bool newSender = (
584  protType == ImageProtocol::PROTOCOL_UDP &&
585  ((fromAddress.sin_addr.s_addr!=remoteAddress.sin_addr.s_addr) || (fromAddress.sin_port!=remoteAddress.sin_port)) &&
586  (remoteAddress.sin_port != 0)
587  );
588  if (isServer && newSender) {
589  //std::cout << "New connection" << std::endl;
590  if (protocol->isConnected()) {
591  // Reject interfering client
592  // Note: this has no bearing on the receive buffer obtained above; we will overwrite in place
593  //std::cerr << "DEBUG- Rejecting interfering UDP client" << std::endl;
594  const unsigned char* disconnectionMsg;
595  int disconnectionMsgLen;
596  DataBlockProtocol::getDisconnectionMessage(disconnectionMsg, disconnectionMsgLen);
597  if (disconnectionMsgLen > 0) {
598  sendNetworkMessage(disconnectionMsg, disconnectionMsgLen, &fromAddress);
599  }
600  }
601  } else {
602  gotAnyData = true;
603  protocol->processReceivedMessage(bytesReceived);
604  if(protocol->newClientConnected()) {
605  // We have just established a new connection
606  memcpy(&remoteAddress, &fromAddress, sizeof(remoteAddress));
607 
608  if (isServer && (protType == ImageProtocol::PROTOCOL_UDP)) {
609  // Welcome client with the knock sequence. Older clients just ignore this,
610  // new clients know they can expect, and may also use, the extended protocol.
611  const unsigned char* heartbeatMsg;
612  int heartbeatMsgLen;
613  DataBlockProtocol::getHeartbeatMessage(heartbeatMsg, heartbeatMsgLen);
614  if (heartbeatMsgLen > 0) {
615  //std::cout << "Sending five knocks" << std::endl;
616  for (int i=0; i<5; ++i) {
617  // Send 5 UDP knocks for good measure, the client looks for at least 3 within 0.5 s
618  sendNetworkMessage(heartbeatMsg, heartbeatMsgLen, &fromAddress);
619  }
620  }
621  }
622  }
623  }
624  if (isServer && protType == ImageProtocol::PROTOCOL_UDP) {
625  if (!protocol->isConnected() && (remoteAddress.sin_port != 0)) {
626  //std::cout << "Invalidating remote address" << std::endl;
627  // Existing UDP client has disconnected, invalidate the remote address
628  memset(&remoteAddress, 0, sizeof(remoteAddress));
629  }
630  }
631  }
632 
633  return bytesReceived > 0;
634 }
635 
636 void ImageTransfer::Pimpl::disconnect() {
637  // disconnect
638  unique_lock<recursive_mutex> recvLock(receiveMutex);
639  unique_lock<recursive_mutex> sendLock(sendMutex);
640 
641  if(clientSocket != INVALID_SOCKET) {
642  if ((!isServer) && isConnected() && protType == ImageProtocol::PROTOCOL_UDP) {
643  if (protocol->supportsExtendedConnectionStateProtocol()) {
644  // Send a final client-side disconnection request instead of
645  // needing to wait for UDP heartbeat timeout on the device
646  try {
647  const unsigned char* disconnectionMsg;
648  int disconnectionMsgLen;
649  DataBlockProtocol::getDisconnectionMessage(disconnectionMsg, disconnectionMsgLen);
650  if (disconnectionMsgLen > 0) {
651  sendNetworkMessage(disconnectionMsg, disconnectionMsgLen, &remoteAddress);
652  }
653  } catch(...) {
654  // Server will see a disconnection (through heartbeat timeout) anyway
655  }
656  }
657  }
658  }
659 
660  knownConnectedState = false;
661  if (connectionStateChangeCallback) connectionStateChangeCallback(visiontransfer::ConnectionState::DISCONNECTED);
662 
663  if(clientSocket != INVALID_SOCKET && protType == ImageProtocol::PROTOCOL_TCP) {
664  Networking::closeSocket(clientSocket);
665  memset(&remoteAddress, 0, sizeof(remoteAddress));
666 
667  // Attempt reconnection, if configured
668  if ((!isServer) && (tcpReconnectSecondsBetweenRetries > 0)) {
669  for (;;) {
670  try {
671  establishConnection();
672  // Successful reconnection (state change has been signaled inside establishConnection)
673  return;
674  } catch(...) {
675  // An exception has occurred during reconnection. Since the connection
676  // had suceeded originally during construction, we just keep trying.
677  std::this_thread::sleep_for(std::chrono::seconds(tcpReconnectSecondsBetweenRetries));
678  }
679  }
680  }
681  }
682  memset(&remoteAddress, 0, sizeof(remoteAddress));
683 }
684 
685 bool ImageTransfer::Pimpl::isConnected() const {
686  unique_lock<recursive_mutex> lock(const_cast<recursive_mutex&>(sendMutex)); //either mutex will work
687 
688  // This tracks the most up-to-date connection state. For TCP, this simply means socket-level
689  // disconnects or [re]connects. For UDP, which is connectionless, this means tracking whether
690  // the heartbeat replies (and/or payload data) currently arrive (this is established internally
691  // on the level of the DataBlockProtocol).
692  return knownConnectedState;
693 }
694 
695 bool ImageTransfer::Pimpl::sendNetworkMessage(const unsigned char* msg, int length, sockaddr_in* destAddrUdp) {
696  int written = 0;
697  if(protType == ImageProtocol::PROTOCOL_UDP) {
698  sockaddr_in* destAddr;
699  SOCKET destSocket;
700  {
701  unique_lock<recursive_mutex> lock(sendMutex);
702  if (destAddrUdp) {
703  // An overridden UDP destination (i.e. an interfering client)
704  destAddr = destAddrUdp;
705  } else {
706  // The correctly connected client
707  destAddr = &remoteAddress;
708  }
709  destSocket = clientSocket;
710  }
711 
712  if(destAddr->sin_family != AF_INET) {
713  return false; // Not connected
714  }
715 
716  written = sendto(destSocket, reinterpret_cast<const char*>(msg), length, 0,
717  reinterpret_cast<sockaddr*>(destAddr), sizeof(*destAddr));
718  } else {
719  SOCKET destSocket;
720  {
721  unique_lock<recursive_mutex> lock(sendMutex);
722  destSocket = clientSocket;
723  }
724  written = send(destSocket, reinterpret_cast<const char*>(msg), length, 0);
725  }
726 
727  auto sendError = Networking::getErrno();
728 
729  if(written < 0) {
730  if(sendError == EAGAIN || sendError == EWOULDBLOCK || sendError == ETIMEDOUT) {
731  // The socket is not yet ready for a new transfer
732  return false;
733  } else if(sendError == EPIPE) {
734  // The connection has been closed
735  disconnect();
736  return false;
737  } else {
738  TransferException ex("Error sending network packet: " + Networking::getErrorString(sendError));
739  throw ex;
740  }
741  } else if(written != length) {
742  if(protType == ImageProtocol::PROTOCOL_UDP) {
743  // The message has been transmitted partially
744  throw TransferException("Unable to transmit complete UDP message");
745  } else {
746  // For TCP we can transmit the remaining data later
747  currentMsgOffset += written;
748  return false;
749  }
750  } else {
751  return true;
752  }
753 }
754 
755 void ImageTransfer::Pimpl::sendPendingControlMessages() {
756  const unsigned char* controlMsgData = nullptr;
757  int controlMsgLen = 0;
758 
759  while(true) {
760  unique_lock<recursive_mutex> lock(sendMutex);
761  if(remoteAddress.sin_family != AF_INET) {
762  return;
763  }
764 
765  controlMsgData = protocol->getNextControlMessage(controlMsgLen);
766 
767  if(controlMsgData != nullptr) {
768  sendNetworkMessage(controlMsgData, controlMsgLen);
769  } else {
770  break;
771  }
772  }
773 }
774 
775 int ImageTransfer::Pimpl::getNumDroppedFrames() const {
776  return protocol->getNumDroppedFrames();
777 }
778 
779 bool ImageTransfer::Pimpl::isTcpClientClosed(SOCKET sock) {
780  char x;
781  auto ret = recv(sock, &x, 1, MSG_DONTWAIT | MSG_PEEK);
782  return ret == 0;
783 }
784 
785 bool ImageTransfer::Pimpl::selectSocket(bool read, bool wait) {
786  SOCKET sock;
787  {
788  unique_lock<recursive_mutex> lock(sendMutex); // Either mutex will do
789  sock = clientSocket;
790  }
791 #ifdef _WIN32
792  fd_set fds;
793  struct timeval tv;
794  FD_ZERO(&fds);
795  FD_SET(sock, &fds);
796  tv.tv_sec = 0;
797  if(wait) {
798  tv.tv_usec = 100000;
799  } else {
800  tv.tv_usec = 0;
801  }
802 
803  if(select(((int)sock)+1, (read ? &fds : nullptr), (!read ? &fds : nullptr), nullptr, &tv) <= 0) {
804  // The socket is currently not ready
805  return false;
806  }
807 #else
808  // use poll() on non-Windows platform (glibc select() limitations)
809  constexpr int timeoutMillisec = 100;
810  pollfd pfd;
811  pfd.fd = sock;
812  pfd.events = POLLIN;
813  if (poll(&pfd, 1, wait ? timeoutMillisec: 0) <= 0) {
814  // The socket is currently not ready
815  return false;
816  }
817 #endif
818  // select (or poll) reported an event
819  return true;
820 }
821 
822 std::string ImageTransfer::statusReport() {
823  return pimpl->statusReport();
824 }
825 std::string ImageTransfer::Pimpl::statusReport() {
826  return protocol->statusReport();
827 }
828 
829 void ImageTransfer::Pimpl::setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback) {
830  connectionStateChangeCallback = callback;
831 }
832 
833 void ImageTransfer::Pimpl::setAutoReconnect(int secondsBetweenRetries) {
834  tcpReconnectSecondsBetweenRetries = secondsBetweenRetries;
835 }
836 
837 } // namespace
838 
visiontransfer::ImageTransfer::ImageTransfer
ImageTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=16 *1048576, int maxUdpPacketSize=1472, int autoReconnectDelay=1)
Creates a new transfer object by manually specifying the target address.
Definition: imagetransfer.cpp:114
visiontransfer::ImageTransfer::setRawValidBytes
void setRawValidBytes(const std::vector< int > &validBytes)
Updates the number of valid bytes in a partial raw transmission.
Definition: imagetransfer.cpp:138
visiontransfer::ImageTransfer::setConnectionStateChangeCallback
void setConnectionStateChangeCallback(std::function< void(visiontransfer::ConnectionState)> callback)
Install a handler that will be called when the connection state changes (e.g. socket is disconnected)...
Definition: imagetransfer.cpp:180
visiontransfer::ImageTransfer::getRemoteAddress
std::string getRemoteAddress() const
Returns the address of the remote host.
Definition: imagetransfer.cpp:172
visiontransfer::ImageTransfer::getNumDroppedFrames
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host.
Definition: imagetransfer.cpp:158
visiontransfer::ImageTransfer::TransferStatus
TransferStatus
The result of a partial image transfer.
Definition: imagetransfer.h:72
visiontransfer::ImageTransfer::tryAccept
bool tryAccept()
Tries to accept a client connection.
Definition: imagetransfer.cpp:176
visiontransfer::ImageTransfer::receivePartialImageSet
bool receivePartialImageSet(ImageSet &imageSet, int &validRows, bool &complete)
Returns the received image set, even if it is not yet complete.
Definition: imagetransfer.cpp:154
visiontransfer::ImageProtocol::PROTOCOL_UDP
@ PROTOCOL_UDP
The connection-less UDP transport protocol.
Definition: imageprotocol.h:84
visiontransfer::DeviceInfo
Aggregates information about a discovered device.
Definition: deviceinfo.h:59
visiontransfer::ImageSet
A set of one to three images, but usually two (the left camera image and the disparity map)....
Definition: imageset.h:50
visiontransfer::TransferException
Exception class that is used for all transfer exceptions.
Definition: exceptions.h:45
visiontransfer::ImageProtocol::ProtocolType
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:67
visiontransfer::ImageTransfer::disconnect
void disconnect()
Terminates the current connection.
Definition: imagetransfer.cpp:166
visiontransfer::ImageProtocol::PROTOCOL_TCP
@ PROTOCOL_TCP
The connection oriented TCP transport protocol.
Definition: imageprotocol.h:81
visiontransfer::ImageProtocol
A lightweight protocol for transferring image sets.
Definition: imageprotocol.h:52
visiontransfer::ImageTransfer::setTransferImageSet
void setTransferImageSet(const ImageSet &imageSet)
Sets a new image set that shall be transmitted.
Definition: imagetransfer.cpp:142
visiontransfer::ImageTransfer::transferData
TransferStatus transferData()
Performs a partial (or full) image transmission.
Definition: imagetransfer.cpp:146
visiontransfer::ImageTransfer::setRawTransferData
void setRawTransferData(const ImageSet &metaData, const std::vector< unsigned char * > &rawData, int firstTileWidth=0, int middleTileWidth=0, int lastTileWidth=0)
Sets the raw pixel data for a partial image transmission.
Definition: imagetransfer.cpp:133
visiontransfer::ConnectionClosedException
Exception class that is used for a busy server terminating a connection.
Definition: exceptions.h:53
visiontransfer::ImageTransfer::receiveImageSet
bool receiveImageSet(ImageSet &imageSet)
Waits for and receives a new image set.
Definition: imagetransfer.cpp:150
visiontransfer::ImageTransfer::isConnected
bool isConnected() const
Returns true if a remote connection is established (and not temporarily disconnected)....
Definition: imagetransfer.cpp:162
Allied Vision