helics  3.5.2
CommsInterface.hpp
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "NetworkBrokerData.hpp"
10 #include "gmlc/concurrency/TriggerVariable.hpp"
11 #include "gmlc/concurrency/TripWire.hpp"
12 #include "gmlc/containers/BlockingPriorityQueue.hpp"
13 #include "helics/core/ActionMessage.hpp"
14 
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <thread>
19 #include <utility>
20 
21 namespace helics {
22 
26  public:
29  enum class thread_generation {
30  single,
31  dual
32  };
34  CommsInterface() = default;
35  explicit CommsInterface(thread_generation threads);
37  virtual ~CommsInterface();
38 
40  virtual void loadNetworkInfo(const NetworkBrokerData& netInfo);
41  void loadTargetInfo(std::string_view localTarget,
42  std::string_view brokerTarget,
43  gmlc::networking::InterfaceNetworks targetNetwork =
44  gmlc::networking::InterfaceNetworks::LOCAL);
47  void transmit(route_id rid, const ActionMessage& cmd);
50  void transmit(route_id rid, ActionMessage&& cmd);
53  void addRoute(route_id rid, std::string_view routeInfo);
55  void removeRoute(route_id rid);
59  bool connect();
60 
63  void disconnect();
64 
66  bool reconnect();
68  void setName(const std::string& commName);
69 
72  void setRequireBrokerConnection(bool requireBrokerConnection);
73 
76  void setCallback(std::function<void(ActionMessage&&)> callback);
79  void setLoggingCallback(
80  std::function<void(int level, std::string_view name, std::string_view message)> callback);
83  void setMessageSize(int maxMsgSize, int maxCount);
86  bool isConnected() const;
87 
91  void setTimeout(std::chrono::milliseconds timeOut);
93  virtual void setFlag(std::string_view flag, bool val);
95  void setServerMode(bool serverActive);
96 
98  void logWarning(std::string_view message) const;
100  void logError(std::string_view message) const;
102  void logMessage(std::string_view message) const;
103 
104  protected:
107  enum class ConnectionStatus : int {
108 
109  STARTUP = -1,
110  CONNECTED = 0,
111  RECONNECTING = 1,
112  TERMINATED = 2,
113  ERRORED = 4
114  };
115 
116  private:
118  std::atomic<ConnectionStatus> rxStatus{ConnectionStatus::STARTUP};
119 
120  protected:
121  gmlc::concurrency::TriggerVariable rxTrigger;
122 
123  std::string name;
124  std::string localTargetAddress;
125  std::string brokerTargetAddress;
126  std::string brokerName;
128  std::string brokerInitString;
129 
130  private:
131  std::string randomID;
133  std::atomic<ConnectionStatus> txStatus{ConnectionStatus::STARTUP};
134  gmlc::concurrency::TriggerVariable txTrigger;
135  std::atomic<bool> operating{false};
136  const bool singleThread{false};
137 
138  protected:
140  false};
141  bool serverMode{true};
142  bool autoBroker{false};
143  bool useJsonSerialization{false};
144  bool observer{false};
146  std::chrono::milliseconds connectionTimeout{4000};
147  int maxMessageSize = 16 * 1024;
148  int maxMessageCount = 512;
149  std::atomic<bool> requestDisconnect{false};
150  std::function<void(ActionMessage&&)>
152  std::function<void(int level, std::string_view name, std::string_view message)>
154  gmlc::containers::BlockingPriorityQueue<std::pair<route_id, ActionMessage>>
156  // closing the files or connection can take some time so there is a need for inter-thread
157  // communication to not spit out warning messages if it is in the process of disconnecting
158  std::atomic<bool> disconnecting{
159  false};
160  gmlc::networking::InterfaceNetworks interfaceNetwork{
161  gmlc::networking::InterfaceNetworks::LOCAL};
162 
163  private:
164  std::thread queue_transmitter;
165  std::thread queue_watcher;
166  std::mutex threadSyncLock;
167  virtual void queue_rx_function() = 0;
168  virtual void queue_tx_function() = 0;
169  virtual void closeTransmitter();
170  virtual void closeReceiver();
171  virtual void reconnectTransmitter();
172  virtual void reconnectReceiver();
173  protected:
174  void setTxStatus(ConnectionStatus status);
175  void setRxStatus(ConnectionStatus status);
176  ConnectionStatus getRxStatus() const { return rxStatus.load(); }
177  ConnectionStatus getTxStatus() const { return txStatus.load(); }
180  bool propertyLock();
181  void propertyUnLock();
183  void join_tx_rx_thread();
185  const std::string& getRandomID() const { return randomID; }
186 
187  private:
188  gmlc::concurrency::TripWireDetector
189  tripDetector;
190 };
191 
192 namespace CommFactory {
194  class CommBuilder {
195  public:
196  virtual std::unique_ptr<CommsInterface> build() = 0;
197  };
198 
200  template<class CommTYPE>
201  class CommTypeBuilder final: public CommBuilder {
202  public:
203  static_assert(std::is_base_of<CommsInterface, CommTYPE>::value,
204  "Type does not inherit from helics::CommsInterface");
205 
206  using comm_build_type = CommTYPE;
207  virtual std::unique_ptr<CommsInterface> build() override
208  {
209  return std::make_unique<CommTYPE>();
210  }
211  };
212 
214  void defineCommBuilder(std::shared_ptr<CommBuilder> builder,
215  std::string_view commTypeName,
216  int code);
217 
219  template<class CommTYPE>
220  std::shared_ptr<CommBuilder> addCommType(std::string_view commTypeName, int code)
221  {
222  auto bld = std::make_shared<CommTypeBuilder<CommTYPE>>();
223  std::shared_ptr<CommBuilder> cbld = std::static_pointer_cast<CommBuilder>(bld);
224  defineCommBuilder(cbld, commTypeName, code);
225  return cbld;
226  }
227 
228  std::unique_ptr<CommsInterface> create(CoreType type);
229  std::unique_ptr<CommsInterface> create(std::string_view type);
230 
231 } // namespace CommFactory
232 
233 template<class X>
235  private:
236  std::atomic<X>& aref;
237  X fval;
238  X expectedValue;
239 
240  public:
241  ConditionalChangeOnDestroy(std::atomic<X>& var, X finalValue, X expValue):
242  aref(var), fval(std::move(finalValue)), expectedValue(std::move(expValue))
243  {
244  }
245  ~ConditionalChangeOnDestroy() { aref.compare_exchange_strong(expectedValue, fval); }
246 };
247 
248 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: CommsInterface.hpp:194
Definition: CommsInterface.hpp:201
Definition: CommsInterface.hpp:25
void setRequireBrokerConnection(bool requireBrokerConnection)
Definition: CommsInterface.cpp:370
ConnectionStatus
Definition: CommsInterface.hpp:107
@ STARTUP
the connection is in STARTUP mode
@ TERMINATED
the connection has been TERMINATED
@ RECONNECTING
we are trying reconnect
@ ERRORED
some ERRORED occurred on the connection
void join_tx_rx_thread()
Definition: CommsInterface.cpp:459
std::string name
the name of the object
Definition: CommsInterface.hpp:123
void transmit(route_id rid, const ActionMessage &cmd)
Definition: CommsInterface.cpp:171
thread_generation
Definition: CommsInterface.hpp:29
@ dual
indicate that separate threads are used, 1 for transmission and 1 for reception
@ single
indicate that a single thread is used for transmitting and receiving
void setName(const std::string &commName)
Definition: CommsInterface.cpp:378
virtual void setFlag(std::string_view flag, bool val)
Definition: CommsInterface.cpp:532
std::function< void(int level, std::string_view name, std::string_view message)> loggingCallback
callback for logging
Definition: CommsInterface.hpp:153
int maxMessageCount
the maximum number of message to buffer (if needed)
Definition: CommsInterface.hpp:148
void setTimeout(std::chrono::milliseconds timeOut)
Definition: CommsInterface.cpp:543
const std::string & getRandomID() const
Definition: CommsInterface.hpp:185
bool reconnect()
Definition: CommsInterface.cpp:472
void removeRoute(route_id rid)
Definition: CommsInterface.cpp:198
bool propertyLock()
Definition: CommsInterface.cpp:154
void disconnect()
Definition: CommsInterface.cpp:386
int maxMessageSize
the maximum message size for the queues (if needed)
Definition: CommsInterface.hpp:147
void setServerMode(bool serverActive)
Definition: CommsInterface.cpp:551
bool serverMode
some comms have a server mode and non-server mode
Definition: CommsInterface.hpp:141
bool observer
true for connections that are for observation only
Definition: CommsInterface.hpp:144
void logMessage(std::string_view message) const
Definition: CommsInterface.cpp:564
virtual ~CommsInterface()
Definition: CommsInterface.cpp:100
virtual void loadNetworkInfo(const NetworkBrokerData &netInfo)
Definition: CommsInterface.cpp:105
bool autoBroker
the broker should be automatically generated if needed
Definition: CommsInterface.hpp:142
std::chrono::milliseconds connectionTimeout
Definition: CommsInterface.hpp:146
void setMessageSize(int maxMsgSize, int maxCount)
Definition: CommsInterface.cpp:519
bool isConnected() const
Definition: CommsInterface.cpp:559
std::string localTargetAddress
the base for the receive address
Definition: CommsInterface.hpp:124
void setCallback(std::function< void(ActionMessage &&)> callback)
Definition: CommsInterface.cpp:502
std::function< void(ActionMessage &&)> ActionCallback
the callback for what to do with a received message
Definition: CommsInterface.hpp:151
std::string brokerTargetAddress
the base for the broker address
Definition: CommsInterface.hpp:125
void logWarning(std::string_view message) const
Definition: CommsInterface.cpp:573
std::string brokerInitString
the initialization string for any automatically generated broker
Definition: CommsInterface.hpp:128
std::string brokerName
Definition: CommsInterface.hpp:126
bool useJsonSerialization
true to make all connections use JSON serialization
Definition: CommsInterface.hpp:143
void logError(std::string_view message) const
Definition: CommsInterface.cpp:582
bool connect()
Definition: CommsInterface.cpp:263
std::atomic< bool > disconnecting
flag indicating that the comm system is in the process of disconnecting
Definition: CommsInterface.hpp:158
void addRoute(route_id rid, std::string_view routeInfo)
Definition: CommsInterface.cpp:189
void setLoggingCallback(std::function< void(int level, std::string_view name, std::string_view message)> callback)
Definition: CommsInterface.cpp:510
std::atomic< bool > requestDisconnect
flag gets set when disconnect is called
Definition: CommsInterface.hpp:149
bool mRequireBrokerConnection
specify that the comms should assume we have a broker
Definition: CommsInterface.hpp:139
gmlc::containers::BlockingPriorityQueue< std::pair< route_id, ActionMessage > > txQueue
set of messages waiting to be transmitted
Definition: CommsInterface.hpp:155
Definition: CommsInterface.hpp:234
Definition: NetworkBrokerData.hpp:23
Definition: GlobalFederateId.hpp:187
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
CoreType
Definition: CoreTypes.hpp:46