helics  3.0.1
CommsInterface.hpp
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "NetworkBrokerData.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
11 #include "gmlc/concurrency/TripWire.hpp"
12 #include "gmlc/containers/BlockingPriorityQueue.hpp"
13 #include "helics/core/ActionMessage.hpp"
14 
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <utility>
20 
21 namespace helics {
22 enum class InterfaceNetworks : char;
23 
// Interior of class helics::CommsInterface. The index at the end of this listing places the
// class definition at CommsInterface.hpp:26; the class-head line itself is not present here.
// The leading integer on each line is the line number of the original header carried over by
// the listing (e.g. the index cites brokerTargetAddress at CommsInterface.hpp:125).
// NOTE(review): std::atomic, std::mutex and std::chrono are used below, but <atomic>, <mutex>
// and <chrono> are not among the visible #include lines; presumably they arrive transitively.
27  public:
// Threading mode requested when constructing the interface.
30  enum class thread_generation {
31  single, // indicate that a single thread is used for transmitting and receiving
32  dual // indicate that separate threads are used, one for transmission and one for reception
33  };
// Default construction; every data member below has an in-class initializer or is
// default-constructible.
35  CommsInterface() = default;
// Construct with an explicit threading mode (explicit: no implicit conversion from the enum).
// Presumably this is what sets the const member `singleThread` -- confirm in the .cpp.
36  explicit CommsInterface(thread_generation threads);
// Virtual destructor: the class has virtual members and is used as a polymorphic base.
38  virtual ~CommsInterface();
39 
// Load settings from a NetworkBrokerData object; virtual, so derived classes may override.
41  virtual void loadNetworkInfo(const NetworkBrokerData& netInfo);
// NOTE(review): this declaration is truncated in the listing -- whatever followed the
// `brokerTarget` parameter (original lines 44-46) is missing, including the closing `);`.
42  void loadTargetInfo(const std::string& localTarget,
43  const std::string& brokerTarget,
// Two overloads: the first takes the message by const reference, the second by rvalue
// reference so the caller can hand over the message without a copy.
47  void transmit(route_id rid, const ActionMessage& cmd);
50  void transmit(route_id rid, ActionMessage&& cmd);
// Route management keyed by route_id; routeInfo is an opaque string at this level.
53  void addRoute(route_id rid, const std::string& routeInfo);
55  void removeRoute(route_id rid);
// NOTE(review): the bool results of connect()/reconnect() presumably report success --
// confirm against the implementation.
59  bool connect();
60 
63  void disconnect();
64 
66  bool reconnect();
// Presumably assigns the `name` member ("the name of the object") -- confirm.
68  void setName(const std::string& commName);
69 
// Presumably assigns `mRequireBrokerConnection` -- confirm.
72  void setRequireBrokerConnection(bool requireBrokerConnection);
73 
// Install the handler invoked with each message; the parameter type matches the
// ActionCallback member, which the index describes as "the callback for what to do with a
// received message".
76  void setCallback(std::function<void(ActionMessage&&)> callback);
// Install the logging handler; the parameter type matches the loggingCallback member
// ("callback for logging" per the index). The handler receives (level, name, message).
79  void setLoggingCallback(
80  std::function<void(int level, const std::string& name, const std::string& message)>
81  callback);
// Presumably updates `maxMessageSize` and `maxMessageCount` -- confirm.
84  void setMessageSize(int maxMsgSize, int maxCount);
87  bool isConnected() const;
88 
// Presumably updates `connectionTimeout` -- confirm.
92  void setTimeout(std::chrono::milliseconds timeOut);
// Set a named boolean flag; virtual, so derived classes may recognise additional flags.
94  virtual void setFlag(const std::string& flag, bool val);
// Presumably assigns `serverMode` -- confirm.
96  void setServerMode(bool serverActive);
97 
// Const logging helpers; presumably they forward to loggingCallback at different levels
// (the index lists HELICS_LOG_LEVEL_WARNING / _ERROR / _INTERFACES) -- confirm.
99  void logWarning(const std::string& message) const;
101  void logError(const std::string& message) const;
103  void logMessage(const std::string& message) const;
104 
105  protected:
// Connection state shared by the receive and transmit sides. Values are explicit and the
// underlying type is int; note that the value 3 is not assigned.
108  enum class connection_status : int {
109 
110  startup = -1,
111  connected = 0,
112  reconnecting = 1,
113  terminated = 2,
114  error = 4
115  };
116 
117  private:
// Receive-side status; starts in `startup`. Private: derived classes go through
// getRxStatus()/setRxStatus().
118  std::atomic<connection_status> rx_status{
119  connection_status::startup};
120  protected:
121  gmlc::concurrency::TriggerVariable rxTrigger;
122 
// the name of the object
123  std::string name;
// the base for the receive address
124  std::string localTargetAddress;
// the base for the broker address
125  std::string brokerTargetAddress;
// the identifier for the broker
126  std::string brokerName;
// NOTE(review): the declarator for the next member (original line 128) is missing from this
// listing. The index records `std::string brokerInitString` at CommsInterface.hpp:128: "the
// initialization string for any automatically generated broker".
127  std::string
129  private:
// Exposed read-only to derived classes through getRandomID().
130  std::string randomID;
// Transmit-side status; starts in `startup`. Accessed via getTxStatus()/setTxStatus().
131  std::atomic<connection_status> tx_status{
132  connection_status::startup};
133  gmlc::concurrency::TriggerVariable txTrigger;
134  std::atomic<bool> operating{false};
// const, so it can only be given a different value in a constructor's initializer list.
135  const bool singleThread{false};
136 
137  protected:
// Configuration flags with their defaults.
138  bool mRequireBrokerConnection{
139  false};
140  bool serverMode{true};
141  bool autoBroker{false};
142  bool useJsonSerialization{false};
143  bool observer{false};
144 
// Default timeout is 4000 ms.
145  std::chrono::milliseconds connectionTimeout{4000};
// Default limits: 16 * 1024 = 16384 for the message size, 512 for the message count.
146  int maxMessageSize = 16 * 1024;
147  int maxMessageCount = 512;
148  std::atomic<bool> requestDisconnect{false};
// NOTE(review): the declarators of the next three members (original lines 150, 152, 154) are
// missing from this listing. The index records them as:
//   ActionCallback  -- "the callback for what to do with a received message" (line 150)
//   loggingCallback -- "callback for logging" (line 152)
//   txQueue         -- "set of messages waiting to be transmitted" (line 154)
149  std::function<void(ActionMessage&&)>
151  std::function<void(int level, const std::string& name, const std::string& message)>
153  gmlc::containers::BlockingPriorityQueue<std::pair<route_id, ActionMessage>>
155  // closing the files or connection can take some time so there is a need for inter-thread
156  // communication to not spit out warning messages if it is in the process of disconnecting
157  std::atomic<bool> disconnecting{
158  false};
// Defaults to InterfaceNetworks::LOCAL ("just open local ports" per the index).
159  InterfaceNetworks interfaceNetwork = InterfaceNetworks::LOCAL;
160 
161  private:
// Worker threads; presumably they run queue_tx_function() and queue_rx_function()
// respectively -- confirm in the .cpp.
162  std::thread queue_transmitter;
163  std::thread queue_watcher;
164  std::mutex threadSyncLock;
// Pure virtual: every concrete comms type must supply its own receive and transmit loops.
// They are private, so derived classes can override them but cannot call them directly.
165  virtual void queue_rx_function() = 0;
166  virtual void queue_tx_function() = 0;
// Overridable hooks that are not pure, so the base class defines them elsewhere.
167  virtual void closeTransmitter();
168  virtual void closeReceiver();
169  virtual void reconnectTransmitter();
170  virtual void reconnectReceiver();
171  protected:
172  void setTxStatus(connection_status txStatus);
173  void setRxStatus(connection_status rxStatus);
// Atomic reads of the current status using the default memory ordering of load().
174  connection_status getRxStatus() const { return rx_status.load(); }
175  connection_status getTxStatus() const { return tx_status.load(); }
// NOTE(review): propertyLock() returns bool while propertyUnLock() returns nothing; presumably
// the bool says whether the lock was obtained -- confirm before relying on it.
178  bool propertyLock();
179  void propertyUnLock();
181  void join_tx_rx_thread();
// Read-only access to the private randomID; the reference is to the member itself.
183  const std::string& getRandomID() const { return randomID; }
184 
185  private:
186  gmlc::concurrency::TripWireDetector
187  tripDetector;
188 };
189 
190 namespace CommFactory {
192  class CommBuilder {
193  public:
194  virtual std::unique_ptr<CommsInterface> build() = 0;
195  };
196 
198  template<class CommTYPE>
199  class CommTypeBuilder final: public CommBuilder {
200  public:
201  static_assert(std::is_base_of<CommsInterface, CommTYPE>::value,
202  "Type does not inherit from helics::CommsInterface");
203 
204  using comm_build_type = CommTYPE;
205  virtual std::unique_ptr<CommsInterface> build() override
206  {
207  return std::make_unique<CommTYPE>();
208  }
209  };
210 
212  void defineCommBuilder(std::shared_ptr<CommBuilder> cb,
213  const std::string& commTypeName,
214  int code);
215 
217  template<class CommTYPE>
218  std::shared_ptr<CommBuilder> addCommType(const std::string& commTypeName, int code)
219  {
220  auto bld = std::make_shared<CommTypeBuilder<CommTYPE>>();
221  std::shared_ptr<CommBuilder> cbld = std::static_pointer_cast<CommBuilder>(bld);
222  defineCommBuilder(cbld, commTypeName, code);
223  return cbld;
224  }
225 
226  std::unique_ptr<CommsInterface> create(CoreType type);
227  std::unique_ptr<CommsInterface> create(const std::string& type);
228 
229 } // namespace CommFactory
230 
/**
 * Helper that conditionally updates an atomic variable when it goes out of scope.
 *
 * On destruction the referenced atomic is set to the final value, but only if it still holds
 * the expected value at that moment (a single compare_exchange_strong). If the variable holds
 * anything else it is left untouched.
 *
 * The object stores a reference to the atomic, so the atomic must outlive this object.
 *
 * @tparam X the value type held by the std::atomic
 */
template<class X>
class ConditionalChangeOnDestroy {
  private:
    std::atomic<X>& aref;  //!< the atomic variable to update on destruction
    X fval;  //!< the value to store if the comparison succeeds
    X expectedValue;  //!< the value the atomic must still hold for the store to happen

  public:
    /**
     * @param var the atomic variable to update when this object is destroyed
     * @param finalValue the value to store into var on destruction
     * @param expValue the value var must hold at destruction time for the store to occur
     */
    ConditionalChangeOnDestroy(std::atomic<X>& var, X finalValue, X expValue):
        aref(var), fval(std::move(finalValue)), expectedValue(std::move(expValue))
    {
    }
    /** Perform the conditional store. On failure compare_exchange_strong writes the current
     * value into expectedValue, which is harmless because the object is being destroyed. */
    ~ConditionalChangeOnDestroy() { aref.compare_exchange_strong(expectedValue, fval); }
};
245 
246 } // namespace helics
helics::CommsInterface::brokerTargetAddress
std::string brokerTargetAddress
the base for the broker address
Definition: CommsInterface.hpp:125
helics::InterfaceNetworks
InterfaceNetworks
Definition: NetworkBrokerData.hpp:16
helics::CommsInterface::name
std::string name
the name of the object
Definition: CommsInterface.hpp:123
helics::NetworkBrokerData::maxMessageSize
int maxMessageSize
maximum message size
Definition: NetworkBrokerData.hpp:56
helics::CoreType
CoreType
Definition: CoreTypes.hpp:36
helics::NetworkBrokerData::localInterface
std::string localInterface
the interface to use for the local connection
Definition: NetworkBrokerData.hpp:48
helics::CommsInterface::thread_generation
thread_generation
Definition: CommsInterface.hpp:30
helics::NetworkBrokerData::maxMessageCount
int maxMessageCount
maximum message count
Definition: NetworkBrokerData.hpp:57
HELICS_LOG_LEVEL_WARNING
@ HELICS_LOG_LEVEL_WARNING
Definition: helics_enums.h:185
helics::NetworkBrokerData::autobroker
bool autobroker
flag for specifying an automatic broker generation
Definition: NetworkBrokerData.hpp:63
helics::CommsInterface::brokerInitString
std::string brokerInitString
the initialization string for any automatically generated broker
Definition: CommsInterface.hpp:128
helics::NetworkBrokerData
Definition: NetworkBrokerData.hpp:36
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::NetworkBrokerData::observer
bool observer
specify that the network connection is used for observation only
Definition: NetworkBrokerData.hpp:69
helics::CommsInterface::txQueue
gmlc::containers::BlockingPriorityQueue< std::pair< route_id, ActionMessage > > txQueue
set of messages waiting to be transmitted
Definition: CommsInterface.hpp:154
helics::CommsInterface::CommsInterface
CommsInterface()=default
helics::CommFactory::CommTypeBuilder
Definition: CommsInterface.hpp:199
helics::route_id
Definition: GlobalFederateId.hpp:168
helics::CommsInterface::ActionCallback
std::function< void(ActionMessage &&)> ActionCallback
the callback for what to do with a received message
Definition: CommsInterface.hpp:150
helics::ActionMessage::messageID
int32_t messageID
8 – message ID for a variety of purposes
Definition: ActionMessage.hpp:36
helics::ActionMessage::setExtraData
void setExtraData(int32_t data)
Definition: ActionMessage.hpp:157
helics::NetworkBrokerData::brokerInitString
std::string brokerInitString
a string containing arguments for the broker initialization
Definition: NetworkBrokerData.hpp:49
helics::CommFactory::CommBuilder
Definition: CommsInterface.hpp:192
helics::NetworkBrokerData::brokerAddress
std::string brokerAddress
the address or domain name of the broker
Definition: NetworkBrokerData.hpp:47
helics::ActionMessage::payload
SmallBuffer payload
buffer to contain the data payload
Definition: ActionMessage.hpp:48
helics::CommsInterface::localTargetAddress
std::string localTargetAddress
the base for the receive address
Definition: CommsInterface.hpp:124
helics::CommsInterface::brokerName
std::string brokerName
the identifier for the broker
Definition: CommsInterface.hpp:126
helics::CommsInterface::connection_status
connection_status
Definition: CommsInterface.hpp:108
helics::CommsInterface
Definition: CommsInterface.hpp:26
helics::route_id::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:176
helics
the main namespace for the helics co-simulation library. User functions will be in the helics namespace...
Definition: AsyncFedCallInfo.hpp:14
helics::ConditionalChangeOnDestroy
Definition: CommsInterface.hpp:232
helics::CommsInterface::loggingCallback
std::function< void(int level, const std::string &name, const std::string &message)> loggingCallback
callback for logging
Definition: CommsInterface.hpp:152
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:181
helics::NetworkBrokerData::server_mode
ServerModeOptions server_mode
setup a server mode
Definition: NetworkBrokerData.hpp:70
helics::BrokerFactory::create
std::shared_ptr< Broker > create(CoreType type, const std::string &configureString)
Definition: BrokerFactory.cpp:98
helics::NetworkBrokerData::connectionAddress
std::string connectionAddress
the address for connecting
Definition: NetworkBrokerData.hpp:50
helics::NetworkBrokerData::brokerName
std::string brokerName
the identifier for the broker
Definition: NetworkBrokerData.hpp:46
HELICS_LOG_LEVEL_INTERFACES
@ HELICS_LOG_LEVEL_INTERFACES
Definition: helics_enums.h:192
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
helics::CommsInterface::getRandomID
const std::string & getRandomID() const
Definition: CommsInterface.hpp:183
helics::InterfaceNetworks::LOCAL
@ LOCAL
just open local ports