17 #include "visiontransfer/exceptions.h"
18 #include "visiontransfer/internal/parametertransfer.h"
19 #include "visiontransfer/internal/internalinformation.h"
20 #include "visiontransfer/internal/parametertransferdata.h"
21 #include "visiontransfer/internal/parameterserialization.h"
30 using namespace visiontransfer;
31 using namespace visiontransfer::internal;
32 using namespace visiontransfer::param;
// Opens namespace visiontransfer. The matching closing brace is not visible in this extract.
34 namespace visiontransfer {
// Out-of-class definition of the static constexpr member SOCKET_TIMEOUT_MS
// (its value is given at the declaration, which is not part of this file).
37 constexpr
int ParameterTransfer::SOCKET_TIMEOUT_MS;
// Per-thread flag, initially false. It is set and cleared by the transaction code further below
// and checked by the write/persist/poll paths.
39 thread_local
bool ParameterTransfer::transactionInProgress =
false;
// Per-thread list of (parameter id, value-as-string) pairs. The transaction-aware write helpers
// append to it when transactionInProgress is true.
40 thread_local std::vector<std::pair<std::string, std::string> > ParameterTransfer::transactionQueuedWrites = {};
// Per-thread flag, initially false. receiverRoutine sets it to true around non-threaded
// parameterUpdateCallback invocations; write/persist/poll paths throw while it is set.
41 thread_local
bool ParameterTransfer::writingProhibited =
false;
// Constructor. Stores address and service, starts with socket == INVALID_SOCKET and with
// networkReady and featureDisabledTransactions both false, configures the three tokenizers and
// calls Networking::initNetworking().
// NOTE(review): the embedded original line numbers jump from 50 to 54, so the end of the
// constructor (including its closing brace) is missing from this extract.
43 ParameterTransfer::ParameterTransfer(
const char* address,
const char* service)
44 : socket(INVALID_SOCKET), address(address), service(service), networkReady(false), featureDisabledTransactions(false) {
// Tab separator, collapse(false).
46 tabTokenizer.collapse(
false).separators({
"\t"});
// Space separator, collapse(true).
47 spaceTokenizer.collapse(
true).separators({
" "});
// Slash separator, collapse(false).
48 slashTokenizer.collapse(
false).separators({
"/"});
50 Networking::initNetworking();
// Destructor. Clears threadRunning (the loop condition of receiverRoutine), joins the receiver
// thread if it is joinable, and closes the socket if it is still open.
// NOTE(review): closing braces of the two if-blocks and of the destructor are missing from this extract.
54 ParameterTransfer::~ParameterTransfer() {
55 threadRunning =
false;
// NOTE(review): receiverThread is dereferenced without a null check here, whereas
// attemptConnection tests `!receiverThread` before creating it — confirm it can never be empty
// when the destructor runs.
56 if (receiverThread->joinable()) {
57 receiverThread->join();
60 if(socket != INVALID_SOCKET) {
61 Networking::closeSocket(socket);
// Resolves address/service, connects a TCP socket, applies SOCKET_TIMEOUT_MS, creates the receiver
// thread if none exists yet, and sends the two-byte request "A\n".
// The whole function runs with socketModificationMutex held.
// NOTE(review): several original lines (82-86, 88, 91-92, 94-96, 98-99) are missing from this
// extract, including the condition guarding the error branch and the closing braces.
72 void ParameterTransfer::attemptConnection() {
73 std::unique_lock<std::mutex> localLock(socketModificationMutex);
75 addrinfo* addressInfo = Networking::resolveAddress(address.c_str(), service.c_str());
77 socket = Networking::connectTcpSocket(addressInfo);
78 Networking::setSocketTimeout(socket, SOCKET_TIMEOUT_MS);
// The receiver thread is created only once; later calls reuse the existing thread object.
80 if (!receiverThread) {
81 receiverThread = std::make_shared<std::thread>(std::bind(&ParameterTransfer::receiverRoutine,
this));
// NOTE(review): send() returns a signed value; storing it in size_t turns an error return of -1
// into a very large number. Any comparison against the expected byte count still detects it.
87 size_t written = send(socket,
"A\n", 2, 0);
// Error path (its guarding condition is not visible): close and invalidate the socket, then
// build the exception object.
89 Networking::closeSocket(socket);
90 socket = INVALID_SOCKET;
93 TransferException ex(
"Error sending GetAllParameter request: " + Networking::getLastErrorString());
// NOTE(review): if the error path above throws before reaching this line, addressInfo is not
// freed — confirm against the full source.
97 freeaddrinfo(addressInfo);
// Waits on readyCond (under readyMutex) for at most 3000 ms and then checks whether the wait timed out.
// receiverRoutine notifies readyCond in its "E" branch and on a protocol major version mismatch.
// NOTE(review): original lines 101-102 and everything after line 105 (the timeout handling) are
// missing from this extract. wait_for is called without a predicate, so a notification issued
// before the wait starts, or a spurious wakeup, is not distinguished here.
100 void ParameterTransfer::waitNetworkReady()
const {
103 std::unique_lock<std::mutex> readyLock(readyMutex);
104 auto status = readyCond.wait_for(readyLock, std::chrono::milliseconds(3000));
105 if (status == std::cv_status::timeout) {
// Visible behaviour: throws TransferException with the stored networkErrorString (the guarding
// condition is not visible) and fills dest[0 .. length-1] with '\0'.
// messageType and id are not used in the visible lines.
// NOTE(review): original lines 112-114, 116-117 and the closing brace are missing from this extract.
111 void ParameterTransfer::readParameter(
unsigned char messageType,
const char*
id,
unsigned char* dest,
int length) {
115 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
// Zero-fill the caller's output buffer.
118 for (
int i=0; i<length; ++i) { dest[i] =
'\0'; }
// Sends cmdline over the socket while holding socketModificationMutex.
// cmdline: the complete command text to transmit.
// diagStr: short description of the request, used only in the error message.
// Throws TransferException if the socket is currently INVALID_SOCKET or if the number of bytes
// reported by send() differs from cmdline.size().
// NOTE(review): closing braces are missing from this extract.
121 void ParameterTransfer::sendNetworkCommand(
const std::string& cmdline,
const std::string& diagStr) {
122 std::unique_lock<std::mutex> localLock(socketModificationMutex);
123 if (socket == INVALID_SOCKET) {
124 throw TransferException(
"Connection has been closed and not reconnected so far");
// NOTE(review): send() returns a signed value; a -1 error return becomes a large size_t, which
// the inequality below still treats as a failure. A partial send is reported as an error too.
126 size_t written = send(socket, cmdline.c_str(), (
int) cmdline.size(), 0);
127 if(written != cmdline.size()) {
128 throw TransferException(
"Error sending "+diagStr+
" request: " + Networking::getLastErrorString());
// Body fragment of a templated parameter write (it uses setCurrent<T>). The function header,
// original lines 129-136, is missing from this extract, as are several branch conditions and all
// closing braces.
// Visible behaviour: builds the command "S\t<thread id or -1>\t<id>\t<value>\n", sends it either
// through blockingCallThisThread or directly, and updates the local paramSet entry.
137 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
// The second field is this thread's id when `synchronous` is true, otherwise -1.
145 std::stringstream ss;
146 ss <<
"S" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t" <<
id <<
"\t" << value <<
"\n";
// Send through blockingCallThisThread; the remaining arguments of that call are not visible.
149 blockingCallThisThread([
this, &ss](){
150 sendNetworkCommand(ss.str(),
"parameter set");
// Reads the result receiverRoutine stored for this thread when handling an "R" message.
// NOTE(review): no lock acquisition is visible before this map access, and operator[] inserts a
// default entry if none exists — confirm against the full source.
152 auto result = lastSetRequestResult[getThreadId()];
153 if (result.first ==
false) {
// Update the locally cached parameter under mapMutex.
// NOTE(review): paramSet[id] default-inserts when id is unknown; whether an existence check
// precedes this is not visible.
160 std::unique_lock<std::mutex> globalLock(mapMutex);
161 auto& param = paramSet[id];
162 if (param.isScalar()) {
163 param.setCurrent<T>(value);
// Non-scalar case: the value's string form is split on spaces and each token is parsed with atof.
166 auto toks = spaceTokenizer.tokenize(std::to_string(value));
167 std::vector<double> vs;
168 for (
auto& t: toks) vs.push_back(atof(t.c_str()));
169 param.setTensorData(vs);
// Direct send without waiting for a reply (presumably the non-synchronous branch).
174 sendNetworkCommand(ss.str(),
"parameter set");
// Body fragment of a parameter write for string values (it uses setCurrent<std::string>). The
// function header, original lines 176-183, is missing from this extract, as are several branch
// conditions and all closing braces.
// Visible behaviour mirrors the generic write fragment above, except that `value` is tokenized
// directly instead of via std::to_string.
184 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
// Command layout: "S\t<thread id or -1>\t<id>\t<value>\n".
192 std::stringstream ss;
193 ss <<
"S" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t" << id <<
"\t" << value <<
"\n";
// id and value are captured by reference but not used in the visible part of the lambda.
196 blockingCallThisThread([
this, &
id, &value, &ss](){
197 sendNetworkCommand(ss.str(),
"parameter set");
// Result stored by receiverRoutine for this thread when handling an "R" message.
// NOTE(review): no lock acquisition is visible before this map access — confirm.
199 auto result = lastSetRequestResult[getThreadId()];
200 if (result.first ==
false) {
// Update the locally cached parameter under mapMutex.
207 std::unique_lock<std::mutex> globalLock(mapMutex);
208 auto& param = paramSet[id];
209 if (param.isScalar()) {
210 param.setCurrent<std::string>(value);
// Non-scalar case: split the string on spaces and parse each token with atof.
212 auto toks = spaceTokenizer.tokenize(value);
213 std::vector<double> vs;
214 for (
auto& t: toks) vs.push_back(atof(t.c_str()));
215 param.setTensorData(vs);
// Direct send without waiting for a reply (presumably the non-synchronous branch).
220 sendNetworkCommand(ss.str(),
"parameter set");
// Transaction-aware write helper for values that are converted with std::to_string.
// Throws ParameterException when writingProhibited is set for the calling thread.
// While transactionInProgress is set, the write is queued in transactionQueuedWrites as
// (id, std::to_string(value)).
// NOTE(review): the template header, original lines 230-233 and 235-241 (including whatever
// happens outside a transaction) and the closing braces are missing from this extract.
225 void ParameterTransfer::writeParameterTransactionGuardedImpl(
const char*
id,
const T& value) {
226 if (writingProhibited) {
227 throw ParameterException(
"Writing parameters is not valid inside an unthreaded event handler");
229 if (transactionInProgress) {
234 transactionQueuedWrites.push_back({std::string(
id), std::to_string(value)});
// Transaction-aware write helper that queues `value` unchanged.
// The queue holds (string, string) pairs, so T must be implicitly convertible to std::string.
// Throws ParameterException when writingProhibited is set for the calling thread.
// NOTE(review): the template header, original lines 243-249, 254-257 and everything after line
// 258 (including the behaviour outside a transaction) are missing from this extract.
242 void ParameterTransfer::writeParameterTransactionUnguardedImpl(
const char*
id,
const T& value) {
250 if (writingProhibited) {
251 throw ParameterException(
"Writing parameters is not valid inside an unthreaded event handler");
253 if (transactionInProgress) {
258 transactionQueuedWrites.push_back({std::string(
id), value});
// Three forwarding calls into writeParameterTransactionGuardedImpl, one each for double, int and bool.
// NOTE(review): the enclosing function headers and closing braces are missing from this extract
// (the original line numbers are 267, 272 and 277), so the wrapper signatures cannot be confirmed here.
267 writeParameterTransactionGuardedImpl<double>(
id, value);
272 writeParameterTransactionGuardedImpl<int>(
id, value);
277 writeParameterTransactionGuardedImpl<bool>(
id, value);
// Forwards to the unguarded transaction-aware write helper. That helper stores `value` unchanged
// as the second element of a (string, string) queue entry, so it has to be instantiated with a
// string-convertible type. <bool> cannot satisfy that, and bool is already handled by the guarded
// forwarding call, hence <std::string>.
// NOTE(review): the enclosing wrapper's header is not visible in this extract — confirm that its
// `value` parameter is a std::string.
282 writeParameterTransactionUnguardedImpl<std::string>(
id, value);
// Three structurally identical fragments whose function headers, return statements and closing
// braces are missing from this extract (original line numbers 289-292, 302-305 and 315-318).
// Each one throws TransferException with the stored networkErrorString (guarding condition not
// visible), then locks mapMutex and tests whether `id` is absent from paramSet.
// What happens for a missing or present id is not visible.
289 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
291 std::unique_lock<std::mutex> globalLock(mapMutex);
292 if (!paramSet.count(
id)) {
// Second fragment.
302 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
304 std::unique_lock<std::mutex> globalLock(mapMutex);
305 if (!paramSet.count(
id)) {
// Third fragment.
315 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
317 std::unique_lock<std::mutex> globalLock(mapMutex);
318 if (!paramSet.count(
id)) {
// Body fragment that converts paramSet into a std::map<std::string, ParameterInfo> named compatMap.
// Only int, double and bool parameters are handled in the visible switch cases.
// NOTE(review): the function header, the break statements, any default case, the return statement
// and all closing braces are missing from this extract.
340 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
342 std::map<std::string, ParameterInfo> compatMap;
344 std::unique_lock<std::mutex> globalLock(mapMutex);
// NOTE(review): `auto kv` copies every map entry on each iteration; name and param below are
// references into that copy.
345 for (
auto kv: paramSet) {
346 auto& name = kv.first;
347 auto& param = kv.second;
// A parameter counts as writeable only if its API access level is ACCESS_READWRITE.
348 bool writeable = param.getAccessForApi() == param::Parameter::ACCESS_READWRITE;
349 switch(param.getType()) {
350 case param::ParameterValue::TYPE_INT: {
// -1 is what gets passed on when the parameter declares no range or no increment.
351 int min = -1, max = -1, increment = -1;
352 if (param.hasRange()) {
353 min = param.getMin<
int>();
354 max = param.getMax<
int>();
356 if (param.hasIncrement()) {
357 increment = param.getIncrement<
int>();
359 compatMap[name] = ParameterInfo::fromInt(name, writeable, param.getCurrent<
int>(), min, max, increment);
362 case param::ParameterValue::TYPE_DOUBLE: {
// Same -1 placeholders as in the int case.
363 double min = -1, max = -1, increment = -1;
364 if (param.hasRange()) {
365 min = param.getMin<
double>();
366 max = param.getMax<
double>();
368 if (param.hasIncrement()) {
369 increment = param.getIncrement<
double>();
371 compatMap[name] = ParameterInfo::fromDouble(name, writeable, param.getCurrent<
double>(), min, max, increment);
// Bool parameters carry no range or increment information.
374 case param::ParameterValue::TYPE_BOOL: {
375 compatMap[name] = ParameterInfo::fromBool(name, writeable, param.getCurrent<
bool>());
// Receiver thread body (attemptConnection starts it via std::bind). While threadRunning is true
// it receives bytes into recvBuf, cuts them into lines, splits each line on tabs and dispatches
// on the first token.
// Commands visible in this extract:
//   protocol version line (the branch condition itself is not visible)
//   "I"  full parameter update
//   "M"  metadata update for a parameter that is already known
//   "V"  value change for a parameter that is already known
//   "R"  reply to a blocking request
//   "E"  notifies readyCond and reports ConnectionState::CONNECTED
//   "HB" and "X" (bodies not visible)
// NOTE(review): many original lines are missing from this extract (the embedded line numbers have
// gaps), including most closing braces, several branch conditions, the reconnect logic and the
// line-delimiter test. The comments below describe the visible statements only.
387 void ParameterTransfer::receiverRoutine() {
388 auto refTime = std::chrono::steady_clock::now();
390 threadRunning =
true;
// The result is discarded; the call makes getThreadId() run its thread_local initializer, which
// assigns this thread an id.
391 (void) getThreadId();
392 while (threadRunning) {
// Disconnected state: the visible part sleeps for SOCKET_RECONNECT_INTERVAL_MS.
393 if (socket == INVALID_SOCKET) {
400 std::this_thread::sleep_for(std::chrono::milliseconds(SOCKET_RECONNECT_INTERVAL_MS));
// Append newly received bytes after the recvBufBytes bytes that are already buffered.
404 int bytesReceived = recv(socket, recvBuf+recvBufBytes, (RECV_BUF_SIZE - recvBufBytes), 0);
405 if (bytesReceived < 0) {
406 auto err = Networking::getErrno();
// These three error codes get their own branch; its body is not visible.
407 if(err == EAGAIN || err == EWOULDBLOCK || err == ETIMEDOUT) {
// Visible error handling: close the socket under socketModificationMutex, reset refTime, mark
// the network as not ready, record the error text and report DISCONNECTED.
413 std::unique_lock<std::mutex> localLock(socketModificationMutex);
414 Networking::closeSocket(socket);
415 socket = INVALID_SOCKET;
416 refTime = std::chrono::steady_clock::now();
417 networkReady =
false;
420 networkErrorString = std::string(
"Error receiving network packet: ") + Networking::getLastErrorString();
// NOTE(review): the callback is tested for emptiness before callbackMutex is taken.
421 if (connectionStateChangeCallback) {
423 std::lock_guard<std::mutex> callbackLock(callbackMutex);
424 connectionStateChangeCallback(ConnectionState::DISCONNECTED);
429 }
else if (bytesReceived == 0) {
// recv returned 0: same teardown as above, with the error text "Connection closed".
431 std::unique_lock<std::mutex> localLock(socketModificationMutex);
432 Networking::closeSocket(socket);
433 socket = INVALID_SOCKET;
434 refTime = std::chrono::steady_clock::now();
435 networkReady =
false;
438 networkErrorString =
"Connection closed";
439 if (connectionStateChangeCallback) {
441 std::lock_guard<std::mutex> callbackLock(callbackMutex);
442 connectionStateChangeCallback(ConnectionState::DISCONNECTED);
// Data received: extend the valid region of the buffer and scan it.
447 recvBufBytes += bytesReceived;
// `start` is the offset where the current line begins.
449 unsigned int start=0;
450 for (
unsigned int i=0; i<recvBufBytes; ++i) {
451 unsigned char c = recvBuf[i];
// The line consists of the bytes [start, i); the test on `c` that selects the delimiter is not visible.
453 std::string currentLine((
const char*) recvBuf+start, i-start);
455 auto toks = tabTokenizer.tokenize(currentLine);
// NOTE(review): toks[0] is accessed here; whether an emptiness check precedes it is not visible.
457 const std::string& cmd = toks[0];
// Protocol version handling: toks[1] is the remote major version.
// NOTE(review): this compares a signed long with an unsigned int.
461 long reportedVersionMajor = atol(toks[1].c_str());
462 if(reportedVersionMajor !=
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)) {
// Major version mismatch: record the error, stop the receiver loop and notify readyCond.
465 networkErrorString = std::string(
"Protocol major version mismatch, expected ") + std::to_string(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR) +
" but got " + toks[1];
466 threadRunning =
false;
468 std::lock_guard<std::mutex> readyLock(readyMutex);
469 readyCond.notify_all();
// -1 stays in place unless the minor version is assigned from toks[2] below.
472 long reportedVersionMinor = -1;
475 reportedVersionMinor = atol(toks[2].c_str());
// A newer remote minor version only produces a warning on stderr.
476 if (reportedVersionMinor >
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR)) {
477 std::cerr <<
"Caution: remote parameter protocol version " << reportedVersionMajor <<
"." << reportedVersionMinor
478 <<
" is newer than our version " <<
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR)
479 <<
"." <<
static_cast<unsigned int>(InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR) << std::endl;
480 std::cerr <<
"Consider a library upgrade for maximum compatibility." << std::endl;
// Minor version still -1: warn and disable batch transactions.
484 if (reportedVersionMinor == -1) {
486 std::cerr <<
"Warning: remote firmware is out of date - parameter batch transaction support disabled." << std::endl;
487 featureDisabledTransactions =
true;
// Reply with our own version: "P\t<major>\t<minor>\n".
490 std::stringstream ss;
491 ss <<
"P" <<
"\t" << (
unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MAJOR
492 <<
"\t" << (
unsigned int) visiontransfer::internal::InternalInformation::CURRENT_PARAMETER_PROTOCOL_VERSION_MINOR <<
"\n";
// The reply is sent inline under socketModificationMutex.
// NOTE(review): these throw statements execute on the receiver thread; no enclosing try/catch
// is visible in this extract — confirm how they are handled.
494 std::unique_lock<std::mutex> localLock(socketModificationMutex);
495 if (socket == INVALID_SOCKET) {
496 throw TransferException(
"Connection has been closed and not reconnected so far");
498 size_t written = send(socket, ss.str().c_str(), (
int) ss.str().size(), 0);
499 if(written != ss.str().size()) {
500 throw TransferException(
"Error sending protocol version handshake reply: " + Networking::getLastErrorString());
// Branch whose condition is not visible: record the error and stop the receiver loop.
506 networkErrorString =
"Incomplete transfer of protocol version";
507 threadRunning =
false;
510 }
else if (cmd==
"I") {
// Full parameter update: deserialize and store it under its uid. The callback fires only if
// the uid was already present before this update.
// NOTE(review): paramSet is modified here; no mapMutex acquisition is visible in this branch.
512 Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks);
513 auto uid = param.
getUid();
514 bool alreadyPresent = paramSet.count(uid);
515 paramSet[uid] = param;
517 if (alreadyPresent && parameterUpdateCallback) {
// Threaded mode: the callback runs on a new std::thread while holding callbackMutex.
// NOTE(review): the lambda captures everything except uid by reference; what is done with the
// thread object afterwards is not visible.
519 if (parameterUpdateCallbackThreaded) {
520 std::thread([&, uid](){
521 std::lock_guard<std::mutex> callbackLock(callbackMutex);
522 parameterUpdateCallback(uid);
// Inline mode: the callback runs on this thread with writingProhibited set for its duration.
// NOTE(review): writingProhibited is not reset if the callback throws.
525 std::lock_guard<std::mutex> callbackLock(callbackMutex);
526 writingProhibited =
true;
527 parameterUpdateCallback(uid);
528 writingProhibited =
false;
532 }
else if (cmd==
"M") {
// Metadata update: applied only if toks[1] names a parameter that is already known.
534 if (paramSet.count(toks[1])) {
535 Parameter param = ParameterSerialization::deserializeParameterFullUpdate(toks,
"M");
536 auto uid = param.
getUid();
538 paramSet[uid] = param;
// Callback dispatch, threaded or inline, as in the "I" branch.
540 if (parameterUpdateCallback) {
541 if (parameterUpdateCallbackThreaded) {
542 std::thread([&, uid](){
543 std::lock_guard<std::mutex> callbackLock(callbackMutex);
544 parameterUpdateCallback(uid);
547 std::lock_guard<std::mutex> callbackLock(callbackMutex);
548 writingProhibited =
true;
549 parameterUpdateCallback(uid);
550 writingProhibited =
false;
// Diagnostic for a parameter that is not known yet.
555 std::cerr <<
"Parameter not received yet - not updating metadata of: " << toks[1] << std::endl;;
557 }
else if (cmd==
"V") {
// Value change: fewer than 3 tokens is handled separately (that handling is not visible).
559 if (toks.size() < 3) {
562 if (paramSet.count(toks[1])) {
// Apply the new value in place to the stored parameter.
564 ParameterSerialization::deserializeParameterValueChange(toks, paramSet[toks[1]]);
565 std::string uid = toks[1];
// Callback dispatch, threaded or inline, as in the "I" branch.
567 if (parameterUpdateCallback) {
568 if (parameterUpdateCallbackThreaded) {
569 std::thread([&, uid](){
570 std::lock_guard<std::mutex> callbackLock(callbackMutex);
571 parameterUpdateCallback(uid);
574 std::lock_guard<std::mutex> callbackLock(callbackMutex);
575 writingProhibited =
true;
576 parameterUpdateCallback(uid);
577 writingProhibited =
false;
// Diagnostic for a parameter that is not known yet.
582 std::cerr <<
"Parameter not received yet - not updating value of: " << toks[1] << std::endl;;
584 }
else if (cmd==
"R") {
// Request reply: fewer than 4 tokens is handled separately (that handling is not visible).
586 if (toks.size() < 4) {
// toks[1] has the form "<thread id>" or "<thread id>/<class>".
589 std::unique_lock<std::mutex> globalLock(mapMutex);
590 auto subToks = slashTokenizer.tokenize(toks[1]);
591 int replyThreadId = atol(subToks[0].c_str());
592 std::string unblockClass =
"";
593 if (subToks.size() > 1) {
594 unblockClass = subToks[1];
// Deliver only if a waiter is registered for that thread id with a matching class: store
// (toks[2] == "1", toks[3]) as its result and wake it.
596 bool hasCond = waitConds.count(replyThreadId);
597 if (hasCond && (waitCondClasses[replyThreadId]==unblockClass)) {
599 std::lock_guard<std::mutex> localLock(waitCondMutexes[replyThreadId]);
600 lastSetRequestResult[replyThreadId] = {toks[2] ==
"1", toks[3]};
601 waitConds[replyThreadId].notify_all();
// Presumably the else branch: results for thread id -1 are dropped silently, any other id is logged.
603 if (replyThreadId != -1) {
604 std::cerr <<
"Ignoring unexpected request result " << toks[1] <<
" for thread " << replyThreadId << std::endl;
607 }
else if (cmd==
"E") {
// Wake anyone blocked in waitNetworkReady and report CONNECTED to the state callback.
611 std::lock_guard<std::mutex> readyLock(readyMutex);
612 readyCond.notify_all();
614 if (connectionStateChangeCallback) {
616 std::lock_guard<std::mutex> callbackLock(callbackMutex);
617 connectionStateChangeCallback(ConnectionState::CONNECTED);
620 }
else if (cmd==
"HB") {
622 }
else if (cmd==
"X") {
// Presumably the final else branch: an unrecognised command records an error and stops the loop.
626 networkErrorString = std::string(
"Unknown update command received: ") + cmd;
627 threadRunning =
false;
// After scanning: the case where `start` reached the end of the buffer is handled in a branch
// whose body is not visible. The memmove moves the unconsumed tail to the front of recvBuf.
635 if (start>=recvBufBytes) {
638 std::memmove(recvBuf, recvBuf+start, recvBufBytes-start);
639 recvBufBytes = recvBufBytes-start;
// Gives each calling thread a small integer id: a function-local atomic counter is incremented
// once per thread, when that thread's thread_local threadId is first initialized.
// NOTE(review): the return statement and the closing brace are missing from this extract.
645 int ParameterTransfer::getThreadId() {
647 static std::atomic_int threadCount{0};
// fetch_add returns the counter value from before the increment, so ids start at 0.
648 thread_local
int threadId = threadCount.fetch_add(1);
// Registers a per-thread condition variable, mutex and wait class, waits for up to
// waitMaxMilliseconds, then removes the registration again.
// receiverRoutine's "R" branch notifies the condition variable when a reply arrives whose thread
// id and class match the registration.
// fn: callable supplied by the caller; its invocation is not visible in this extract.
// waitClass: stored in waitCondClasses for the calling thread's id.
// NOTE(review): original lines 655, 657, 662-667, 670-672, 677-679 and everything after 680 are
// missing, including the call to fn, the scopes of the two globalLock variables and the throw.
652 void ParameterTransfer::blockingCallThisThread(std::function<
void()> fn,
int waitMaxMilliseconds,
const std::string& waitClass) {
653 bool timeout =
false;
654 auto tid = getThreadId();
// Registration under mapMutex; operator[] creates the per-thread entries on first use.
656 std::unique_lock<std::mutex> globalLock(mapMutex);
658 auto& localWaitCond = waitConds[tid];
659 auto& localWaitCondMutex = waitCondMutexes[tid];
660 waitCondClasses[tid] = waitClass;
661 std::unique_lock<std::mutex> localLock(localWaitCondMutex);
// NOTE(review): wait_for is used without a predicate, so a spurious wakeup is treated the same
// as a real notification.
668 auto status = localWaitCond.wait_for(localLock, std::chrono::milliseconds(waitMaxMilliseconds));
669 timeout = (status == std::cv_status::timeout);
// Deregistration under mapMutex.
673 std::unique_lock<std::mutex> globalLock(mapMutex);
674 waitConds.erase(tid);
675 waitCondMutexes.erase(tid);
676 waitCondClasses.erase(tid);
// Exception object for the timeout case (the condition and the throw are not visible).
680 TimeoutException ex(
"Timeout waiting for request reply from parameter server");
// Two fragments from functions whose headers and remaining bodies are missing from this extract
// (original lines 689 and 698). Each throws TransferException with the stored networkErrorString;
// the guarding conditions are not visible.
689 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
698 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
// Fragment of a function whose header is missing from this extract. It throws TransferException
// (guarding condition not visible); otherwise it locks mapMutex and returns whether `name` is a
// key of paramSet.
707 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
710 std::lock_guard<std::mutex> mapLock(mapMutex);
711 return paramSet.count(name) != 0;
// Fragment of a function whose header is missing from this extract. It throws TransferException
// (guarding condition not visible); otherwise it locks mapMutex and returns the paramSet entry
// for `name`. A ParameterException is constructed and thrown for a name that is not returned above.
// NOTE(review): the key is looked up twice (count, then at); closing braces are missing here.
718 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
721 std::lock_guard<std::mutex> mapLock(mapMutex);
722 if (paramSet.count(name)) {
723 return paramSet.at(name);
725 throw ParameterException(std::string(
"Invalid or inaccessible parameter name: ") + name);
// Installs the parameter-update callback and its dispatch mode while holding callbackMutex.
// callback: invoked by receiverRoutine with the uid of an updated parameter.
// threaded: stored in parameterUpdateCallbackThreaded. When true, receiverRoutine runs the callback
//           on a new std::thread; when false, it runs inline with writingProhibited set.
// NOTE(review): the closing brace is missing from this extract.
729 void ParameterTransfer::setParameterUpdateCallback(std::function<
void(
const std::string& uid)> callback,
bool threaded) {
730 std::lock_guard<std::mutex> callbackLock(callbackMutex);
731 parameterUpdateCallbackThreaded = threaded;
732 parameterUpdateCallback = callback;
// Fragment of the function that opens a transaction; its header is missing from this extract.
// When featureDisabledTransactions is set, a separate branch is taken (body not visible).
// Otherwise a transaction that is already open on this thread is rejected with
// TransferException, and transactionInProgress is set for the calling thread.
737 if (featureDisabledTransactions) {
741 if (transactionInProgress)
throw TransferException(
"Simultaneous and/or nested parameter transactions are not supported");
742 transactionInProgress =
true;
// Fragment of the function that commits the queued writes of the current thread's transaction;
// its header is missing from this extract.
// Visible protocol: a "TS\t<transaction id>\t..." command is sent, then per-write processing
// (not visible), then "TE\t<transaction id>\n". With maxWaitMilliseconds > 0 the "TE" command
// goes through blockingCallThisThread, otherwise it is sent directly.
// NOTE(review): many original lines are missing (e.g. the declaration of `first`, the loop bodies,
// try/catch structure and closing braces).
// NOTE(review): nextTransactionId is a plain function-local static int incremented without
// visible synchronisation, while the transaction state itself is thread_local — confirm whether
// concurrent commits from several threads are possible.
747 static int nextTransactionId = 0;
748 if (featureDisabledTransactions) {
// Nothing to commit when no transaction is open on this thread.
753 if (!transactionInProgress)
return;
// When an exception is currently propagating, the visible part only clears the flag.
755 if (std::uncaught_exceptions() > 0) {
757 transactionInProgress =
false;
766 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
770 if (transactionQueuedWrites.size() > 0) {
// Collect the distinct parameter uids touched by the queued writes.
772 std::set<std::string> affectedUids;
773 for (
auto& kv: transactionQueuedWrites) {
774 affectedUids.insert(kv.first);
// Transaction id layout: "<thread id>/<unique id>" when waiting for a reply, "-1/<unique id>" otherwise.
778 std::string uniqueTransactionId = std::to_string(nextTransactionId++);
779 std::string transactionId;
780 if (maxWaitMilliseconds > 0) {
781 transactionId = std::to_string(getThreadId()) +
"/" + uniqueTransactionId;
784 transactionId = std::to_string(-1) +
"/" + uniqueTransactionId;
786 std::stringstream ss;
787 ss <<
"TS" <<
"\t" << transactionId <<
"\t";
// Comma-separate the entries: `first` suppresses the separator before the first one.
789 for (
auto& uid: affectedUids) {
790 if (first) first=
false;
else ss <<
",";
794 sendNetworkCommand(ss.str(),
"transaction start");
// Per-write processing; only the two bindings are visible.
797 for (
auto& kv: transactionQueuedWrites) {
798 auto& uid = kv.first;
799 auto& value = kv.second;
805 std::stringstream ssEnd;
806 ssEnd <<
"TE" <<
"\t" << transactionId <<
"\n";
809 if (maxWaitMilliseconds > 0) {
// Blocking commit: the wait class is the unique transaction id, which receiverRoutine matches
// against the part after '/' in the reply.
811 blockingCallThisThread([
this, &ssEnd](){
812 sendNetworkCommand(ssEnd.str(),
"transaction end");
813 }, maxWaitMilliseconds, uniqueTransactionId);
815 transactionQueuedWrites.clear();
816 transactionInProgress =
false;
// Result stored by receiverRoutine for this thread when handling an "R" message.
819 auto result = lastSetRequestResult[getThreadId()];
820 if (result.first ==
false) {
// Non-blocking commit: send "TE" without waiting for a reply.
825 sendNetworkCommand(ssEnd.str(),
"transaction end");
// The queue and the flag are reset on the remaining visible paths as well.
829 transactionQueuedWrites.clear();
832 transactionInProgress =
false;
836 transactionInProgress =
false;
// Fragment of the function that asks the server to persist parameters; its header is missing
// from this extract.
// It is rejected while writingProhibited is set (ParameterException) or while a transaction is
// open (TransferException). It builds a command starting with "p\t<thread id or -1>\t", iterates
// over `uids`, checks that each id exists in paramSet, and sends the command either through
// blockingCallThisThread or directly.
// NOTE(review): the loop body, the handling of unknown ids, the branch conditions and the
// closing braces are missing from this extract.
840 if (writingProhibited) {
841 throw ParameterException(
"Saving parameters is not valid inside an unthreaded event handler");
844 if (transactionInProgress) {
845 throw TransferException(
"Saving parameters is invalid with an open transaction");
851 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
// NOTE(review): mapMutex is taken here and blockingCallThisThread locks mapMutex as well; the
// scope of this lock is not visible — confirm that it is released before the blocking call.
854 std::unique_lock<std::mutex> globalLock(mapMutex);
856 std::stringstream ss;
857 ss <<
"p" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t";
859 for (
auto&
id: uids) {
866 if (!paramSet.count(
id)) {
// Blocking variant; the remaining arguments of the call are not visible.
873 blockingCallThisThread([
this, &ss](){
874 sendNetworkCommand(ss.str(),
"parameter persist");
// Result stored by receiverRoutine for this thread when handling an "R" message.
876 auto result = lastSetRequestResult[getThreadId()];
877 if (result.first ==
false) {
// Direct send without waiting for a reply.
883 sendNetworkCommand(ss.str(),
"parameter persist");
// Fragment of the function that polls a single parameter; its header is missing from this extract.
// It is rejected while writingProhibited is set (ParameterException) or while a transaction is
// open (TransferException). It builds the command "O\t<thread id or -1>\t<uid>\n" and sends it
// either through blockingCallThisThread or directly.
// NOTE(review): branch conditions and closing braces are missing from this extract.
888 if (writingProhibited) {
889 throw ParameterException(
"Polling parameters is not valid inside an unthreaded event handler");
892 if (transactionInProgress) {
893 throw TransferException(
"Polling parameters is invalid within an open transaction");
899 throw TransferException(
"ParameterTransfer currently not operational: " + networkErrorString);
905 std::stringstream ss;
907 ss <<
"O" <<
"\t" << (synchronous ? getThreadId() : -1) <<
"\t" << uid <<
"\n";
// Blocking variant; the remaining arguments of the call are not visible.
910 blockingCallThisThread([
this, &ss](){
911 sendNetworkCommand(ss.str(),
"parameter poll");
// Result stored by receiverRoutine for this thread when handling an "R" message.
913 auto result = lastSetRequestResult[getThreadId()];
914 if (result.first ==
false) {
// Direct send without waiting for a reply.
920 sendNetworkCommand(ss.str(),
"parameter poll");
// Fragment of the setter for connectionStateChangeCallback; its header and closing brace are
// missing from this extract. The callback is replaced while holding callbackMutex, the same
// mutex receiverRoutine holds when it invokes the callback.
927 std::lock_guard<std::mutex> callbackLock(callbackMutex);
928 connectionStateChangeCallback = callback;