libvisiontransfer  10.8.0
datachannelservicebase.cpp
1 /*******************************************************************************
2  * Copyright (c) 2024 Allied Vision Technologies GmbH
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *******************************************************************************/
14 
15 #include <sys/types.h>
16 #include <cstring>
17 #include <stdexcept>
18 #include <fcntl.h>
19 #include <fstream>
20 
21 #include <visiontransfer/internal/internalinformation.h>
22 #include <visiontransfer/internal/networking.h>
23 #include <visiontransfer/internal/datachannelservicebase.h>
24 
25 #include <iostream>
26 
27 using namespace visiontransfer;
28 using namespace visiontransfer::internal;
29 
30 namespace visiontransfer {
31 namespace internal {
32 
33 DataChannelServiceBase::DataChannelServiceBase() {
34  // Create socket
35  if((dataChannelSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
36  throw std::runtime_error("Error creating data channel service socket!");
37  }
38 
39  Networking::enableReuseAddress(dataChannelSocket, true);
40 
41  // Bind to port
42  sockaddr_in localAddr;
43  memset(&localAddr, 0, sizeof(localAddr));
44  localAddr.sin_family = AF_INET;
45  localAddr.sin_port = htons(InternalInformation::DATACHANNELSERVICE_PORT);
46  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
47  if(::bind(dataChannelSocket, (sockaddr *)&localAddr, sizeof(localAddr)) != 0) {
48  throw std::runtime_error("Error binding dataChannel socket!");
49  }
50 
51  Networking::setSocketBlocking(dataChannelSocket, false);
52 }
53 
54 DataChannelServiceBase::~DataChannelServiceBase() {
55  Networking::closeSocket(dataChannelSocket);
56 }
57 
58 void DataChannelServiceBase::process() {
59  static unsigned char buffer[100000];
60  static sockaddr_in senderAddress;
61  static socklen_t senderLength = (socklen_t) sizeof(senderAddress);
62 
63  int received;
64  while (true) {
65  // socket is non-blocking
66  received = recvfrom(dataChannelSocket, (char*) buffer, sizeof(buffer), 0, (sockaddr *)&senderAddress, &senderLength);
67  if ((received > 0) && ((unsigned)received >= sizeof(DataChannelMessageHeader))) {
68  DataChannelMessageHeader* raw = reinterpret_cast<DataChannelMessageHeader*>(buffer);
69  DataChannelMessage message;
70  message.header.channelID = (DataChannel::ID) raw->channelID;
71  message.header.channelType = (DataChannel::Type) raw->channelType;
72  message.header.payloadSize = ntohl(raw->payloadSize);
73  message.payload = buffer + sizeof(DataChannelMessageHeader);
74  if ((sizeof(DataChannelMessageHeader) + message.header.payloadSize) != (unsigned) received) {
75  std::cerr << "DataChannelServiceBase: Size mismatch in UDP message, type " << (int) message.header.channelType << " ID " << (int) message.header.channelID << " - discarded!" << std::endl;
76  } else {
77  if (!(message.header.channelType)) {
78  handleChannel0Message(message, &senderAddress);
79  } else {
80  // Try to find a matching registered channel to handle the message
81  auto it = channels.find(message.header.channelID);
82  if (it != channels.end()) {
83  it->second->handleMessage(message, &senderAddress);
84  }
85  }
86  }
87  } else {
88  break;
89  }
90 
91  // Call channel process() iterations
92  for (auto& kv: channels) {
93  kv.second->process();
94  }
95  }
96 }
97 
98 // Actually send data, buffer must be stable
99 int DataChannelServiceBase::sendDataInternal(unsigned char* compiledMessage, unsigned int messageSize, sockaddr_in* recipient) {
100  if (!recipient) throw std::runtime_error("Requested sendDataInternal without recipient address");
101  if (messageSize < sizeof(DataChannelMessageHeader)) throw std::runtime_error("Message header too short");
102  DataChannelMessageHeader* header = reinterpret_cast<DataChannelMessageHeader*>(compiledMessage);
103  unsigned int reportedSize = sizeof(DataChannelMessageHeader) + ntohl(header->payloadSize);
104  if (messageSize != reportedSize) throw std::runtime_error("Message size does not match");
105  int result = 0;
106  result = sendto(dataChannelSocket, (char*) compiledMessage, reportedSize, 0, (sockaddr*) recipient, sizeof(*recipient));
107  if (result != (int) reportedSize) {
108  std::cerr << "Error sending DataChannel message to " << inet_ntoa(recipient->sin_addr) << ": " << Networking::getLastErrorString() << std::endl;
109  throw std::runtime_error("Error during sendto");
110  }
111  return result;
112 }
113 
114 // Generate a new message and send it
115 int DataChannelServiceBase::sendDataIsolatedPacket(DataChannel::ID id, DataChannel::Type type, unsigned char* data, unsigned int dataSize, sockaddr_in* recipient) {
116  unsigned int msgSize = sizeof(DataChannelMessageHeader) + dataSize;
117  unsigned char* buf = new unsigned char[msgSize]();
118  DataChannelMessageHeader* header = reinterpret_cast<DataChannelMessageHeader*>(buf);
119  header->channelID = id;
120  header->channelType = type;
121  header->payloadSize = htonl(dataSize);
122  std::memcpy(buf + sizeof(DataChannelMessageHeader), data, dataSize);
123 
124  int result = sendDataInternal(buf, msgSize, recipient);
125  delete[] buf;
126  return result;
127 }
128 
129 DataChannel::ID DataChannelServiceBase::registerChannel(std::shared_ptr<DataChannel> channel) {
130  // Preliminary implementation: set id:=type (should allocate dynamic IDs later)
131  DataChannel::ID id = (DataChannel::ID) channel->getChannelType();
132  if (channels.count(id)) {
133  return 0; // already registered this ID
134  }
135  // Checking dynamic init, if this fails the service is not registered (and will be auto cleaned)
136  if (!channel->initialize()) return 0;
137  channel->setChannelID(id);
138  channels[id] = channel;
139  channel->setService(shared_from_this());
140  return id;
141 }
142 int DataChannel::sendData(unsigned char* data, unsigned int dataLen, sockaddr_in* recipient) {
143  if (auto srv = service.lock()) {
144  return srv->sendDataIsolatedPacket(channelID, getChannelType(), data, dataLen, recipient);
145  } else return 0;
146 }
147 
148 }} // namespaces
149 
visiontransfer::internal::DataChannelMessage
Definition: datachannelservicebase.h:71
visiontransfer::internal::DataChannelMessageHeader
Transport-level DataChannel header.
Definition: datachannelservicebase.h:66
Allied Vision