libvisiontransfer  10.8.0
asynctransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2024 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
16 // This is a very ugly workaround for GCC bug 54562. If omitted,
17 // passing timeouts to collectReceivedImage() is broken.
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
20 #endif
21 
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
#include "visiontransfer/asynctransfer.h"
#include "visiontransfer/internal/alignedallocator.h"
34 
35 using namespace std;
36 using namespace visiontransfer;
37 using namespace visiontransfer::internal;
38 
39 namespace visiontransfer {
40 
41 /*************** Pimpl class containing all private members ***********/
42 
43 class AsyncTransfer::Pimpl {
44 public:
45  Pimpl(const char* address, const char* service,
46  ImageProtocol::ProtocolType protType, bool server,
47  int bufferSize, int maxUdpPacketSize, int autoReconnectDelay);
48  ~Pimpl();
49 
50  // Redeclaration of public members
51  void sendImageSetAsync(const ImageSet& imageSet, bool deleteData);
52  bool collectReceivedImageSet(ImageSet& imageSet, double timeout);
53  int getNumDroppedFrames() const;
54  bool isConnected() const;
55  void disconnect();
56  std::string getRemoteAddress() const;
57  bool tryAccept();
58  void setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback);
59  void setAutoReconnect(int secondsBetweenRetries);
60 
61 private:
62  static constexpr int NUM_BUFFERS = ImageSet::MAX_SUPPORTED_IMAGES * 3;
63  static constexpr int SEND_THREAD_SHORT_WAIT_MS = 1;
64  static constexpr int SEND_THREAD_LONG_WAIT_MS = 10;
65 
66  // The encapsulated image transfer object
67  ImageTransfer imgTrans;
68 
69  // Variable for controlling thread termination
70  volatile bool terminate;
71 
72  // There are two threads, one for sending and one for receiving.
73  // Each has a mutex and condition variable for synchronization.
74  std::thread sendThread;
75  std::mutex sendMutex;
76  std::condition_variable sendCond;
77  std::condition_variable sendWaitCond;
78 
79  std::thread receiveThread;
80  std::timed_mutex receiveMutex;
81  std::condition_variable_any receiveCond;
82 
83  // Objects for exchanging images with the send and receive threads
84  ImageSet receivedSet;
85  std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
86  volatile int receiveBufferIndex;
87  volatile bool newDataReceived;
88 
89  ImageSet sendImageSet;
90  bool sendSetValid;
91  bool deleteSendData;
92 
93  // Exception occurred in one of the threads
94  std::exception_ptr receiveException;
95  std::exception_ptr sendException;
96 
97  bool sendThreadCreated;
98  bool receiveThreadCreated;
99 
100  // Count of additional locally dropped frames (due to not being collected in time)
101  // Only starts counting with the first call of collectReceivedImagePair()
102  int uncollectedDroppedFrames;
103 
104  // Main loop for sending thread
105  void sendLoop();
106 
107  // Main loop for receiving;
108  void receiveLoop();
109 
110  void createSendThread();
111 };
112 
113 /******************** Stubs for all public members ********************/
114 
115 AsyncTransfer::AsyncTransfer(const char* address, const char* service,
116  ImageProtocol::ProtocolType protType, bool server,
117  int bufferSize, int maxUdpPacketSize, int autoReconnectDelay)
118  : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize,
119  autoReconnectDelay)) {
120 }
121 
122 AsyncTransfer::AsyncTransfer(const DeviceInfo& device, int bufferSize, int maxUdpPacketSize,
123  int autoReconnectDelay)
124  : pimpl(new Pimpl(device.getIpAddress().c_str(), "7681", static_cast<ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
125  false, bufferSize, maxUdpPacketSize, autoReconnectDelay)) {
126 }
127 
128 AsyncTransfer::~AsyncTransfer() {
129  delete pimpl;
130 }
131 
132 void AsyncTransfer::sendImageSetAsync(const ImageSet& imageSet, bool deleteData) {
133  pimpl->sendImageSetAsync(imageSet, deleteData);
134 }
135 
136 bool AsyncTransfer::collectReceivedImageSet(ImageSet& imageSet, double timeout) {
137  return pimpl->collectReceivedImageSet(imageSet, timeout);
138 }
139 
141  return pimpl->getNumDroppedFrames();
142 }
143 
145  return pimpl->isConnected();
146 }
147 
149  // User-requested disconnect: disable reconnection first
150  pimpl->setAutoReconnect(0);
151  pimpl->disconnect();
152 }
153 
154 std::string AsyncTransfer::getRemoteAddress() const {
155  return pimpl->getRemoteAddress();
156 }
157 
159  return pimpl->tryAccept();
160 }
161 
162 void AsyncTransfer::setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback) {
163  pimpl->setConnectionStateChangeCallback(callback);
164 }
165 
166 void AsyncTransfer::setAutoReconnect(int secondsBetweenRetries) {
167  pimpl->setAutoReconnect(secondsBetweenRetries);
168 }
169 
170 /******************** Implementation in pimpl class *******************/
171 
172 AsyncTransfer::Pimpl::Pimpl(const char* address, const char* service,
173  ImageProtocol::ProtocolType protType, bool server,
174  int bufferSize, int maxUdpPacketSize, int autoReconnectDelay)
175  : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize, autoReconnectDelay),
176  terminate(false), receiveBufferIndex(0), newDataReceived(false), sendSetValid(false),
177  deleteSendData(false), sendThreadCreated(false),
178  receiveThreadCreated(false), uncollectedDroppedFrames(-1) {
179 
180  if(server) {
181  createSendThread();
182  }
183 }
184 
185 AsyncTransfer::Pimpl::~Pimpl() {
186  terminate = true;
187 
188  sendCond.notify_all();
189  receiveCond.notify_all();
190  sendWaitCond.notify_all();
191 
192  if(sendThreadCreated && sendThread.joinable()) {
193  sendThread.join();
194  }
195 
196  if(receiveThreadCreated && receiveThread.joinable()) {
197  receiveThread.join();
198  }
199 
200  if(sendSetValid && deleteSendData) {
201  delete[] sendImageSet.getPixelData(0);
202  delete[] sendImageSet.getPixelData(1);
203  }
204 }
205 
206 void AsyncTransfer::Pimpl::createSendThread() {
207  if(!sendThreadCreated) {
208  // Lazy initialization of the send thread as it is not always needed
209  unique_lock<mutex> lock(sendMutex);
210  sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop, this));
211  sendThreadCreated = true;
212  }
213 }
214 
215 void AsyncTransfer::Pimpl::sendImageSetAsync(const ImageSet& imageSet, bool deleteData) {
216  createSendThread();
217 
218  while(true) {
219  unique_lock<mutex> lock(sendMutex);
220 
221  // Test for errors
222  if(sendException) {
223  std::rethrow_exception(sendException);
224  }
225 
226  if(!sendSetValid) {
227  sendImageSet = imageSet;
228  sendSetValid = true;
229  deleteSendData = deleteData;
230 
231  // Wake up the sender thread
232  sendCond.notify_one();
233 
234  return;
235  } else {
236  // Wait for old data to be processed first
237  sendWaitCond.wait(lock);
238  }
239  }
240 }
241 
242 bool AsyncTransfer::Pimpl::collectReceivedImageSet(ImageSet& imageSet, double timeout) {
243  if(!receiveThreadCreated) {
244  // Lazy initialization of receive thread
245  unique_lock<timed_mutex> lock(receiveMutex);
246  receiveThreadCreated = true;
247  receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop, this));
248  }
249 
250  // Acquire mutex
251  unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
252  if(timeout < 0) {
253  lock.lock();
254  } else {
255  std::chrono::steady_clock::time_point lockStart =
256  std::chrono::steady_clock::now();
257  if(!lock.try_lock_for(std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)))) {
258  // Timed out
259  return false;
260  }
261 
262  // Update timeout
263  unsigned int lockDuration = static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
264  std::chrono::steady_clock::now() - lockStart).count());
265  timeout = std::max(0.0, timeout - lockDuration*1e-6);
266  }
267 
268  // Test for errors
269  if(receiveException) {
270  std::rethrow_exception(receiveException);
271  }
272 
273  if(timeout == 0 && !newDataReceived) {
274  // No image has been received and we are not blocking
275  return false;
276  }
277 
278  // If there is no data yet then keep on waiting
279  if(!newDataReceived) {
280  if(timeout < 0) {
281  while(!terminate && !receiveException && !newDataReceived) {
282  receiveCond.wait(lock);
283  }
284  } else {
285  receiveCond.wait_for(lock, std::chrono::microseconds(static_cast<unsigned int>(timeout*1e6)));
286  }
287  }
288 
289  // Test for errors again
290  if(receiveException) {
291  std::rethrow_exception(receiveException);
292  }
293 
294  if(newDataReceived) {
295  // Get the received image
296  imageSet = receivedSet;
297 
298  newDataReceived = false;
299 
300  // Increment index for data buffers
301  receiveBufferIndex = (receiveBufferIndex + receivedSet.getNumberOfImages()) % NUM_BUFFERS;
302 
303  // Start counting uncollected frames at the time of first collection
304  if (uncollectedDroppedFrames < 0) uncollectedDroppedFrames = 0;
305 
306  return true;
307  } else {
308  return false;
309  }
310 }
311 
312 void AsyncTransfer::Pimpl::sendLoop() {
313  {
314  // Delay the thread start
315  unique_lock<mutex> lock(sendMutex);
316  }
317 
318  ImageSet imgSet;
319  bool deleteSet = false;
320 
321  try {
322  while(!terminate) {
323  // Wait for next image
324  {
325  unique_lock<mutex> lock(sendMutex);
326  // Wait for next frame to be queued
327  bool firstWait = true;
328  while(!terminate && !sendSetValid) {
329  imgTrans.transferData();
330  sendCond.wait_for(lock, std::chrono::milliseconds(
331  firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
332  firstWait = false;
333  }
334  if(!sendSetValid) {
335  continue;
336  }
337 
338  imgSet = sendImageSet;
339  deleteSet = deleteSendData;
340  sendSetValid = false;
341 
342  sendWaitCond.notify_one();
343  }
344 
345  imgTrans.setTransferImageSet(imgSet);
346  while(!terminate) {
347  ImageTransfer::TransferStatus status = imgTrans.transferData();
349  break;
350  }
351  std::this_thread::sleep_for(std::chrono::milliseconds(SEND_THREAD_LONG_WAIT_MS));
352  }
353 
354  if(deleteSet) {
355  for (int i=0; i<imgSet.getNumberOfImages(); ++i) {
356  delete[] imgSet.getPixelData(i);
357  }
358  deleteSet = false;
359  }
360  }
361  } catch(...) {
362  // Store the exception for later
363  if(!sendException) {
364  sendException = std::current_exception();
365  }
366  sendWaitCond.notify_all();
367 
368  // Don't forget to free the memory
369  if(deleteSet) {
370  for (int i=0; i<imgSet.getNumberOfImages(); ++i) {
371  delete[] imgSet.getPixelData(i);
372  }
373  deleteSet = false;
374  }
375  }
376 }
377 
378 void AsyncTransfer::Pimpl::receiveLoop() {
379  {
380  // Delay the thread start
381  unique_lock<timed_mutex> lock(receiveMutex);
382  }
383 
384  try {
385  ImageSet currentSet;
386 
387  while(!terminate) {
388  // Receive new image (blocks internally)
389  bool newImageSetArrived = imgTrans.receiveImageSet(currentSet);
390 
391  if (newImageSetArrived) {
392  unique_lock<timed_mutex> lock(receiveMutex);
393  if (newDataReceived) {
394  // collectReceivedImageSet() frequency was too low; previous frame lost
395  if (uncollectedDroppedFrames > -1) uncollectedDroppedFrames++;
396  }
397  // Copy the pixel data
398  for(int i=0;i<currentSet.getNumberOfImages();i++) {
399  int bytesPerPixel = currentSet.getBytesPerPixel(i);
400  int newStride = currentSet.getWidth() * bytesPerPixel;
401  int totalSize = currentSet.getHeight() * newStride;
402  int bufIdxHere = (i + receiveBufferIndex) % NUM_BUFFERS;
403  if(static_cast<int>(receivedData[bufIdxHere].size()) < totalSize) {
404  receivedData[bufIdxHere].resize(totalSize);
405  }
406  if(newStride == currentSet.getRowStride(i)) {
407  memcpy(&receivedData[bufIdxHere][0], currentSet.getPixelData(i),
408  newStride*currentSet.getHeight());
409  } else {
410  for(int y = 0; y<currentSet.getHeight(); y++) {
411  memcpy(&receivedData[bufIdxHere][y*newStride],
412  &currentSet.getPixelData(i)[y*currentSet.getRowStride(i)],
413  newStride);
414  }
415  currentSet.setRowStride(i, newStride);
416  }
417  currentSet.setPixelData(i, &receivedData[bufIdxHere][0]);
418  }
419  // N.B. receiveBufferIndex is only increased at collection time
420 
421  // Notify that a new image set has been received
422  newDataReceived = true;
423  receivedSet = currentSet;
424  receiveCond.notify_one();
425  }
426 
427  }
428  } catch(...) {
429  // Store the exception for later
430  if(!receiveException) {
431  receiveException = std::current_exception();
432  }
433  receiveCond.notify_all();
434  }
435 }
436 
437 bool AsyncTransfer::Pimpl::isConnected() const {
438  return imgTrans.isConnected();
439 }
440 
441 void AsyncTransfer::Pimpl::disconnect() {
442  imgTrans.disconnect();
443 }
444 
445 std::string AsyncTransfer::Pimpl::getRemoteAddress() const {
446  return imgTrans.getRemoteAddress();
447 }
448 
449 int AsyncTransfer::Pimpl::getNumDroppedFrames() const {
450  return imgTrans.getNumDroppedFrames() + uncollectedDroppedFrames;
451 }
452 
453 bool AsyncTransfer::Pimpl::tryAccept() {
454  return imgTrans.tryAccept();
455 }
456 
457 void AsyncTransfer::Pimpl::setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback) {
458  imgTrans.setConnectionStateChangeCallback(callback);
459 }
460 
461 void AsyncTransfer::Pimpl::setAutoReconnect(int secondsBetweenRetries) {
462  imgTrans.setAutoReconnect(secondsBetweenRetries);
463 }
464 
465 constexpr int AsyncTransfer::Pimpl::NUM_BUFFERS;
466 constexpr int AsyncTransfer::Pimpl::SEND_THREAD_SHORT_WAIT_MS;
467 constexpr int AsyncTransfer::Pimpl::SEND_THREAD_LONG_WAIT_MS;
468 
469 } // namespace
470 
visiontransfer::ImageSet::getHeight
int getHeight() const
Returns the height of each image.
Definition: imageset.cpp:533
visiontransfer::AsyncTransfer::AsyncTransfer
AsyncTransfer(const char *address, const char *service="7681", ImageProtocol::ProtocolType protType=ImageProtocol::PROTOCOL_UDP, bool server=false, int bufferSize=16 *1048576, int maxUdpPacketSize=1472, int autoReconnectDelay=1)
Creates a new transfer object.
Definition: asynctransfer.cpp:115
visiontransfer::AsyncTransfer::tryAccept
bool tryAccept()
Tries to accept a client connection.
Definition: asynctransfer.cpp:158
visiontransfer::ImageSet::getWidth
int getWidth() const
Returns the width of each image.
Definition: imageset.cpp:529
visiontransfer::ImageSet::getRowStride
int getRowStride(int imageNumber) const
Returns the row stride for the pixel data of one image.
Definition: imageset.cpp:537
visiontransfer::ImageSet::getNumberOfImages
int getNumberOfImages() const
Returns the number of images in this set.
Definition: imageset.cpp:601
visiontransfer::AsyncTransfer::isConnected
bool isConnected() const
Returns true if a remote connection is established.
Definition: asynctransfer.cpp:144
visiontransfer::ImageSet::setPixelData
void setPixelData(int imageNumber, unsigned char *pixelData)
Sets the pixel data for the given image.
Definition: imageset.cpp:501
visiontransfer::ImageSet::getPixelData
unsigned char * getPixelData(int imageNumber) const
Returns the pixel data for the given image.
Definition: imageset.cpp:553
visiontransfer::ImageTransfer::TransferStatus
TransferStatus
The result of a partial image transfer.
Definition: imagetransfer.h:72
visiontransfer::DeviceInfo
Aggregates information about a discovered device.
Definition: deviceinfo.h:59
visiontransfer::ImageSet
A set of one to three images, but usually two (the left camera image and the disparity map)....
Definition: imageset.h:50
visiontransfer::AsyncTransfer::getRemoteAddress
std::string getRemoteAddress() const
Returns the address of the remote host.
Definition: asynctransfer.cpp:154
visiontransfer::ImageTransfer::WOULD_BLOCK
@ WOULD_BLOCK
The operation would block and blocking has been disabled.
Definition: imagetransfer.h:96
visiontransfer::ImageTransfer::PARTIAL_TRANSFER
@ PARTIAL_TRANSFER
Definition: imagetransfer.h:90
visiontransfer::ImageSet::setRowStride
void setRowStride(int imageNumber, int stride)
Sets a new row stride for the pixel data of one image.
Definition: imageset.cpp:493
visiontransfer::ImageSet::getBytesPerPixel
int getBytesPerPixel(int imageNumber) const
Returns the number of bytes that are required to store one image pixel.
Definition: imageset.cpp:589
visiontransfer::ImageProtocol::ProtocolType
ProtocolType
Supported network protocols.
Definition: imageprotocol.h:67
visiontransfer::AsyncTransfer::setConnectionStateChangeCallback
void setConnectionStateChangeCallback(std::function< void(visiontransfer::ConnectionState)> callback)
Install a handler that will be called when the connection state changes (e.g. socket is disconnected)...
Definition: asynctransfer.cpp:162
visiontransfer::AsyncTransfer::sendImageSetAsync
void sendImageSetAsync(const ImageSet &imageSet, bool deleteData=false)
Starts an asynchronous transmission of the given image set.
Definition: asynctransfer.cpp:132
visiontransfer::ImageProtocol
A lightweight protocol for transferring image sets.
Definition: imageprotocol.h:52
visiontransfer::ImageTransfer
Class for synchronous transfer of image sets.
Definition: imagetransfer.h:57
visiontransfer::AsyncTransfer::disconnect
void disconnect()
Terminates the current connection.
Definition: asynctransfer.cpp:148
visiontransfer::AsyncTransfer::collectReceivedImageSet
bool collectReceivedImageSet(ImageSet &imageSet, double timeout=-1)
Collects the asynchronously received image.
Definition: asynctransfer.cpp:136
visiontransfer::AsyncTransfer::getNumDroppedFrames
int getNumDroppedFrames() const
Returns the number of frames that have been dropped since connecting to the current remote host.
Definition: asynctransfer.cpp:140
Allied Vision