helics  2.8.1
CommsInterface.hpp
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "NetworkBrokerData.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
11 #include "gmlc/concurrency/TripWire.hpp"
12 #include "gmlc/containers/BlockingPriorityQueue.hpp"
13 #include "helics/core/ActionMessage.hpp"
14 
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <utility>
20 
21 namespace helics {
// Opaque (forward) declaration of the interface_networks enumeration with a fixed underlying
// type of `char`, so the name can be used in declarations below without its enumerator list.
// The listing's index places the full definition in NetworkBrokerData.hpp.
22 enum class interface_networks : char;
23 
27  public:
30  enum class thread_generation {
31  single, // indicate that a single thread is used for transmitting and receiving
32  dual // indicate that separate threads are used 1 for transmission and one for reception
33  };
35  CommsInterface() = default;
36  explicit CommsInterface(thread_generation threads);
38  virtual ~CommsInterface();
39 
41  virtual void loadNetworkInfo(const NetworkBrokerData& netInfo);
42  void loadTargetInfo(const std::string& localTarget,
43  const std::string& brokerTarget,
47  void transmit(route_id rid, const ActionMessage& cmd);
50  void transmit(route_id rid, ActionMessage&& cmd);
53  void addRoute(route_id rid, const std::string& routeInfo);
55  void removeRoute(route_id rid);
59  bool connect();
60 
63  void disconnect();
64 
66  bool reconnect();
68  void setName(const std::string& commName);
69 
72  void setRequireBrokerConnection(bool requireBrokerConnection);
73 
76  void setCallback(std::function<void(ActionMessage&&)> callback);
79  void setLoggingCallback(
80  std::function<void(int level, const std::string& name, const std::string& message)>
81  callback);
84  void setMessageSize(int maxMsgSize, int maxCount);
87  bool isConnected() const;
88 
92  void setTimeout(std::chrono::milliseconds timeOut);
94  virtual void setFlag(const std::string& flag, bool val);
96  void setServerMode(bool serverActive);
97 
99  void logWarning(const std::string& message) const;
101  void logError(const std::string& message) const;
103  void logMessage(const std::string& message) const;
104 
105  protected:
108  enum class connection_status : int {
109 
110  startup = -1,
111  connected = 0,
112  reconnecting = 1,
113  terminated = 2,
114  error = 4
115  };
116 
117  private:
118  std::atomic<connection_status> rx_status{
119  connection_status::startup};
120  protected:
121  gmlc::concurrency::TriggerVariable rxTrigger;
122 
123  std::string name;
124  std::string localTargetAddress;
125  std::string brokerTargetAddress;
126  std::string brokerName;
127  std::string
129  private:
130  std::string randomID;
131  std::atomic<connection_status> tx_status{
132  connection_status::startup};
133  gmlc::concurrency::TriggerVariable txTrigger;
134  std::atomic<bool> operating{false};
135  const bool singleThread{false};
136 
137  protected:
138  bool mRequireBrokerConnection{
139  false};
140  bool serverMode{true};
141  bool autoBroker{false};
142  bool useJsonSerialization{false};
143 
144  std::chrono::milliseconds connectionTimeout{4000};
145  int maxMessageSize = 16 * 1024;
146  int maxMessageCount = 512;
147  std::atomic<bool> requestDisconnect{false};
148  std::function<void(ActionMessage&&)>
150  std::function<void(int level, const std::string& name, const std::string& message)>
152  gmlc::containers::BlockingPriorityQueue<std::pair<route_id, ActionMessage>>
154  // closing the files or connection can take some time so there is a need for inter-thread
155  // communication to not spit out warning messages if it is in the process of disconnecting
156  std::atomic<bool> disconnecting{
157  false};
159 
160  private:
161  std::thread queue_transmitter;
162  std::thread queue_watcher;
163  std::mutex threadSyncLock;
164  virtual void queue_rx_function() = 0;
165  virtual void queue_tx_function() = 0;
166  virtual void closeTransmitter();
167  virtual void closeReceiver();
168  virtual void reconnectTransmitter();
169  virtual void reconnectReceiver();
170  protected:
171  void setTxStatus(connection_status txStatus);
172  void setRxStatus(connection_status rxStatus);
173  connection_status getRxStatus() const { return rx_status.load(); }
174  connection_status getTxStatus() const { return tx_status.load(); }
177  bool propertyLock();
178  void propertyUnLock();
180  void join_tx_rx_thread();
182  const std::string& getRandomID() const { return randomID; }
183 
184  private:
185  gmlc::concurrency::TripWireDetector
186  tripDetector;
187 };
188 
189 namespace CommFactory {
191  class CommBuilder {
192  public:
193  virtual std::unique_ptr<CommsInterface> build() = 0;
194  };
195 
197  template<class CommTYPE>
198  class CommTypeBuilder final: public CommBuilder {
199  public:
200  static_assert(std::is_base_of<CommsInterface, CommTYPE>::value,
201  "Type does not inherit from helics::CommsInterface");
202 
203  using comm_build_type = CommTYPE;
204  virtual std::unique_ptr<CommsInterface> build() override
205  {
206  return std::make_unique<CommTYPE>();
207  }
208  };
209 
211  void defineCommBuilder(std::shared_ptr<CommBuilder> cb,
212  const std::string& commTypeName,
213  int code);
214 
216  template<class CommTYPE>
217  std::shared_ptr<CommBuilder> addCommType(const std::string& commTypeName, int code)
218  {
219  auto bld = std::make_shared<CommTypeBuilder<CommTYPE>>();
220  std::shared_ptr<CommBuilder> cbld = std::static_pointer_cast<CommBuilder>(bld);
221  defineCommBuilder(cbld, commTypeName, code);
222  return cbld;
223  }
224 
225  std::unique_ptr<CommsInterface> create(core_type type);
226  std::unique_ptr<CommsInterface> create(const std::string& type);
227 
228 } // namespace CommFactory
229 
/** helper that conditionally updates an atomic variable when it goes out of scope
 *
 * The destructor performs `compare_exchange_strong(expValue, finalValue)` on the referenced
 * atomic: the variable is set to the final value only if it still holds the expected value at
 * that moment; otherwise it is left unchanged.
 *
 * Only a reference to the atomic is stored, so the atomic must outlive this object.
 * @tparam X the value type of the atomic
 */
template<class X>
class ConditionalChangeOnDestroy {
  private:
    std::atomic<X>& aref;  //!< the atomic to update on destruction (not owned)
    X fval;  //!< the value to store if the comparison succeeds
    X expectedValue;  //!< the value the atomic must hold for the store to happen

  public:
    /** constructor
    @param var the atomic variable to update on destruction
    @param finalValue the value to write
    @param expValue the value `var` must contain at destruction time for the write to occur
    */
    ConditionalChangeOnDestroy(std::atomic<X>& var, X finalValue, X expValue):
        aref(var), fval(std::move(finalValue)), expectedValue(std::move(expValue))
    {
    }
    /** on failure compare_exchange_strong overwrites expectedValue with the observed value;
     * that is harmless here because the object is being destroyed */
    ~ConditionalChangeOnDestroy() { aref.compare_exchange_strong(expectedValue, fval); }
};
244 
245 } // namespace helics
helics::CommsInterface::brokerTargetAddress
std::string brokerTargetAddress
the base for the broker address
Definition: CommsInterface.hpp:125
helics::NetworkBrokerData::server_mode
server_mode_options server_mode
setup a server mode
Definition: NetworkBrokerData.hpp:69
helics::CommsInterface::name
std::string name
the name of the object
Definition: CommsInterface.hpp:123
helics::core_type
core_type
Definition: core-types.hpp:37
helics::NetworkBrokerData::maxMessageSize
int maxMessageSize
maximum message size
Definition: NetworkBrokerData.hpp:56
helics::NetworkBrokerData::localInterface
std::string localInterface
the interface to use for the local connection
Definition: NetworkBrokerData.hpp:48
helics::CommsInterface::thread_generation
thread_generation
Definition: CommsInterface.hpp:30
helics::NetworkBrokerData::maxMessageCount
int maxMessageCount
maximum message count
Definition: NetworkBrokerData.hpp:57
helics::interface_networks::local
@ local
just open local ports
helics::NetworkBrokerData::autobroker
bool autobroker
flag for specifying an automatic broker generation
Definition: NetworkBrokerData.hpp:63
helics::CommsInterface::brokerInitString
std::string brokerInitString
the initialization string for any automatically generated broker
Definition: CommsInterface.hpp:128
helics::NetworkBrokerData
Definition: NetworkBrokerData.hpp:36
helics::ActionMessage
Definition: ActionMessage.hpp:29
helics_log_level_warning
@ helics_log_level_warning
Definition: helics_enums.h:169
helics::CommsInterface::txQueue
gmlc::containers::BlockingPriorityQueue< std::pair< route_id, ActionMessage > > txQueue
set of messages waiting to be transmitted
Definition: CommsInterface.hpp:153
helics::CommsInterface::CommsInterface
CommsInterface()=default
helics_log_level_error
@ helics_log_level_error
Definition: helics_enums.h:167
helics::interface_networks
interface_networks
Definition: NetworkBrokerData.hpp:16
helics::CommFactory::CommTypeBuilder
Definition: CommsInterface.hpp:198
helics::route_id
Definition: global_federate_id.hpp:171
helics::CommsInterface::ActionCallback
std::function< void(ActionMessage &&)> ActionCallback
the callback for what to do with a received message
Definition: CommsInterface.hpp:149
helics_log_level_interfaces
@ helics_log_level_interfaces
Definition: helics_enums.h:176
helics::ActionMessage::payload
std::string payload
Definition: ActionMessage.hpp:44
helics::ActionMessage::messageID
int32_t messageID
8 – message ID for a variety of purposes
Definition: ActionMessage.hpp:35
helics::ActionMessage::setExtraData
void setExtraData(int32_t data)
Definition: ActionMessage.hpp:157
helics::NetworkBrokerData::brokerInitString
std::string brokerInitString
a string containing arguments for the broker initialization
Definition: NetworkBrokerData.hpp:49
helics::CommFactory::CommBuilder
Definition: CommsInterface.hpp:191
helics::NetworkBrokerData::brokerAddress
std::string brokerAddress
the address or domain name of the broker
Definition: NetworkBrokerData.hpp:47
helics::CommsInterface::localTargetAddress
std::string localTargetAddress
the base for the receive address
Definition: CommsInterface.hpp:124
helics::CommsInterface::brokerName
std::string brokerName
the identifier for the broker
Definition: CommsInterface.hpp:126
helics::CommsInterface::connection_status
connection_status
Definition: CommsInterface.hpp:108
helics::CommsInterface
Definition: CommsInterface.hpp:26
helics::BrokerFactory::create
std::shared_ptr< Broker > create(core_type type, const std::string &configureString)
Definition: BrokerFactory.cpp:100
helics
the main namespace for the helics co-simulation library. User functions will be in the helics namespace...
Definition: AsyncFedCallInfo.hpp:14
error
@ error
only print errors
Definition: loggingHelper.hpp:22
helics::ConditionalChangeOnDestroy
Definition: CommsInterface.hpp:231
helics::CommsInterface::loggingCallback
std::function< void(int level, const std::string &name, const std::string &message)> loggingCallback
callback for logging
Definition: CommsInterface.hpp:151
helics::route_id::baseValue
constexpr base_type baseValue() const
Definition: global_federate_id.hpp:179
helics::NetworkBrokerData::connectionAddress
std::string connectionAddress
the address for connecting
Definition: NetworkBrokerData.hpp:50
helics::NetworkBrokerData::brokerName
std::string brokerName
the identifier for the broker
Definition: NetworkBrokerData.hpp:46
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
helics::CommsInterface::getRandomID
const std::string & getRandomID() const
Definition: CommsInterface.hpp:182