helics  3.3.0
CommsInterface.hpp
1 /*
2 Copyright (c) 2017-2022,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "NetworkBrokerData.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
11 #include "gmlc/concurrency/TripWire.hpp"
12 #include "gmlc/containers/BlockingPriorityQueue.hpp"
13 #include "helics/core/ActionMessage.hpp"
14 
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <utility>
20 
21 namespace helics {
22 
26  public:
29  enum class thread_generation {
30  single,
31  dual
32  };
34  CommsInterface() = default;
35  explicit CommsInterface(thread_generation threads);
37  virtual ~CommsInterface();
38 
40  virtual void loadNetworkInfo(const NetworkBrokerData& netInfo);
41  void loadTargetInfo(std::string_view localTarget,
42  std::string_view brokerTarget,
43  gmlc::networking::InterfaceNetworks targetNetwork =
44  gmlc::networking::InterfaceNetworks::LOCAL);
47  void transmit(route_id rid, const ActionMessage& cmd);
50  void transmit(route_id rid, ActionMessage&& cmd);
53  void addRoute(route_id rid, std::string_view routeInfo);
55  void removeRoute(route_id rid);
59  bool connect();
60 
63  void disconnect();
64 
66  bool reconnect();
68  void setName(const std::string& commName);
69 
72  void setRequireBrokerConnection(bool requireBrokerConnection);
73 
76  void setCallback(std::function<void(ActionMessage&&)> callback);
79  void setLoggingCallback(
80  std::function<void(int level, std::string_view name, std::string_view message)> callback);
83  void setMessageSize(int maxMsgSize, int maxCount);
86  bool isConnected() const;
87 
91  void setTimeout(std::chrono::milliseconds timeOut);
93  virtual void setFlag(std::string_view flag, bool val);
95  void setServerMode(bool serverActive);
96 
98  void logWarning(std::string_view message) const;
100  void logError(std::string_view message) const;
102  void logMessage(std::string_view message) const;
103 
104  protected:
107  enum class ConnectionStatus : int {
108 
109  STARTUP = -1,
110  CONNECTED = 0,
111  RECONNECTING = 1,
112  TERMINATED = 2,
113  ERRORED = 4
114  };
115 
116  private:
118  std::atomic<ConnectionStatus> rxStatus{ConnectionStatus::STARTUP};
119 
120  protected:
121  gmlc::concurrency::TriggerVariable rxTrigger;
122 
123  std::string name;
124  std::string localTargetAddress;
125  std::string brokerTargetAddress;
126  std::string brokerName;
127  std::string brokerInitString;
129 
130  private:
131  std::string randomID;
132  std::atomic<ConnectionStatus> txStatus{ConnectionStatus::STARTUP};
134  gmlc::concurrency::TriggerVariable txTrigger;
135  std::atomic<bool> operating{false};
136  const bool singleThread{false};
137 
138  protected:
140  false};
141  bool serverMode{true};
142  bool autoBroker{false};
143  bool useJsonSerialization{false};
144  bool observer{false};
145 
146  std::chrono::milliseconds connectionTimeout{4000};
147  int maxMessageSize = 16 * 1024;
148  int maxMessageCount = 512;
149  std::atomic<bool> requestDisconnect{false};
150  std::function<void(ActionMessage&&)>
152  std::function<void(int level, std::string_view name, std::string_view message)>
154  gmlc::containers::BlockingPriorityQueue<std::pair<route_id, ActionMessage>>
156  // closing the files or connection can take some time so there is a need for inter-thread
157  // communication to not spit out warning messages if it is in the process of disconnecting
158  std::atomic<bool> disconnecting{
159  false};
160  gmlc::networking::InterfaceNetworks interfaceNetwork{
161  gmlc::networking::InterfaceNetworks::LOCAL};
162 
163  private:
164  std::thread queue_transmitter;
165  std::thread queue_watcher;
166  std::mutex threadSyncLock;
167  virtual void queue_rx_function() = 0;
168  virtual void queue_tx_function() = 0;
169  virtual void closeTransmitter();
170  virtual void closeReceiver();
171  virtual void reconnectTransmitter();
172  virtual void reconnectReceiver();
173  protected:
174  void setTxStatus(ConnectionStatus status);
175  void setRxStatus(ConnectionStatus status);
176  ConnectionStatus getRxStatus() const { return rxStatus.load(); }
177  ConnectionStatus getTxStatus() const { return txStatus.load(); }
180  bool propertyLock();
181  void propertyUnLock();
183  void join_tx_rx_thread();
185  const std::string& getRandomID() const { return randomID; }
186 
187  private:
188  gmlc::concurrency::TripWireDetector
189  tripDetector;
190 };
191 
192 namespace CommFactory {
194  class CommBuilder {
195  public:
196  virtual std::unique_ptr<CommsInterface> build() = 0;
197  };
198 
200  template<class CommTYPE>
201  class CommTypeBuilder final: public CommBuilder {
202  public:
203  static_assert(std::is_base_of<CommsInterface, CommTYPE>::value,
204  "Type does not inherit from helics::CommsInterface");
205 
206  using comm_build_type = CommTYPE;
207  virtual std::unique_ptr<CommsInterface> build() override
208  {
209  return std::make_unique<CommTYPE>();
210  }
211  };
212 
214  void
215  defineCommBuilder(std::shared_ptr<CommBuilder> cb, std::string_view commTypeName, int code);
216 
218  template<class CommTYPE>
219  std::shared_ptr<CommBuilder> addCommType(std::string_view commTypeName, int code)
220  {
221  auto bld = std::make_shared<CommTypeBuilder<CommTYPE>>();
222  std::shared_ptr<CommBuilder> cbld = std::static_pointer_cast<CommBuilder>(bld);
223  defineCommBuilder(cbld, commTypeName, code);
224  return cbld;
225  }
226 
227  std::unique_ptr<CommsInterface> create(CoreType type);
228  std::unique_ptr<CommsInterface> create(std::string_view type);
229 
230 } // namespace CommFactory
231 
/** Scope guard that conditionally updates an atomic variable when it is destroyed.
 *
 * On destruction the guard performs a compare_exchange_strong on the referenced atomic: the
 * atomic is set to the final value only if it still holds the expected value at that moment.
 * The guard stores a reference, so the atomic must outlive the guard.
 *
 * @tparam X the value type held by the std::atomic
 */
template<class X>
class ConditionalChangeOnDestroy {
  private:
    std::atomic<X>& aref;  //!< the atomic variable to update on destruction
    X fval;  //!< the value to store if the comparison succeeds
    X expectedValue;  //!< the value the atomic must hold for the store to happen

  public:
    /** construct the guard
    @param var the atomic variable to conditionally change
    @param finalValue the value to write when the guard is destroyed
    @param expValue the value var must contain at destruction for the write to occur */
    ConditionalChangeOnDestroy(std::atomic<X>& var, X finalValue, X expValue):
        aref(var), fval(std::move(finalValue)), expectedValue(std::move(expValue))
    {
    }
    // a copy would repeat the conditional store on its own destruction, so copying is disabled
    ConditionalChangeOnDestroy(const ConditionalChangeOnDestroy&) = delete;
    ConditionalChangeOnDestroy& operator=(const ConditionalChangeOnDestroy&) = delete;
    /** perform the conditional store; the result of the exchange is intentionally ignored */
    ~ConditionalChangeOnDestroy() { aref.compare_exchange_strong(expectedValue, fval); }
};
246 
247 } // namespace helics
helics::CommsInterface::brokerTargetAddress
std::string brokerTargetAddress
the base for the broker address
Definition: CommsInterface.hpp:125
helics::CommsInterface::propertyLock
bool propertyLock()
Definition: CommsInterface.cpp:153
helics::CommsInterface::observer
bool observer
true for connections that are for observation only
Definition: CommsInterface.hpp:144
helics::CommsInterface::loggingCallback
std::function< void(int level, std::string_view name, std::string_view message)> loggingCallback
callback for logging
Definition: CommsInterface.hpp:153
helics::CommsInterface::name
std::string name
the name of the object
Definition: CommsInterface.hpp:123
helics::NetworkBrokerData::maxMessageSize
int maxMessageSize
maximum message size
Definition: NetworkBrokerData.hpp:43
helics::CommsInterface::~CommsInterface
virtual ~CommsInterface()
Definition: CommsInterface.cpp:99
helics::CommsInterface::disconnecting
std::atomic< bool > disconnecting
flag indicating that the comm system is in the process of disconnecting
Definition: CommsInterface.hpp:158
helics::CoreType
CoreType
Definition: CoreTypes.hpp:36
helics::NetworkBrokerData::localInterface
std::string localInterface
the interface to use for the local connection
Definition: NetworkBrokerData.hpp:35
helics::CommsInterface::thread_generation
thread_generation
Definition: CommsInterface.hpp:29
helics::NetworkBrokerData::maxMessageCount
int maxMessageCount
maximum message count
Definition: NetworkBrokerData.hpp:44
HELICS_LOG_LEVEL_WARNING
@ HELICS_LOG_LEVEL_WARNING
Definition: helics_enums.h:192
helics::CommsInterface::setCallback
void setCallback(std::function< void(ActionMessage &&)> callback)
Definition: CommsInterface.cpp:501
helics::NetworkBrokerData::autobroker
bool autobroker
Definition: NetworkBrokerData.hpp:51
helics::CommsInterface::brokerInitString
std::string brokerInitString
the initialization string for any automatically generated broker
Definition: CommsInterface.hpp:128
helics::NetworkBrokerData
Definition: NetworkBrokerData.hpp:23
helics::CommsInterface::connectionTimeout
std::chrono::milliseconds connectionTimeout
Definition: CommsInterface.hpp:146
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::NetworkBrokerData::observer
bool observer
specify that the network connection is used for observation only
Definition: NetworkBrokerData.hpp:57
helics::CommsInterface::logError
void logError(std::string_view message) const
Definition: CommsInterface.cpp:581
helics::CommsInterface::thread_generation::single
@ single
indicate that a single thread is used for transmitting and receiving
helics::BrokerFactory::create
std::shared_ptr< Broker > create(CoreType type, std::string_view configureString)
Definition: BrokerFactory.cpp:99
helics::CommsInterface::txQueue
gmlc::containers::BlockingPriorityQueue< std::pair< route_id, ActionMessage > > txQueue
set of messages waiting to be transmitted
Definition: CommsInterface.hpp:155
helics::CommsInterface::CommsInterface
CommsInterface()=default
helics::CommsInterface::setMessageSize
void setMessageSize(int maxMsgSize, int maxCount)
Definition: CommsInterface.cpp:518
helics::CommsInterface::thread_generation::dual
@ dual
indicate that separate threads are used, 1 for transmission and 1 for reception
helics::CommsInterface::setServerMode
void setServerMode(bool serverActive)
Definition: CommsInterface.cpp:550
helics::CommsInterface::logMessage
void logMessage(std::string_view message) const
Definition: CommsInterface.cpp:563
helics::CommFactory::CommTypeBuilder
Definition: CommsInterface.hpp:201
helics::route_id
Definition: GlobalFederateId.hpp:184
helics::CommsInterface::ActionCallback
std::function< void(ActionMessage &&)> ActionCallback
the callback for what to do with a received message
Definition: CommsInterface.hpp:151
helics::CommsInterface::join_tx_rx_thread
void join_tx_rx_thread()
Definition: CommsInterface.cpp:458
helics::ActionMessage::messageID
int32_t messageID
8 – message ID for a variety of purposes
Definition: ActionMessage.hpp:36
helics::CommsInterface::ConnectionStatus::ERRORED
@ ERRORED
some error occurred on the connection
helics::ActionMessage::setExtraData
void setExtraData(int32_t data)
Definition: ActionMessage.hpp:157
helics::CommsInterface::disconnect
void disconnect()
Definition: CommsInterface.cpp:385
helics::NetworkBrokerData::brokerInitString
std::string brokerInitString
a string containing arguments for the broker initialization
Definition: NetworkBrokerData.hpp:36
helics::CommsInterface::transmit
void transmit(route_id rid, const ActionMessage &cmd)
Definition: CommsInterface.cpp:170
helics::CommFactory::CommBuilder
Definition: CommsInterface.hpp:194
helics::CommsInterface::ConnectionStatus::TERMINATED
@ TERMINATED
the connection has been TERMINATED
helics::CommsInterface::useJsonSerialization
bool useJsonSerialization
true to make all connections use JSON serialization
Definition: CommsInterface.hpp:143
helics::CommsInterface::logWarning
void logWarning(std::string_view message) const
Definition: CommsInterface.cpp:572
helics::NetworkBrokerData::brokerAddress
std::string brokerAddress
the address or domain name of the broker
Definition: NetworkBrokerData.hpp:34
helics::ActionMessage::payload
SmallBuffer payload
buffer to contain the data payload
Definition: ActionMessage.hpp:48
helics::CommsInterface::localTargetAddress
std::string localTargetAddress
the base for the receive address
Definition: CommsInterface.hpp:124
helics::CommsInterface::brokerName
std::string brokerName
Definition: CommsInterface.hpp:126
helics::CommsInterface::setLoggingCallback
void setLoggingCallback(std::function< void(int level, std::string_view name, std::string_view message)> callback)
Definition: CommsInterface.cpp:509
helics::CommsInterface
Definition: CommsInterface.hpp:25
helics::CommsInterface::requestDisconnect
std::atomic< bool > requestDisconnect
flag gets set when disconnect is called
Definition: CommsInterface.hpp:149
helics::CommsInterface::ConnectionStatus::STARTUP
@ STARTUP
the connection is in STARTUP mode
helics::CommsInterface::addRoute
void addRoute(route_id rid, std::string_view routeInfo)
Definition: CommsInterface.cpp:188
helics::route_id::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:192
helics::CommsInterface::ConnectionStatus
ConnectionStatus
Definition: CommsInterface.hpp:107
helics
the main namespace for the helics co-simulation library. User functions will be in the helics namespace.
Definition: AsyncFedCallInfo.hpp:14
helics::CommsInterface::loadNetworkInfo
virtual void loadNetworkInfo(const NetworkBrokerData &netInfo)
Definition: CommsInterface.cpp:104
helics::CommsInterface::ConnectionStatus::RECONNECTING
@ RECONNECTING
we are trying to reconnect
helics::ConditionalChangeOnDestroy
Definition: CommsInterface.hpp:233
helics::CommsInterface::serverMode
bool serverMode
some comms have a server mode and non-server mode
Definition: CommsInterface.hpp:141
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:188
helics::NetworkBrokerData::server_mode
ServerModeOptions server_mode
setup a server mode
Definition: NetworkBrokerData.hpp:58
helics::CommsInterface::setName
void setName(const std::string &commName)
Definition: CommsInterface.cpp:377
helics::CommsInterface::setTimeout
void setTimeout(std::chrono::milliseconds timeOut)
Definition: CommsInterface.cpp:542
helics::CommsInterface::setRequireBrokerConnection
void setRequireBrokerConnection(bool requireBrokerConnection)
Definition: CommsInterface.cpp:369
helics::CommsInterface::mRequireBrokerConnection
bool mRequireBrokerConnection
specify that the comms should assume we have a broker
Definition: CommsInterface.hpp:139
helics::NetworkBrokerData::connectionAddress
std::string connectionAddress
the address for connecting
Definition: NetworkBrokerData.hpp:37
helics::CommsInterface::autoBroker
bool autoBroker
the broker should be automatically generated if needed
Definition: CommsInterface.hpp:142
helics::NetworkBrokerData::brokerName
std::string brokerName
the identifier for the broker
Definition: NetworkBrokerData.hpp:33
helics::CommsInterface::maxMessageSize
int maxMessageSize
the maximum message size for the queues (if needed)
Definition: CommsInterface.hpp:147
helics::CommsInterface::setFlag
virtual void setFlag(std::string_view flag, bool val)
Definition: CommsInterface.cpp:531
helics::CommsInterface::reconnect
bool reconnect()
Definition: CommsInterface.cpp:471
HELICS_LOG_LEVEL_INTERFACES
@ HELICS_LOG_LEVEL_INTERFACES
Definition: helics_enums.h:199
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
helics::CommsInterface::ConnectionStatus::CONNECTED
@ CONNECTED
we are CONNECTED
helics::CommsInterface::connect
bool connect()
Definition: CommsInterface.cpp:262
helics::CommsInterface::isConnected
bool isConnected() const
Definition: CommsInterface.cpp:558
helics::CommsInterface::maxMessageCount
int maxMessageCount
the maximum number of messages to buffer (if needed)
Definition: CommsInterface.hpp:148
helics::CommsInterface::getRandomID
const std::string & getRandomID() const
Definition: CommsInterface.hpp:185
helics::CommsInterface::removeRoute
void removeRoute(route_id rid)
Definition: CommsInterface.cpp:197