libvisiontransfer  10.8.0
parametertransfer.cpp
1 /*******************************************************************************
2  * Copyright (c) 2024 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <iostream>
16 
17 #include "visiontransfer/exceptions.h"
18 #include "visiontransfer/internal/parametertransfer.h"
19 #include "visiontransfer/internal/internalinformation.h"
20 #include "visiontransfer/internal/parametertransferdata.h"
21 #include "visiontransfer/internal/parameterserialization.h"
22 
23 #include <cstring>
24 #include <string>
25 #include <functional>
26 #include <atomic>
27 #include <chrono>
28 
29 using namespace std;
30 using namespace visiontransfer;
31 using namespace visiontransfer::internal;
32 using namespace visiontransfer::param;
33 
34 namespace visiontransfer {
35 namespace internal {
36 
// Out-of-class definition for the static constexpr data member SOCKET_TIMEOUT_MS
// (its value is given at the in-class declaration, which is not part of this file).
37 constexpr int ParameterTransfer::SOCKET_TIMEOUT_MS;
38 
// Per-thread state (thread_local): every thread has its own copy of these flags and queue.
// True while the current thread has an open batch transaction; writes are then queued instead of sent.
39 thread_local bool ParameterTransfer::transactionInProgress = false;
// (uid, stringified value) pairs queued by the current thread while a transaction is open.
40 thread_local std::vector<std::pair<std::string, std::string> > ParameterTransfer::transactionQueuedWrites = {};
// Set by the receiver thread around unthreaded update callbacks; write/persist entry points throw while it is set.
41 thread_local bool ParameterTransfer::writingProhibited = false;
42 
43 ParameterTransfer::ParameterTransfer(const char* address, const char* service)
44  : socket(INVALID_SOCKET), address(address), service(service), networkReady(false), featureDisabledTransactions(false) {
45 
46  tabTokenizer.collapse(false).separators({"\t"});
47  spaceTokenizer.collapse(true).separators({" "});
48  slashTokenizer.collapse(false).separators({"/"});
49 
50  Networking::initNetworking();
51  attemptConnection();
52 }
53 
54 ParameterTransfer::~ParameterTransfer() {
55  threadRunning = false;
56  if (receiverThread->joinable()) {
57  receiverThread->join();
58  }
59 
60  if(socket != INVALID_SOCKET) {
61  Networking::closeSocket(socket);
62  }
63 }
64 
66  // This is technically 'more' than just being connected raw;
67  // when true, this connection has passed the full handshake and is operational.
68  return networkReady;
69 }
70 
71 // Will attempt initial connection or reconnection after error
72 void ParameterTransfer::attemptConnection() {
73  std::unique_lock<std::mutex> localLock(socketModificationMutex);
74 
75  addrinfo* addressInfo = Networking::resolveAddress(address.c_str(), service.c_str());
76 
77  socket = Networking::connectTcpSocket(addressInfo);
78  Networking::setSocketTimeout(socket, SOCKET_TIMEOUT_MS);
79 
80  if (!receiverThread) {
81  receiverThread = std::make_shared<std::thread>(std::bind(&ParameterTransfer::receiverRoutine, this));
82  }
83  networkError = false;
84  pollDelay = 1000;
85 
86  // Initial 'GetAll' command
87  size_t written = send(socket, "A\n", 2, 0);
88  if(written != 2) {
89  Networking::closeSocket(socket);
90  socket = INVALID_SOCKET;
91  networkReady = false;
92  networkError = true;
93  TransferException ex("Error sending GetAllParameter request: " + Networking::getLastErrorString());
94  throw ex;
95  }
96 
97  freeaddrinfo(addressInfo);
98 }
99 
100 void ParameterTransfer::waitNetworkReady() const {
101  if (!networkReady) {
102  // Block for network to become ready
103  std::unique_lock<std::mutex> readyLock(readyMutex);
104  auto status = readyCond.wait_for(readyLock, std::chrono::milliseconds(3000));
105  if (status == std::cv_status::timeout) {
106  throw TransferException("Timeout waiting for parameter server ready state");
107  }
108  }
109 }
110 
111 void ParameterTransfer::readParameter(unsigned char messageType, const char* id, unsigned char* dest, int length) {
112  waitNetworkReady();
113  if (networkError) {
114  // collecting deferred error from background thread
115  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
116  }
117 
118  for (int i=0; i<length; ++i) { dest[i] = '\0'; } // PLACEHOLDER
119 }
120 
121 void ParameterTransfer::sendNetworkCommand(const std::string& cmdline, const std::string& diagStr) {
122  std::unique_lock<std::mutex> localLock(socketModificationMutex);
123  if (socket == INVALID_SOCKET) {
124  throw TransferException("Connection has been closed and not reconnected so far");
125  }
126  size_t written = send(socket, cmdline.c_str(), (int) cmdline.size(), 0);
127  if(written != cmdline.size()) {
128  throw TransferException("Error sending "+diagStr+" request: " + Networking::getLastErrorString());
129  }
130 }
131 
132 template<typename T>
133 void ParameterTransfer::writeParameter(const char* id, const T& value, bool synchronous) {
134  waitNetworkReady();
135  if (networkError) {
136  // collecting deferred error from background thread
137  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
138  }
139  if (!hasParameter(id)) {
140  throw ParameterException("Invalid parameter: " + std::string(id));
141  }
142 
143  // Assemble a set request with our thread id - the receiver thread can unblock us based on that ID
144  // For 'fire-and-forget' commands without reply, we use -1
145  std::stringstream ss;
146  ss << "S" << "\t" << (synchronous ? getThreadId() : -1) << "\t" << id << "\t" << value << "\n";
147 
148  if (synchronous) {
149  blockingCallThisThread([this, &ss](){
150  sendNetworkCommand(ss.str(), "parameter set");
151  });
152  auto result = lastSetRequestResult[getThreadId()];
153  if (result.first == false) {
154  // There was a remote error, append its info to the exception
155  throw ParameterException("Remote parameter error: " + result.second);
156  } else {
157  // Local preliminary value update - the (successful!) async remote update may need additional time.
158  // The actual value MIGHT have been revised by the server, but in the vast majority of cases this allows
159  // reading back the successfully written parameter. The safest way is via setParameterUpdateCallback.
160  std::unique_lock<std::mutex> globalLock(mapMutex);
161  auto& param = paramSet[id];
162  if (param.isScalar()) {
163  param.setCurrent<T>(value);
164  } else {
165  // Should not be required here (only for the std::string specialization below)
166  auto toks = spaceTokenizer.tokenize(std::to_string(value));
167  std::vector<double> vs;
168  for (auto& t: toks) vs.push_back(atof(t.c_str()));
169  param.setTensorData(vs);
170  }
171  }
172  } else {
173  // 'Fire and forget' immediate-return mode, e.g. for sending a trigger
174  sendNetworkCommand(ss.str(), "parameter set");
175  }
176 }
177 
178 // Explicit instantiation for std::string
179 template<>
180 void ParameterTransfer::writeParameter(const char* id, const std::string& value, bool synchronous) {
181  waitNetworkReady();
182  if (networkError) {
183  // collecting deferred error from background thread
184  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
185  }
186  if (!hasParameter(id)) {
187  throw ParameterException("Invalid parameter: " + std::string(id));
188  }
189 
190  // Assemble a set request with our thread id - the receiver thread can unblock us based on that ID
191  // For 'fire-and-forget' commands without reply, we use -1
192  std::stringstream ss;
193  ss << "S" << "\t" << (synchronous ? getThreadId() : -1) << "\t" << id << "\t" << value << "\n";
194 
195  if (synchronous) {
196  blockingCallThisThread([this, &id, &value, &ss](){
197  sendNetworkCommand(ss.str(), "parameter set");
198  });
199  auto result = lastSetRequestResult[getThreadId()];
200  if (result.first == false) {
201  // There was a remote error, append its info to the exception
202  throw ParameterException("Remote parameter error: " + result.second);
203  } else {
204  // Local preliminary value update - the (successful!) async remote update may need additional time.
205  // The actual value MIGHT have been revised by the server, but in the vast majority of cases this allows
206  // reading back the successfully written parameter. The safest way is via setParameterUpdateCallback.
207  std::unique_lock<std::mutex> globalLock(mapMutex);
208  auto& param = paramSet[id];
209  if (param.isScalar()) {
210  param.setCurrent<std::string>(value);
211  } else {
212  auto toks = spaceTokenizer.tokenize(value);
213  std::vector<double> vs;
214  for (auto& t: toks) vs.push_back(atof(t.c_str()));
215  param.setTensorData(vs);
216  }
217  }
218  } else {
219  // 'Fire and forget' immediate-return mode, e.g. for sending a trigger
220  sendNetworkCommand(ss.str(), "parameter set");
221  }
222 }
223 
224 template<typename T>
225 void ParameterTransfer::writeParameterTransactionGuardedImpl(const char* id, const T& value) {
226  if (writingProhibited) {
227  throw ParameterException("Writing parameters is not valid inside an unthreaded event handler");
228  }
229  if (transactionInProgress) {
230  if (!hasParameter(id)) {
231  throw ParameterException("Invalid parameter: " + std::string(id));
232  }
233  // Queue is thread_local
234  transactionQueuedWrites.push_back({std::string(id), std::to_string(value)});
235  } else {
236  // No transaction, immediate dispatch
237  writeParameter(id, value);
238  }
239 }
240 
241 template<typename T>
242 void ParameterTransfer::writeParameterTransactionUnguardedImpl(const char* id, const T& value) {
243  // No transaction, immediate dispatch
244  writeParameter(id, value, false);
245 }
246 
247 // Explicit instantiation for std::string
248 template<>
249 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const std::string& value) {
250  if (writingProhibited) {
251  throw ParameterException("Writing parameters is not valid inside an unthreaded event handler");
252  }
253  if (transactionInProgress) {
254  if (!hasParameter(id)) {
255  throw ParameterException("Invalid parameter: " + std::string(id));
256  }
257  // Queue is thread_local
258  transactionQueuedWrites.push_back({std::string(id), value});
259  } else {
260  // No transaction, immediate dispatch
261  writeParameter(id, value);
262  }
263 }
264 
265 template<>
266 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const double& value) {
267  writeParameterTransactionGuardedImpl<double>(id, value);
268 }
269 
270 template<>
271 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const int& value) {
272  writeParameterTransactionGuardedImpl<int>(id, value);
273 }
274 
275 template<>
276 void ParameterTransfer::writeParameterTransactionGuarded(const char* id, const bool& value) {
277  writeParameterTransactionGuardedImpl<bool>(id, value);
278 }
279 
280 template<>
281 void ParameterTransfer::writeParameterTransactionUnguarded(const char* id, const bool& value) {
282  writeParameterTransactionUnguardedImpl<bool>(id, value);
283 }
284 
286  waitNetworkReady();
287  if (networkError) {
288  // collecting deferred error from background thread
289  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
290  }
291  std::unique_lock<std::mutex> globalLock(mapMutex);
292  if (!paramSet.count(id)) {
293  throw ParameterException("Invalid parameter: " + std::string(id));
294  }
295  return paramSet[id].getCurrent<int>();
296 }
297 
299  waitNetworkReady();
300  if (networkError) {
301  // collecting deferred error from background thread
302  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
303  }
304  std::unique_lock<std::mutex> globalLock(mapMutex);
305  if (!paramSet.count(id)) {
306  throw ParameterException("Invalid parameter: " + std::string(id));
307  }
308  return paramSet[id].getCurrent<double>();
309 }
310 
312  waitNetworkReady();
313  if (networkError) {
314  // collecting deferred error from background thread
315  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
316  }
317  std::unique_lock<std::mutex> globalLock(mapMutex);
318  if (!paramSet.count(id)) {
319  throw ParameterException("Invalid parameter: " + std::string(id));
320  }
321  return paramSet[id].getCurrent<bool>();
322 }
323 
324 void ParameterTransfer::writeIntParameter(const char* id, int value) {
325  writeParameter(id, value);
326 }
327 
328 void ParameterTransfer::writeDoubleParameter(const char* id, double value) {
329  writeParameter(id, value);
330 }
331 
332 void ParameterTransfer::writeBoolParameter(const char* id, bool value) {
333  writeParameter(id, value);
334 }
335 
336 std::map<std::string, ParameterInfo> ParameterTransfer::getAllParameters() {
337  waitNetworkReady();
338  if (networkError) {
339  // collecting deferred error from background thread
340  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
341  }
342  std::map<std::string, ParameterInfo> compatMap;
343  {
344  std::unique_lock<std::mutex> globalLock(mapMutex);
345  for (auto kv: paramSet) {
346  auto& name = kv.first;
347  auto& param = kv.second;
348  bool writeable = param.getAccessForApi() == param::Parameter::ACCESS_READWRITE;
349  switch(param.getType()) {
350  case param::ParameterValue::TYPE_INT: {
351  int min = -1, max = -1, increment = -1;
352  if (param.hasRange()) {
353  min = param.getMin<int>();
354  max = param.getMax<int>();
355  }
356  if (param.hasIncrement()) {
357  increment = param.getIncrement<int>();
358  }
359  compatMap[name] = ParameterInfo::fromInt(name, writeable, param.getCurrent<int>(), min, max, increment);
360  break;
361  }
362  case param::ParameterValue::TYPE_DOUBLE: {
363  double min = -1, max = -1, increment = -1;
364  if (param.hasRange()) {
365  min = param.getMin<double>();
366  max = param.getMax<double>();
367  }
368  if (param.hasIncrement()) {
369  increment = param.getIncrement<double>();
370  }
371  compatMap[name] = ParameterInfo::fromDouble(name, writeable, param.getCurrent<double>(), min, max, increment);
372  break;
373  }
374  case param::ParameterValue::TYPE_BOOL: {
375  compatMap[name] = ParameterInfo::fromBool(name, writeable, param.getCurrent<bool>());
376  break;
377  }
378  default:
379  // Omit parameters with other types from legacy compatibility API
380  break;
381  }
382  }
383  }
384  return compatMap;
385 }
386 
387 void ParameterTransfer::receiverRoutine() {
388  auto refTime = std::chrono::steady_clock::now();
389  recvBufBytes = 0;
390  threadRunning = true;
391  (void) getThreadId(); // we just grab and reserve ID 0 for the receiver
392  while (threadRunning) {
393  if (socket == INVALID_SOCKET) {
394  // Error that is recoverable by reconnection (otherwise this thread would have been terminated)
395  try {
396  attemptConnection();
397  } catch(...) {
398  //std::cerr << "Failed to reconnect to parameter server." << std::endl;
399  // Sleep receiver thread and retry reconnection in next iteration
400  std::this_thread::sleep_for(std::chrono::milliseconds(SOCKET_RECONNECT_INTERVAL_MS));
401  }
402  } else {
403  // Regular connection state - handle incoming events and replies
404  int bytesReceived = recv(socket, recvBuf+recvBufBytes, (RECV_BUF_SIZE - recvBufBytes), 0);
405  if (bytesReceived < 0) {
406  auto err = Networking::getErrno();
407  if(err == EAGAIN || err == EWOULDBLOCK || err == ETIMEDOUT) {
408  // No event or reply - no problem
409  //std::this_thread::sleep_for(std::chrono::milliseconds(1));
410  continue;
411  } else {
412  //std::cerr << "Network error (will periodically attempt reconnection)." << std::endl;
413  std::unique_lock<std::mutex> localLock(socketModificationMutex);
414  Networking::closeSocket(socket);
415  socket = INVALID_SOCKET;
416  refTime = std::chrono::steady_clock::now();
417  networkReady = false;
418  recvBufBytes = 0;
419  networkError = true;
420  networkErrorString = std::string("Error receiving network packet: ") + Networking::getLastErrorString();
421  if (connectionStateChangeCallback) {
422  std::thread([&](){
423  std::lock_guard<std::mutex> callbackLock(callbackMutex);
424  connectionStateChangeCallback(ConnectionState::DISCONNECTED);
425  }).detach();
426  }
427  continue;
428  }
429  } else if (bytesReceived == 0) {
430  //std::cerr << "Connection closed by remote side (will periodically attempt reconnection)." << std::endl;
431  std::unique_lock<std::mutex> localLock(socketModificationMutex);
432  Networking::closeSocket(socket);
433  socket = INVALID_SOCKET;
434  refTime = std::chrono::steady_clock::now();
435  networkReady = false;
436  recvBufBytes = 0;
437  networkError = true;
438  networkErrorString = "Connection closed";
439  if (connectionStateChangeCallback) {
440  std::thread([&](){
441  std::lock_guard<std::mutex> callbackLock(callbackMutex);
442  connectionStateChangeCallback(ConnectionState::DISCONNECTED);
443  }).detach();
444  }
445  continue;
446  } else {
447  recvBufBytes += bytesReceived;
448  }
449  unsigned int start=0;
450  for (unsigned int i=0; i<recvBufBytes; ++i) {
451  unsigned char c = recvBuf[i];
452  if (c=='\n') {
453  std::string currentLine((const char*) recvBuf+start, i-start);
454  //std::cout << "PARAM RECV: " << currentLine << std::endl;
455  auto toks = tabTokenizer.tokenize(currentLine);
456  if (toks.size()>0) {
457  const std::string& cmd = toks[0];
458  if (cmd=="P") {
459  if (toks.size()>1) {
460  // Check of protocol version - old (non-nvparam) firmwares do not send a newline-terminated version, which will just time out waitNetworkReady()
461  long reportedVersionMajor = atol(toks[1].c_str());
462  if(reportedVersionMajor != static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)) {
463  // Unhandled / incompatible version
464  networkError = true;
465  networkErrorString = std::string("Protocol major version mismatch, expected ") + std::to_string(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR) + " but got " + toks[1];
466  threadRunning = false;
467  // Wake up the network wait (to propagate error quickly)
468  std::lock_guard<std::mutex> readyLock(readyMutex);
469  readyCond.notify_all();
470  break;
471  }
472  long reportedVersionMinor = -1; // = unreported, legacy version
473  if (toks.size()>2) {
474  // Minor version reported
475  reportedVersionMinor = atol(toks[2].c_str());
476  if (reportedVersionMinor > static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR)) {
477  std::cerr << "Caution: remote parameter protocol version " << reportedVersionMajor << "." << reportedVersionMinor
478  << " is newer than our version " <<static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)
479  << "." << static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR) << std::endl;
480  std::cerr << "Consider a library upgrade for maximum compatibility." << std::endl;
481  }
482  // Further toks fields are reserved for future extensions
483  }
484  if (reportedVersionMinor == -1) {
485  // Device is protocol 7.0, batch transactions added in 7.1 --> fallback: write parameters one-by-one
486  std::cerr << "Warning: remote firmware is out of date - parameter batch transaction support disabled." << std::endl;
487  featureDisabledTransactions = true;
488  } else {
489  // Device accepts full version description handshake, report ours
490  std::stringstream ss;
491  ss << "P" << "\t" << (unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR
492  << "\t" << (unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR << "\n";
493  {
494  std::unique_lock<std::mutex> localLock(socketModificationMutex);
495  if (socket == INVALID_SOCKET) {
496  throw TransferException("Connection has been closed and not reconnected so far");
497  }
498  size_t written = send(socket, ss.str().c_str(), (int) ss.str().size(), 0);
499  if(written != ss.str().size()) {
500  throw TransferException("Error sending protocol version handshake reply: " + Networking::getLastErrorString());
501  }
502  }
503  }
504  } else {
505  networkError = true;
506  networkErrorString = "Incomplete transfer of protocol version";
507  threadRunning = false;
508  break;
509  }
510  } else if (cmd=="I") {
511  // Full parameter info (value and metadata): add or overwrite local parameter
512  Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks);
513  auto uid = param.getUid();
514  bool alreadyPresent = paramSet.count(uid);
515  paramSet[uid] = param;
516  if (networkReady) {
517  if (alreadyPresent && parameterUpdateCallback) {
518  // Only call the user callback for metadata updates, but not for the initial enumeration
519  if (parameterUpdateCallbackThreaded) {
520  std::thread([&, uid](){
521  std::lock_guard<std::mutex> callbackLock(callbackMutex);
522  parameterUpdateCallback(uid);
523  }).detach();
524  } else {
525  std::lock_guard<std::mutex> callbackLock(callbackMutex);
526  writingProhibited = true; // thread_local
527  parameterUpdateCallback(uid);
528  writingProhibited = false;
529  }
530  }
531  }
532  } else if (cmd=="M") {
533  // Metadata-only update: overwrite an existing local parameter, but preserve its previous value.
534  if (paramSet.count(toks[1])) {
535  Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks, "M");
536  auto uid = param.getUid();
537  param.setCurrentFrom(paramSet[uid]);
538  paramSet[uid] = param;
539  if (networkReady) {
540  if (parameterUpdateCallback) {
541  if (parameterUpdateCallbackThreaded) {
542  std::thread([&, uid](){
543  std::lock_guard<std::mutex> callbackLock(callbackMutex);
544  parameterUpdateCallback(uid);
545  }).detach();
546  } else {
547  std::lock_guard<std::mutex> callbackLock(callbackMutex);
548  writingProhibited = true; // thread_local
549  parameterUpdateCallback(uid);
550  writingProhibited = false;
551  }
552  }
553  }
554  } else {
555  std::cerr << "Parameter not received yet - not updating metadata of: " << toks[1] << std::endl;;
556  }
557  } else if (cmd=="V") {
558  // Current-value-only update
559  if (toks.size() < 3) {
560  throw TransferException("Received malformed parameter value update");
561  }
562  if (paramSet.count(toks[1])) {
563  // In-place update
564  ParameterSerialization::deserializeParameterValueChange(toks, paramSet[toks[1]]);
565  std::string uid = toks[1];
566  if (networkReady) {
567  if (parameterUpdateCallback) {
568  if (parameterUpdateCallbackThreaded) {
569  std::thread([&, uid](){
570  std::lock_guard<std::mutex> callbackLock(callbackMutex);
571  parameterUpdateCallback(uid);
572  }).detach();
573  } else {
574  std::lock_guard<std::mutex> callbackLock(callbackMutex);
575  writingProhibited = true; // thread_local
576  parameterUpdateCallback(uid);
577  writingProhibited = false;
578  }
579  }
580  }
581  } else {
582  std::cerr << "Parameter not received yet - not updating value of: " << toks[1] << std::endl;;
583  }
584  } else if (cmd=="R") {
585  // Reply with asynchronous result of a request to set parameter[s]
586  if (toks.size() < 4) {
587  throw TransferException("Received malformed reply for parameter set request");
588  }
589  std::unique_lock<std::mutex> globalLock(mapMutex);
590  auto subToks = slashTokenizer.tokenize(toks[1]);
591  int replyThreadId = atol(subToks[0].c_str());
592  std::string unblockClass = ""; // filter by block class so we can ignore non-matching replies
593  if (subToks.size() > 1) {
594  unblockClass = subToks[1];
595  }
596  bool hasCond = waitConds.count(replyThreadId);
597  if (hasCond && (waitCondClasses[replyThreadId]==unblockClass)) {
598  // Reanimating the waiting thread - it will clean up after itself
599  std::lock_guard<std::mutex> localLock(waitCondMutexes[replyThreadId]);
600  lastSetRequestResult[replyThreadId] = {toks[2] == "1", toks[3]};
601  waitConds[replyThreadId].notify_all();
602  } else {
603  if (replyThreadId != -1) { // dummy ID -1 for fire-and-forget command (no reply expected)
604  std::cerr << "Ignoring unexpected request result " << toks[1] << " for thread " << replyThreadId << std::endl;
605  }
606  }
607  } else if (cmd=="E") {
608  // 'End of Transmission' - at least one full enumeration has arrived - we are ready
609  networkReady = true;
610  // Wake any sleeping threads that were blocked until network became ready
611  std::lock_guard<std::mutex> readyLock(readyMutex);
612  readyCond.notify_all();
613  // The actual ready state is the user-visible connected state
614  if (connectionStateChangeCallback) {
615  std::thread([&](){
616  std::lock_guard<std::mutex> callbackLock(callbackMutex);
617  connectionStateChangeCallback(ConnectionState::CONNECTED);
618  }).detach();
619  }
620  } else if (cmd=="HB") {
621  // Heartbeat
622  } else if (cmd=="X") {
623  // Reserved extension
624  } else {
625  networkError = true;
626  networkErrorString = std::string("Unknown update command received: ") + cmd;
627  threadRunning = false;
628  break;
629  }
630  }
631  start = i+1;
632  }
633  }
634  // Move any incomplete line to front of recv buffer
635  if (start>=recvBufBytes) {
636  recvBufBytes = 0;
637  } else {
638  std::memmove(recvBuf, recvBuf+start, recvBufBytes-start);
639  recvBufBytes = recvBufBytes-start;
640  }
641  }
642  }
643 }
644 
645 int ParameterTransfer::getThreadId() {
646  // Always returns an int type (which may not be the case for std::thread::id)
647  static std::atomic_int threadCount{0};
648  thread_local int threadId = threadCount.fetch_add(1);
649  return threadId;
650 }
651 
652 void ParameterTransfer::blockingCallThisThread(std::function<void()> fn, int waitMaxMilliseconds, const std::string& waitClass) {
653  bool timeout = false;
654  auto tid = getThreadId();
655  {
656  std::unique_lock<std::mutex> globalLock(mapMutex);
657  // Populate maps
658  auto& localWaitCond = waitConds[tid];
659  auto& localWaitCondMutex = waitCondMutexes[tid];
660  waitCondClasses[tid] = waitClass;
661  std::unique_lock<std::mutex> localLock(localWaitCondMutex);
662  // First do the actual handshake setup, like emitting the network message
663  // (The current thread is protected against a reply race at this point)
664  fn();
665  // Allow receiver thread to access its checks (it is still blocked by our specific localLock)
666  globalLock.unlock();
667  // Wait for receiver thread to notify us with the reply
668  auto status = localWaitCond.wait_for(localLock, std::chrono::milliseconds(waitMaxMilliseconds));
669  timeout = (status == std::cv_status::timeout);
670  }
671  {
672  // Cleanup, so that any spurious network replies can get detected and discarded
673  std::unique_lock<std::mutex> globalLock(mapMutex);
674  waitConds.erase(tid);
675  waitCondMutexes.erase(tid);
676  waitCondClasses.erase(tid);
677  }
678  // Outcome
679  if (timeout) {
680  TimeoutException ex("Timeout waiting for request reply from parameter server");
681  throw ex;
682  }
683 }
684 
686  waitNetworkReady();
687  if (networkError) {
688  // collecting deferred error from background thread
689  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
690  }
691  return paramSet;
692 }
693 
695  waitNetworkReady();
696  if (networkError) {
697  // collecting deferred error from background thread
698  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
699  }
700  return paramSet;
701 }
702 
703 bool ParameterTransfer::hasParameter(const std::string& name) const {
704  waitNetworkReady();
705  if (networkError) {
706  // collecting deferred error from background thread
707  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
708  }
709 
710  std::lock_guard<std::mutex> mapLock(mapMutex);
711  return paramSet.count(name) != 0;
712 }
713 
714 Parameter ParameterTransfer::getParameter(const std::string& name) const {
715  waitNetworkReady();
716  if (networkError) {
717  // collecting deferred error from background thread
718  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
719  }
720 
721  std::lock_guard<std::mutex> mapLock(mapMutex);
722  if (paramSet.count(name)) {
723  return paramSet.at(name);
724  } else {
725  throw ParameterException(std::string("Invalid or inaccessible parameter name: ") + name);
726  }
727 }
728 
729 void ParameterTransfer::setParameterUpdateCallback(std::function<void(const std::string& uid)> callback, bool threaded) {
730  std::lock_guard<std::mutex> callbackLock(callbackMutex);
731  parameterUpdateCallbackThreaded = threaded;
732  parameterUpdateCallback = callback;
733 }
734 
736  // N.B. the flag is thread_local static
737  if (featureDisabledTransactions) {
738  // Fallback mode for outdated firmware versions -> ignore transaction batching
739  return;
740  }
741  if (transactionInProgress) throw TransferException("Simultaneous and/or nested parameter transactions are not supported");
742  transactionInProgress = true;
743  // We are now in batch-write mode
744 }
745 
746 void ParameterTransfer::transactionCommitQueue(int maxWaitMilliseconds) {
747  static int nextTransactionId = 0;
748  if (featureDisabledTransactions) {
749  // Fallback mode for outdated firmware versions -> ignore transaction batching
750  return;
751  }
752 
753  if (!transactionInProgress) return; // Already released
754 
755  if (std::uncaught_exceptions() > 0) {
756  // Transaction is NOT finalized during exception unwind
757  transactionInProgress = false; // and cannot retry
758  return;
759  }
760 
761  // Send queued parameter transactions
762  try {
763  waitNetworkReady();
764  if (networkError) {
765  // collecting deferred error from background thread
766  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
767  }
768 
769  // If there are no actual changes, do not send anything
770  if (transactionQueuedWrites.size() > 0) {
771  // Collect affected UIDs for transaction
772  std::set<std::string> affectedUids;
773  for (auto& kv: transactionQueuedWrites) {
774  affectedUids.insert(kv.first);
775  }
776 
777  // Start transaction on server, incorporating all affected UIDs
778  std::string uniqueTransactionId = std::to_string(nextTransactionId++);
779  std::string transactionId;
780  if (maxWaitMilliseconds > 0) {
781  transactionId = std::to_string(getThreadId()) + "/" + uniqueTransactionId; // use unique ID as unblock filter
782  } else {
783  // Marked as fire-and-forget (ignore later committed message)
784  transactionId = std::to_string(-1) + "/" + uniqueTransactionId; // use unique ID as unblock filter
785  }
786  std::stringstream ss;
787  ss << "TS" << "\t" << transactionId << "\t";
788  bool first = true;
789  for (auto& uid: affectedUids) {
790  if (first) first=false; else ss << ",";
791  ss << uid;
792  }
793  ss << "\n";
794  sendNetworkCommand(ss.str(), "transaction start");
795 
796  // Play back queued writes
797  for (auto& kv: transactionQueuedWrites) {
798  auto& uid = kv.first;
799  auto& value = kv.second;
800  writeParameter(uid.c_str(), value);
801  }
802 
803  // Finish transaction on server - automatic updates are then applied (and resumed).
804  // The transaction will be finalized anyway on the server, but only after a timeout.
805  std::stringstream ssEnd;
806  ssEnd << "TE" << "\t" << transactionId << "\n";
807 
808  // Block for the returning 'completed' signal, if requested
809  if (maxWaitMilliseconds > 0) {
810  try {
811  blockingCallThisThread([this, &ssEnd](){
812  sendNetworkCommand(ssEnd.str(), "transaction end");
813  }, maxWaitMilliseconds, uniqueTransactionId); // timeout and unblock class
814  } catch(...) {
815  transactionQueuedWrites.clear();
816  transactionInProgress = false; // May not retry
817  throw;
818  }
819  auto result = lastSetRequestResult[getThreadId()];
820  if (result.first == false) {
821  // There was a remote error, append its info to the exception
822  throw ParameterException("Remote transaction error: " + result.second);
823  }
824  } else {
825  sendNetworkCommand(ssEnd.str(), "transaction end");
826  }
827 
828  // Cleanup
829  transactionQueuedWrites.clear();
830  }
831  } catch(...) {
832  transactionInProgress = false; // May not retry
833  throw;
834  }
835 
836  transactionInProgress = false;
837 }
838 
839 void ParameterTransfer::persistParameters(const std::vector<std::string>& uids, bool synchronous) {
840  if (writingProhibited) {
841  throw ParameterException("Saving parameters is not valid inside an unthreaded event handler");
842  }
843 
844  if (transactionInProgress) {
845  throw TransferException("Saving parameters is invalid with an open transaction");
846  }
847 
848  waitNetworkReady();
849  if (networkError) {
850  // collecting deferred error from background thread
851  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
852  }
853 
854  std::unique_lock<std::mutex> globalLock(mapMutex);
855 
856  std::stringstream ss;
857  ss << "p" << "\t" << (synchronous ? getThreadId() : -1) << "\t"; // "p" -> persist request
858  bool first = true;
859  for (auto& id: uids) {
860  if (first) {
861  first=false;
862  } else {
863  ss << ",";
864  }
865  ss << id;
866  if (!paramSet.count(id)) {
867  throw ParameterException("Invalid parameter: " + std::string(id));
868  }
869  }
870  ss << "\n";
871 
872  if (synchronous) {
873  blockingCallThisThread([this, &ss](){
874  sendNetworkCommand(ss.str(), "parameter persist");
875  });
876  auto result = lastSetRequestResult[getThreadId()];
877  if (result.first == false) {
878  // There was a remote error, append its info to the exception
879  throw ParameterException("Remote parameter error: " + result.second);
880  }
881  } else {
882  // 'Fire and forget' immediate-return mode, e.g. for sending a trigger
883  sendNetworkCommand(ss.str(), "parameter persist");
884  }
885 }
886 
887 Parameter ParameterTransfer::pollParameter(const std::string& uid, bool synchronous) {
888  if (writingProhibited) {
889  throw ParameterException("Polling parameters is not valid inside an unthreaded event handler");
890  }
891 
892  if (transactionInProgress) {
893  throw TransferException("Polling parameters is invalid within an open transaction");
894  }
895 
896  waitNetworkReady();
897  if (networkError) {
898  // collecting deferred error from background thread
899  throw TransferException("ParameterTransfer currently not operational: " + networkErrorString);
900  }
901  if (!hasParameter(uid)) {
902  throw ParameterException("Invalid parameter: " + std::string(uid));
903  }
904 
905  std::stringstream ss;
906  // cmd "O" -> pOll for update
907  ss << "O" << "\t" << (synchronous ? getThreadId() : -1) << "\t" << uid << "\n";
908 
909  if (synchronous) {
910  blockingCallThisThread([this, &ss](){
911  sendNetworkCommand(ss.str(), "parameter poll");
912  });
913  auto result = lastSetRequestResult[getThreadId()];
914  if (result.first == false) {
915  // There was a remote error, append its info to the exception
916  throw ParameterException("Remote parameter error: " + result.second);
917  }
918  } else {
919  // 'Fire and forget' immediate-return mode (force update, do not wait for new value)
920  sendNetworkCommand(ss.str(), "parameter poll");
921  }
922 
923  return getParameter(uid);
924 }
925 
926 void ParameterTransfer::setConnectionStateChangeCallback(std::function<void(visiontransfer::ConnectionState)> callback) {
927  std::lock_guard<std::mutex> callbackLock(callbackMutex);
928  connectionStateChangeCallback = callback;
929 }
930 
931 }} // namespace
932 
visiontransfer::internal::ParameterTransfer::isConnected
bool isConnected() const
Returns whether the background connection is currently up and running (it may be temporarily false du...
Definition: parametertransfer.cpp:65
visiontransfer::internal::ParameterTransfer::writeDoubleParameter
void writeDoubleParameter(const char *id, double value)
Writes a double precision floating point value to a parameter of the parameter server.
Definition: parametertransfer.cpp:328
visiontransfer::internal::ParameterTransfer::writeIntParameter
void writeIntParameter(const char *id, int value)
Writes an integer value to a parameter of the parameter server.
Definition: parametertransfer.cpp:324
visiontransfer::TimeoutException
Exception class that indicates an unexpected timeout of a request.
Definition: exceptions.h:69
visiontransfer::internal::ParameterTransfer::writeParameter
void writeParameter(const char *id, const T &value, bool synchronous=true)
Writes a scalar value to a parameter of the parameter server.
Definition: parametertransfer.cpp:133
visiontransfer::internal::ParameterTransfer::getParameterSet
param::ParameterSet & getParameterSet()
Returns a reference to the internal parameter set (once the network handshake is complete)
Definition: parametertransfer.cpp:685
visiontransfer::param::ParameterSet::getCurrent
VT_EXPORT T getCurrent(const std::string &key)
Convenience function for safe bulk parameter access (throws for invalid UIDs). Will return any defaul...
Definition: parameterset.h:95
visiontransfer::param::Parameter::setCurrentFrom
VT_EXPORT Parameter & setCurrentFrom(const Parameter &from)
Definition: parameter.cpp:930
visiontransfer::internal::ParameterTransfer::hasParameter
bool hasParameter(const std::string &uid) const
Returns true if the matching parameter UID is present in the parameter set.
Definition: parametertransfer.cpp:703
visiontransfer::param::ParameterSet
Definition: parameterset.h:59
visiontransfer::internal::ParameterTransfer::persistParameters
void persistParameters(const std::vector< std::string > &uids, bool synchronous=true)
Requests to save the current values for the specified parameter UIDs to permanent storage.
Definition: parametertransfer.cpp:839
visiontransfer::internal::ParameterTransfer::pollParameter
param::Parameter pollParameter(const std::string &uid, bool blockingCall)
Returns a disconnected copy of the matching parameter from the parameter set after polling for its cu...
Definition: parametertransfer.cpp:887
visiontransfer::TransferException
Exception class that is used for all transfer exceptions.
Definition: exceptions.h:45
visiontransfer::param::Parameter
Definition: parameter.h:71
visiontransfer::internal::ParameterTransfer::writeParameterTransactionUnguarded
void writeParameterTransactionUnguarded(const char *id, const T &value)
Writes a scalar value to a parameter of the parameter server, using 'fire-and-forget' for real-time c...
visiontransfer::internal::ParameterTransfer::getAllParameters
std::map< std::string, ParameterInfo > getAllParameters()
Enumerates all parameters as reported by the device.
Definition: parametertransfer.cpp:336
visiontransfer::internal::ParameterTransfer::setConnectionStateChangeCallback
void setConnectionStateChangeCallback(std::function< void(visiontransfer::ConnectionState)> callback)
Sets the callback function to inform of background disconnection / reconnection.
Definition: parametertransfer.cpp:926
visiontransfer::ParameterException
Exception class that is used for all parameter-related exceptions.
Definition: exceptions.h:61
visiontransfer::internal::ParameterTransfer::readBoolParameter
bool readBoolParameter(const char *id)
Reads a boolean value from the parameter server.
Definition: parametertransfer.cpp:311
visiontransfer::internal::ParameterTransfer::transactionStartQueue
void transactionStartQueue()
Start batch parameter transaction.
Definition: parametertransfer.cpp:735
visiontransfer::internal::ParameterTransfer::writeParameterTransactionGuarded
void writeParameterTransactionGuarded(const char *id, const T &value)
Writes a scalar value to a parameter of the parameter server, transparently deferring for a batch upd...
visiontransfer::param::Parameter::getUid
VT_EXPORT std::string getUid() const
Definition: parameter.cpp:734
visiontransfer::internal::ParameterTransfer::readDoubleParameter
double readDoubleParameter(const char *id)
Reads a double precision floating point value from the parameter server.
Definition: parametertransfer.cpp:298
visiontransfer::internal::ParameterTransfer::writeBoolParameter
void writeBoolParameter(const char *id, bool value)
Writes a boolean value to a parameter of the parameter server.
Definition: parametertransfer.cpp:332
visiontransfer::internal::ParameterTransfer::readIntParameter
int readIntParameter(const char *id)
Reads an integer value from the parameter server.
Definition: parametertransfer.cpp:285
visiontransfer::internal::ParameterTransfer::transactionCommitQueue
void transactionCommitQueue(int maxWaitMilliseconds)
Complete the started parameter transaction.
Definition: parametertransfer.cpp:746
visiontransfer::internal::ParameterTransfer::getParameter
param::Parameter getParameter(const std::string &uid) const
Returns a disconnected copy of the matching parameter from the parameter set; exception if not found.
Definition: parametertransfer.cpp:714
Allied Vision