15 #if __GNUC__ == 4 && __GNUC_MINOR__ < 9
18 #include <bits/c++config.h>
19 #undef _GLIBCXX_USE_CLOCK_MONOTONIC
26 #include <condition_variable>
32 #include "visiontransfer/asynctransfer.h"
33 #include "visiontransfer/internal/alignedallocator.h"
36 using namespace visiontransfer;
37 using namespace visiontransfer::internal;
39 namespace visiontransfer {
43 class AsyncTransfer::Pimpl {
45 Pimpl(
const char* address,
const char* service,
47 int bufferSize,
int maxUdpPacketSize,
int autoReconnectDelay);
51 void sendImageSetAsync(
const ImageSet& imageSet,
bool deleteData);
52 bool collectReceivedImageSet(
ImageSet& imageSet,
double timeout);
53 int getNumDroppedFrames()
const;
54 bool isConnected()
const;
56 std::string getRemoteAddress()
const;
58 void setConnectionStateChangeCallback(std::function<
void(visiontransfer::ConnectionState)> callback);
59 void setAutoReconnect(
int secondsBetweenRetries);
62 static constexpr
int NUM_BUFFERS = ImageSet::MAX_SUPPORTED_IMAGES * 3;
63 static constexpr
int SEND_THREAD_SHORT_WAIT_MS = 1;
64 static constexpr
int SEND_THREAD_LONG_WAIT_MS = 10;
70 volatile bool terminate;
74 std::thread sendThread;
76 std::condition_variable sendCond;
77 std::condition_variable sendWaitCond;
79 std::thread receiveThread;
80 std::timed_mutex receiveMutex;
81 std::condition_variable_any receiveCond;
85 std::vector<unsigned char, AlignedAllocator<unsigned char> > receivedData[NUM_BUFFERS];
86 volatile int receiveBufferIndex;
87 volatile bool newDataReceived;
94 std::exception_ptr receiveException;
95 std::exception_ptr sendException;
97 bool sendThreadCreated;
98 bool receiveThreadCreated;
102 int uncollectedDroppedFrames;
110 void createSendThread();
115 AsyncTransfer::AsyncTransfer(
const char* address,
const char* service,
117 int bufferSize,
int maxUdpPacketSize,
int autoReconnectDelay)
118 : pimpl(new Pimpl(address, service, protType, server, bufferSize, maxUdpPacketSize,
119 autoReconnectDelay)) {
123 int autoReconnectDelay)
124 : pimpl(new Pimpl(device.getIpAddress().c_str(),
"7681", static_cast<
ImageProtocol::ProtocolType>(device.getNetworkProtocol()),
125 false, bufferSize, maxUdpPacketSize, autoReconnectDelay)) {
128 AsyncTransfer::~AsyncTransfer() {
133 pimpl->sendImageSetAsync(imageSet, deleteData);
137 return pimpl->collectReceivedImageSet(imageSet, timeout);
141 return pimpl->getNumDroppedFrames();
145 return pimpl->isConnected();
150 pimpl->setAutoReconnect(0);
155 return pimpl->getRemoteAddress();
159 return pimpl->tryAccept();
163 pimpl->setConnectionStateChangeCallback(callback);
166 void AsyncTransfer::setAutoReconnect(
int secondsBetweenRetries) {
167 pimpl->setAutoReconnect(secondsBetweenRetries);
172 AsyncTransfer::Pimpl::Pimpl(
const char* address,
const char* service,
174 int bufferSize,
int maxUdpPacketSize,
int autoReconnectDelay)
175 : imgTrans(address, service, protType, server, bufferSize, maxUdpPacketSize, autoReconnectDelay),
176 terminate(false), receiveBufferIndex(0), newDataReceived(false), sendSetValid(false),
177 deleteSendData(false), sendThreadCreated(false),
178 receiveThreadCreated(false), uncollectedDroppedFrames(-1) {
185 AsyncTransfer::Pimpl::~Pimpl() {
188 sendCond.notify_all();
189 receiveCond.notify_all();
190 sendWaitCond.notify_all();
192 if(sendThreadCreated && sendThread.joinable()) {
196 if(receiveThreadCreated && receiveThread.joinable()) {
197 receiveThread.join();
200 if(sendSetValid && deleteSendData) {
201 delete[] sendImageSet.getPixelData(0);
202 delete[] sendImageSet.getPixelData(1);
206 void AsyncTransfer::Pimpl::createSendThread() {
207 if(!sendThreadCreated) {
209 unique_lock<mutex> lock(sendMutex);
210 sendThread = thread(bind(&AsyncTransfer::Pimpl::sendLoop,
this));
211 sendThreadCreated =
true;
215 void AsyncTransfer::Pimpl::sendImageSetAsync(
const ImageSet& imageSet,
bool deleteData) {
219 unique_lock<mutex> lock(sendMutex);
223 std::rethrow_exception(sendException);
227 sendImageSet = imageSet;
229 deleteSendData = deleteData;
232 sendCond.notify_one();
237 sendWaitCond.wait(lock);
242 bool AsyncTransfer::Pimpl::collectReceivedImageSet(
ImageSet& imageSet,
double timeout) {
243 if(!receiveThreadCreated) {
245 unique_lock<timed_mutex> lock(receiveMutex);
246 receiveThreadCreated =
true;
247 receiveThread = thread(bind(&AsyncTransfer::Pimpl::receiveLoop,
this));
251 unique_lock<timed_mutex> lock(receiveMutex, std::defer_lock);
255 std::chrono::steady_clock::time_point lockStart =
256 std::chrono::steady_clock::now();
257 if(!lock.try_lock_for(std::chrono::microseconds(
static_cast<unsigned int>(timeout*1e6)))) {
263 unsigned int lockDuration =
static_cast<unsigned int>(std::chrono::duration_cast<std::chrono::microseconds>(
264 std::chrono::steady_clock::now() - lockStart).count());
265 timeout = std::max(0.0, timeout - lockDuration*1e-6);
269 if(receiveException) {
270 std::rethrow_exception(receiveException);
273 if(timeout == 0 && !newDataReceived) {
279 if(!newDataReceived) {
281 while(!terminate && !receiveException && !newDataReceived) {
282 receiveCond.wait(lock);
285 receiveCond.wait_for(lock, std::chrono::microseconds(
static_cast<unsigned int>(timeout*1e6)));
290 if(receiveException) {
291 std::rethrow_exception(receiveException);
294 if(newDataReceived) {
296 imageSet = receivedSet;
298 newDataReceived =
false;
301 receiveBufferIndex = (receiveBufferIndex + receivedSet.getNumberOfImages()) % NUM_BUFFERS;
304 if (uncollectedDroppedFrames < 0) uncollectedDroppedFrames = 0;
312 void AsyncTransfer::Pimpl::sendLoop() {
315 unique_lock<mutex> lock(sendMutex);
319 bool deleteSet =
false;
325 unique_lock<mutex> lock(sendMutex);
327 bool firstWait =
true;
328 while(!terminate && !sendSetValid) {
329 imgTrans.transferData();
330 sendCond.wait_for(lock, std::chrono::milliseconds(
331 firstWait ? SEND_THREAD_SHORT_WAIT_MS : SEND_THREAD_LONG_WAIT_MS));
338 imgSet = sendImageSet;
339 deleteSet = deleteSendData;
340 sendSetValid =
false;
342 sendWaitCond.notify_one();
345 imgTrans.setTransferImageSet(imgSet);
351 std::this_thread::sleep_for(std::chrono::milliseconds(SEND_THREAD_LONG_WAIT_MS));
364 sendException = std::current_exception();
366 sendWaitCond.notify_all();
378 void AsyncTransfer::Pimpl::receiveLoop() {
381 unique_lock<timed_mutex> lock(receiveMutex);
389 bool newImageSetArrived = imgTrans.receiveImageSet(currentSet);
391 if (newImageSetArrived) {
392 unique_lock<timed_mutex> lock(receiveMutex);
393 if (newDataReceived) {
395 if (uncollectedDroppedFrames > -1) uncollectedDroppedFrames++;
400 int newStride = currentSet.
getWidth() * bytesPerPixel;
401 int totalSize = currentSet.
getHeight() * newStride;
402 int bufIdxHere = (i + receiveBufferIndex) % NUM_BUFFERS;
403 if(
static_cast<int>(receivedData[bufIdxHere].size()) < totalSize) {
404 receivedData[bufIdxHere].resize(totalSize);
407 memcpy(&receivedData[bufIdxHere][0], currentSet.
getPixelData(i),
410 for(
int y = 0; y<currentSet.
getHeight(); y++) {
411 memcpy(&receivedData[bufIdxHere][y*newStride],
417 currentSet.
setPixelData(i, &receivedData[bufIdxHere][0]);
422 newDataReceived =
true;
423 receivedSet = currentSet;
424 receiveCond.notify_one();
430 if(!receiveException) {
431 receiveException = std::current_exception();
433 receiveCond.notify_all();
437 bool AsyncTransfer::Pimpl::isConnected()
const {
438 return imgTrans.isConnected();
441 void AsyncTransfer::Pimpl::disconnect() {
442 imgTrans.disconnect();
445 std::string AsyncTransfer::Pimpl::getRemoteAddress()
const {
446 return imgTrans.getRemoteAddress();
449 int AsyncTransfer::Pimpl::getNumDroppedFrames()
const {
450 return imgTrans.getNumDroppedFrames() + uncollectedDroppedFrames;
453 bool AsyncTransfer::Pimpl::tryAccept() {
454 return imgTrans.tryAccept();
457 void AsyncTransfer::Pimpl::setConnectionStateChangeCallback(std::function<
void(visiontransfer::ConnectionState)> callback) {
458 imgTrans.setConnectionStateChangeCallback(callback);
461 void AsyncTransfer::Pimpl::setAutoReconnect(
int secondsBetweenRetries) {
462 imgTrans.setAutoReconnect(secondsBetweenRetries);
465 constexpr
int AsyncTransfer::Pimpl::NUM_BUFFERS;
466 constexpr
int AsyncTransfer::Pimpl::SEND_THREAD_SHORT_WAIT_MS;
467 constexpr
int AsyncTransfer::Pimpl::SEND_THREAD_LONG_WAIT_MS;