9 #include "../../common/GuardedTypes.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
12 #include <asio/io_context.hpp>
13 #include <asio/ip/tcp.hpp>
26 class TcpConnection:
public std::enable_shared_from_this<TcpConnection> {
37 using pointer = std::shared_ptr<TcpConnection>;
38 static pointer create(asio::io_context& io_context,
39 const std::string& connection,
40 const std::string& port,
41 size_t bufferSize = 10192);
43 static pointer
create(asio::io_context& io_context,
size_t bufferSize)
48 asio::ip::tcp::socket&
socket() {
return socket_; }
63 std::function<
size_t(TcpConnection::pointer,
const char*,
size_t)> dataFunc);
66 std::function<
bool(TcpConnection::pointer,
const std::error_code&)> errorFunc);
69 std::function<
void(
int loglevel,
const std::string& logMessage)> logFunc);
72 size_t send(
const void* buffer,
size_t dataLength);
75 size_t send(
const std::string& dataString);
81 size_t receive(
void* buffer,
size_t maxDataSize);
90 template<
typename Process>
91 void send_async(
const void* buffer,
size_t dataLength, Process callback)
93 socket_.async_send(asio::buffer(buffer, dataLength), callback);
103 template<
typename Process>
106 socket_.async_receive(asio::buffer(buffer, dataLength), callback);
116 const std::error_code& error)> callback)
118 socket_.async_receive(asio::buffer(data, data.size()),
119 [connection = shared_from_this(),
120 callback = std::move(callback)](
const std::error_code&
error,
121 size_t bytes_transferred) {
122 connection->handle_read(bytes_transferred,
error, callback);
128 return (connected.isActive()) && (!connectionError.load(std::memory_order_acquire));
139 TcpConnection(asio::io_context& io_context,
size_t bufferSize):
140 socket_(io_context), context_(io_context), data(bufferSize), idcode(idcounter++)
143 TcpConnection(asio::io_context& io_context,
144 const std::string& connection,
145 const std::string& port,
148 void handle_read(
const std::error_code& error,
size_t bytes_transferred);
151 const std::error_code& error,
153 void(TcpConnection::pointer,
const char*,
size_t,
const std::error_code& error)>
156 callback(shared_from_this(), data.data(), message_size, error);
158 static std::atomic<int> idcounter;
160 std::atomic<size_t> residBufferSize{0};
161 asio::ip::tcp::socket socket_;
162 asio::io_context& context_;
163 std::vector<char> data;
164 std::atomic<bool> triggerhalt{
false};
165 const bool connecting{
false};
166 gmlc::concurrency::TriggerVariable receivingHalt;
167 std::atomic<bool> connectionError{
false};
168 gmlc::concurrency::TriggerVariable connected;
169 std::function<size_t(TcpConnection::pointer,
const char*,
size_t)> dataCall;
170 std::function<bool(TcpConnection::pointer,
const std::error_code&)> errorCall;
171 std::function<void(
int level,
const std::string& logMessage)> logFunction;
172 std::atomic<connection_state_t> state{connection_state_t::prestart};
174 void connect_handler(
const std::error_code& error);
178 class TcpAcceptor:
public std::enable_shared_from_this<TcpAcceptor> {
180 enum class accepting_state_t {
187 using pointer = std::shared_ptr<TcpAcceptor>;
189 static pointer
create(asio::io_context& io_context, asio::ip::tcp::endpoint& ep)
194 static pointer
create(asio::io_context& io_context, uint16_t port)
211 bool connect(std::chrono::milliseconds timeOut);
213 bool start(TcpConnection::pointer conn);
221 bool isConnected()
const {
return (state.load() == accepting_state_t::connected); }
224 setAcceptCall(std::function<
void(TcpAcceptor::pointer, TcpConnection::pointer)> accFunc)
226 acceptCall = std::move(accFunc);
231 std::function<
bool(TcpAcceptor::pointer,
const std::error_code&)> errorFunc)
233 errorCall = std::move(errorFunc);
239 acceptor_.set_option(option);
245 TcpAcceptor(asio::io_context& io_context, asio::ip::tcp::endpoint& ep);
246 TcpAcceptor(asio::io_context& io_context, uint16_t port);
248 void handle_accept(TcpAcceptor::pointer ptr,
249 TcpConnection::pointer new_connection,
250 const std::error_code& error);
251 asio::ip::tcp::endpoint endpoint_;
252 asio::ip::tcp::acceptor acceptor_;
253 std::function<void(TcpAcceptor::pointer, TcpConnection::pointer)> acceptCall;
254 std::function<bool(TcpAcceptor::pointer,
const std::error_code&)> errorCall;
255 std::atomic<accepting_state_t> state{accepting_state_t::opened};
256 gmlc::concurrency::TriggerVariable accepting;
261 class TcpServer:
public std::enable_shared_from_this<TcpServer> {
263 using pointer = std::shared_ptr<TcpServer>;
265 static pointer create(asio::io_context& io_context,
266 const std::string& address,
267 const std::string& port,
268 bool reuse_port =
false,
269 int nominalBufferSize = 10192);
271 static pointer create(asio::io_context& io_context,
272 const std::string& address,
274 bool reuse_port =
false,
275 int nominalBufferSize = 10192);
277 create(asio::io_context& io_context, uint16_t PortNum,
int nominalBufferSize = 10192);
289 bool isReady()
const {
return !(halted.load()); }
291 bool reConnect(std::chrono::milliseconds timeOut);
294 setDataCall(std::function<
size_t(TcpConnection::pointer,
const char*,
size_t)> dataFunc)
296 dataCall = std::move(dataFunc);
300 std::function<
bool(TcpConnection::pointer,
const std::error_code&)> errorFunc)
302 errorCall = std::move(errorFunc);
304 void handle_accept(TcpAcceptor::pointer acc, TcpConnection::pointer new_connection);
306 TcpConnection::pointer
findSocket(
int connectorID)
const;
310 const std::string& address,
313 int nominalBufferSize);
315 const std::string& address,
316 const std::string& port,
318 int nominalBufferSize);
319 TcpServer(asio::io_context& io_context, uint16_t portNum,
int nominalBufferSize);
321 void initialConnect();
322 asio::io_context& ioctx;
323 mutable std::mutex accepting;
324 std::vector<TcpAcceptor::pointer> acceptors;
325 std::vector<asio::ip::tcp::endpoint> endpoints;
327 std::function<size_t(TcpConnection::pointer,
const char*,
size_t)> dataCall;
328 std::function<bool(TcpConnection::pointer,
const std::error_code& error)> errorCall;
329 std::atomic<bool> halted{
false};
330 bool reuse_address =
false;
332 std::vector<TcpConnection::pointer> connections;