helics  3.0.1
TcpHelperClasses.h
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../../common/GuardedTypes.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
11 
12 #include <asio/io_context.hpp>
13 #include <asio/ip/tcp.hpp>
14 #include <functional>
15 #include <memory>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
23 namespace helics {
24 namespace tcp {
26  class TcpConnection: public std::enable_shared_from_this<TcpConnection> {
27  public:
29  enum class connection_state_t {
30  prestart = -1,
31  waiting = 0,
32  operating = 1,
33  halted = 3,
34  closed = 4,
35  };
36 
37  using pointer = std::shared_ptr<TcpConnection>;
38  static pointer create(asio::io_context& io_context,
39  const std::string& connection,
40  const std::string& port,
41  size_t bufferSize = 10192);
43  static pointer create(asio::io_context& io_context, size_t bufferSize)
44  {
45  return pointer(new TcpConnection(io_context, bufferSize));
46  }
48  auto& socket() { return socket_.lowest_layer(); }
50  void startReceive();
52  void cancel() { socket_.lowest_layer().cancel(); }
54  void close();
56  void closeNoWait();
58  void waitOnClose();
60  bool isReceiving() const { return receivingHalt.isActive(); }
62  void setDataCall(
63  std::function<size_t(TcpConnection::pointer, const char*, size_t)> dataFunc);
65  void setErrorCall(
66  std::function<bool(TcpConnection::pointer, const std::error_code&)> errorFunc);
68  void setLoggingFunction(
69  std::function<void(int loglevel, const std::string& logMessage)> logFunc);
72  size_t send(const void* buffer, size_t dataLength);
75  size_t send(const std::string& dataString);
76 
81  size_t receive(void* buffer, size_t maxDataSize);
90  template<typename Process>
91  void send_async(const void* buffer, size_t dataLength, Process callback)
92  {
93  socket_.async_write_some(asio::buffer(buffer, dataLength), callback);
94  }
103  template<typename Process>
104  void async_receive(void* buffer, size_t dataLength, Process callback)
105  {
106  socket_.async_read_some(asio::buffer(buffer, dataLength), callback);
107  }
108 
113  void async_receive(std::function<void(TcpConnection::pointer,
114  const char* buffer,
115  size_t dataLength,
116  const std::error_code& error)> callback)
117  {
118  socket_.async_read_some(asio::buffer(data, data.size()),
119  [connection = shared_from_this(),
120  callback = std::move(callback)](const std::error_code& error,
121  size_t bytes_transferred) {
122  connection->handle_read(bytes_transferred, error, callback);
123  });
124  }
126  bool isConnected() const
127  {
128  return (connected.isActive()) && (!connectionError.load(std::memory_order_acquire));
129  }
134  bool waitUntilConnected(std::chrono::milliseconds timeOut);
136  int getIdentifier() const { return idcode; }
137 
138  private:
139  TcpConnection(asio::io_context& io_context, size_t bufferSize):
140  socket_(io_context), context_(io_context), data(bufferSize), idcode(idcounter++)
141  {
142  }
143  TcpConnection(asio::io_context& io_context,
144  const std::string& connection,
145  const std::string& port,
146  size_t bufferSize);
148  void handle_read(const std::error_code& error, size_t bytes_transferred);
149  void handle_read(
150  size_t message_size,
151  const std::error_code& error,
152  std::function<
153  void(TcpConnection::pointer, const char*, size_t, const std::error_code& error)>
154  callback)
155  {
156  callback(shared_from_this(), data.data(), message_size, error);
157  }
158  static std::atomic<int> idcounter;
159 
160  std::atomic<size_t> residBufferSize{0};
161  asio::ip::tcp::socket socket_;
162  asio::io_context& context_;
163  std::vector<char> data;
164  std::atomic<bool> triggerhalt{false};
165  const bool connecting{false};
166  gmlc::concurrency::TriggerVariable receivingHalt;
167  std::atomic<bool> connectionError{false};
168  gmlc::concurrency::TriggerVariable connected;
169  std::function<size_t(TcpConnection::pointer, const char*, size_t)> dataCall;
170  std::function<bool(TcpConnection::pointer, const std::error_code&)> errorCall;
171  std::function<void(int level, const std::string& logMessage)> logFunction;
172  std::atomic<connection_state_t> state{connection_state_t::prestart};
173  const int idcode;
174  void connect_handler(const std::error_code& error);
175  };
176 
178  class TcpAcceptor: public std::enable_shared_from_this<TcpAcceptor> {
179  public:
180  enum class accepting_state_t {
181  opened = 0,
182  connecting = 1,
183  connected = 2,
184  halted = 3,
185  closed = 4,
186  };
187  using pointer = std::shared_ptr<TcpAcceptor>;
189  static pointer create(asio::io_context& io_context, asio::ip::tcp::endpoint& ep)
190  {
191  return pointer(new TcpAcceptor(io_context, ep));
192  }
193 
194  static pointer create(asio::io_context& io_context, uint16_t port)
195  {
196  return pointer(new TcpAcceptor(io_context, port));
197  }
200  {
201  try {
202  close();
203  }
204  catch (...) {
205  }
206  }
207 
209  bool connect();
211  bool connect(std::chrono::milliseconds timeOut);
213  bool start(TcpConnection::pointer conn);
215  void cancel() { acceptor_.cancel(); }
217  void close();
219  bool isAccepting() const { return accepting.isActive(); }
221  bool isConnected() const { return (state.load() == accepting_state_t::connected); }
223  void
224  setAcceptCall(std::function<void(TcpAcceptor::pointer, TcpConnection::pointer)> accFunc)
225  {
226  acceptCall = std::move(accFunc);
227  }
228 
231  std::function<bool(TcpAcceptor::pointer, const std::error_code&)> errorFunc)
232  {
233  errorCall = std::move(errorFunc);
234  }
236  template<class X>
237  void set_option(const X& option)
238  {
239  acceptor_.set_option(option);
240  }
242  std::string to_string() const;
243 
244  private:
245  TcpAcceptor(asio::io_context& io_context, asio::ip::tcp::endpoint& ep);
246  TcpAcceptor(asio::io_context& io_context, uint16_t port);
248  void handle_accept(TcpAcceptor::pointer ptr,
249  TcpConnection::pointer new_connection,
250  const std::error_code& error);
251  asio::ip::tcp::endpoint endpoint_;
252  asio::ip::tcp::acceptor acceptor_;
253  std::function<void(TcpAcceptor::pointer, TcpConnection::pointer)> acceptCall;
254  std::function<bool(TcpAcceptor::pointer, const std::error_code&)> errorCall;
255  std::atomic<accepting_state_t> state{accepting_state_t::opened};
256  gmlc::concurrency::TriggerVariable accepting;
257  };
258 
261  class TcpServer: public std::enable_shared_from_this<TcpServer> {
262  public:
263  using pointer = std::shared_ptr<TcpServer>;
264 
265  static pointer create(asio::io_context& io_context,
266  const std::string& address,
267  const std::string& port,
268  bool reuse_port = false,
269  int nominalBufferSize = 10192);
270 
271  static pointer create(asio::io_context& io_context,
272  const std::string& address,
273  uint16_t PortNum,
274  bool reuse_port = false,
275  int nominalBufferSize = 10192);
276  static pointer
277  create(asio::io_context& io_context, uint16_t PortNum, int nominalBufferSize = 10192);
278 
279  public:
280  ~TcpServer();
282  void setPortReuse(bool reuse) { reuse_address = reuse; }
285  bool start();
287  void close();
289  bool isReady() const { return !(halted.load()); }
291  bool reConnect(std::chrono::milliseconds timeOut);
293  void
294  setDataCall(std::function<size_t(TcpConnection::pointer, const char*, size_t)> dataFunc)
295  {
296  dataCall = std::move(dataFunc);
297  }
300  std::function<bool(TcpConnection::pointer, const std::error_code&)> errorFunc)
301  {
302  errorCall = std::move(errorFunc);
303  }
304  void handle_accept(TcpAcceptor::pointer acc, TcpConnection::pointer new_connection);
306  TcpConnection::pointer findSocket(int connectorID) const;
307 
308  private:
309  TcpServer(asio::io_context& io_context,
310  const std::string& address,
311  uint16_t portNum,
312  bool port_reuse,
313  int nominalBufferSize);
314  TcpServer(asio::io_context& io_context,
315  const std::string& address,
316  const std::string& port,
317  bool port_reuse,
318  int nominalBufferSize);
319  TcpServer(asio::io_context& io_context, uint16_t portNum, int nominalBufferSize);
320 
321  void initialConnect();
322  asio::io_context& ioctx;
323  mutable std::mutex accepting;
324  std::vector<TcpAcceptor::pointer> acceptors;
325  std::vector<asio::ip::tcp::endpoint> endpoints;
326  size_t bufferSize;
327  std::function<size_t(TcpConnection::pointer, const char*, size_t)> dataCall;
328  std::function<bool(TcpConnection::pointer, const std::error_code& error)> errorCall;
329  std::atomic<bool> halted{false};
330  bool reuse_address = false;
331  // this data structure is protected by the accepting mutex
332  std::vector<TcpConnection::pointer> connections;
333  };
334 
335 } // namespace tcp
336 } // namespace helics
helics::tcp::TcpConnection::async_receive
void async_receive(void *buffer, size_t dataLength, Process callback)
Definition: TcpHelperClasses.h:104
helics::tcp::TcpConnection::setLoggingFunction
void setLoggingFunction(std::function< void(int loglevel, const std::string &logMessage)> logFunc)
Definition: TcpHelperClasses.cpp:78
helics::tcp::TcpAcceptor
Definition: TcpHelperClasses.h:178
helics::tcp::TcpServer::reConnect
bool reConnect(std::chrono::milliseconds timeOut)
Definition: TcpHelperClasses.cpp:593
helics::tcp::TcpConnection::isConnected
bool isConnected() const
Definition: TcpHelperClasses.h:126
helics::tcp::TcpAcceptor::set_option
void set_option(const X &option)
Definition: TcpHelperClasses.h:237
helics::tcp::TcpConnection::waitOnClose
void waitOnClose()
Definition: TcpHelperClasses.cpp:194
helics::tcp::TcpAcceptor::isConnected
bool isConnected() const
Definition: TcpHelperClasses.h:221
helics::tcp::TcpAcceptor::isAccepting
bool isAccepting() const
Definition: TcpHelperClasses.h:219
helics::tcp::TcpConnection::connection_state_t
connection_state_t
Definition: TcpHelperClasses.h:29
helics::tcp::TcpConnection::socket
auto & socket()
Definition: TcpHelperClasses.h:48
helics::tcp::TcpConnection::getIdentifier
int getIdentifier() const
Definition: TcpHelperClasses.h:136
helics::tcp::TcpAcceptor::setErrorCall
void setErrorCall(std::function< bool(TcpAcceptor::pointer, const std::error_code &)> errorFunc)
Definition: TcpHelperClasses.h:230
TcpHelperClasses.h
helics::tcp::TcpAcceptor::create
static pointer create(asio::io_context &io_context, asio::ip::tcp::endpoint &ep)
Definition: TcpHelperClasses.h:189
helics::tcp::TcpConnection::cancel
void cancel()
Definition: TcpHelperClasses.h:52
helics::tcp::TcpServer::findSocket
TcpConnection::pointer findSocket(int connectorID) const
Definition: TcpHelperClasses.cpp:709
helics::tcp::TcpServer::setErrorCall
void setErrorCall(std::function< bool(TcpConnection::pointer, const std::error_code &)> errorFunc)
Definition: TcpHelperClasses.h:299
helics::tcp::TcpConnection::waitUntilConnected
bool waitUntilConnected(std::chrono::milliseconds timeOut)
Definition: TcpHelperClasses.cpp:307
helics::tcp::TcpAcceptor::to_string
std::string to_string() const
Definition: TcpHelperClasses.cpp:421
helics::tcp::TcpServer::setPortReuse
void setPortReuse(bool reuse)
Definition: TcpHelperClasses.h:282
helics::tcp::TcpAcceptor::setAcceptCall
void setAcceptCall(std::function< void(TcpAcceptor::pointer, TcpConnection::pointer)> accFunc)
Definition: TcpHelperClasses.h:224
helics::tcp::TcpConnection::receive
size_t receive(void *buffer, size_t maxDataSize)
Definition: TcpHelperClasses.cpp:302
helics::tcp::TcpConnection::send_async
void send_async(const void *buffer, size_t dataLength, Process callback)
Definition: TcpHelperClasses.h:91
helics::tcp::TcpServer::start
bool start()
Definition: TcpHelperClasses.cpp:642
helics::tcp::TcpAcceptor::~TcpAcceptor
~TcpAcceptor()
Definition: TcpHelperClasses.h:199
helics::tcp::TcpConnection::setDataCall
void setDataCall(std::function< size_t(TcpConnection::pointer, const char *, size_t)> dataFunc)
Definition: TcpHelperClasses.cpp:59
helics::tcp::TcpConnection
Definition: TcpHelperClasses.h:26
helics::tcp::TcpConnection::startReceive
void startReceive()
Definition: TcpHelperClasses.cpp:21
helics::tcp::TcpConnection::create
static pointer create(asio::io_context &io_context, size_t bufferSize)
Definition: TcpHelperClasses.h:43
helics::tcp::TcpConnection::send
size_t send(const void *buffer, size_t dataLength)
Definition: TcpHelperClasses.cpp:248
helics::tcp::TcpAcceptor::start
bool start(TcpConnection::pointer conn)
Definition: TcpHelperClasses.cpp:379
helics::tcp::TcpServer::isReady
bool isReady() const
Definition: TcpHelperClasses.h:289
helics::tcp::TcpConnection::isReceiving
bool isReceiving() const
Definition: TcpHelperClasses.h:60
helics::tcp::TcpServer::setDataCall
void setDataCall(std::function< size_t(TcpConnection::pointer, const char *, size_t)> dataFunc)
Definition: TcpHelperClasses.h:294
helics::tcp::TcpConnection::close
void close()
Definition: TcpHelperClasses.cpp:153
helics::tcp::TcpConnection::setErrorCall
void setErrorCall(std::function< bool(TcpConnection::pointer, const std::error_code &)> errorFunc)
Definition: TcpHelperClasses.cpp:68
helics::tcp::TcpAcceptor::cancel
void cancel()
Definition: TcpHelperClasses.h:215
helics::tcp::TcpServer::close
void close()
Definition: TcpHelperClasses.cpp:722
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::tcp::TcpAcceptor::connect
bool connect()
Definition: TcpHelperClasses.cpp:332
helics::tcp::TcpServer
Definition: TcpHelperClasses.h:261
helics::tcp::TcpConnection::closeNoWait
void closeNoWait()
Definition: TcpHelperClasses.cpp:159
helics::tcp::TcpConnection::async_receive
void async_receive(std::function< void(TcpConnection::pointer, const char *buffer, size_t dataLength, const std::error_code &error)> callback)
Definition: TcpHelperClasses.h:113
helics::tcp::TcpAcceptor::close
void close()
Definition: TcpHelperClasses.cpp:414