helics  3.5.2
CoreBroker.hpp
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/JsonBuilder.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
12 #include "Broker.hpp"
13 #include "BrokerBase.hpp"
14 #include "FederateIdExtra.hpp"
15 #include "HandleManager.hpp"
16 #include "TimeDependencies.hpp"
17 #include "UnknownHandleManager.hpp"
18 #include "gmlc/concurrency/DelayedObjects.hpp"
19 #include "gmlc/concurrency/TriggerVariable.hpp"
20 #include "gmlc/containers/AirLock.hpp"
21 #include "gmlc/containers/DualStringMappedVector.hpp"
22 #include "gmlc/containers/SimpleQueue.hpp"
23 
24 #include <any>
25 #include <array>
26 #include <atomic>
27 #include <deque>
28 #include <functional>
29 #include <map>
30 #include <memory>
31 #include <string>
32 #include <thread>
33 #include <tuple>
34 #include <unordered_map>
35 #include <utility>
36 #include <vector>
37 
38 namespace helics {
39 
// Connection status values, stored in a std::uint8_t.
// Used as the `state` member of both BasicFedInfo and BasicBrokerInfo in this file.
// The numeric values are deliberately sparse (0, 1, 2, then 40, 48, 50).
// NOTE(review): presumably the values are ordered so states can be compared with < / >=;
// confirm against the uses in CoreBroker.cpp before renumbering anything.
41 enum class ConnectionState : std::uint8_t {
42  CONNECTED = 0,
43  INIT_REQUESTED = 1,
44  OPERATING = 2,
45  ERROR_STATE = 40,
46  REQUEST_DISCONNECT = 48,
47  DISCONNECTED = 50
48 };
49 
// Forward declaration of QueryReuse (opaque enum with a std::uint8_t underlying type).
// The symbol index of this listing places the full definition in queryHelpers.hpp and
// describes it as the enumeration of whether a query result is reusable.
51 enum class QueryReuse : std::uint8_t;
52 
// Record describing one federate; CoreBroker keeps these in its `mFederates` container.
54 struct BasicFedInfo {
// Name of the federate. Declared const, so it is fixed at construction.
55  const std::string name;
// NOTE(review): this listing jumps from line 55 to line 59. The symbol index at the end of
// the listing attributes three members to the missing lines:
//   :56 GlobalFederateId global_id - the identification code for the federate
//   :57 route_id route             - the routing information for data to be sent to the federate
//   :58 GlobalBrokerId parent      - the id of the parent broker/core
// Confirm against the original header before relying on this struct's layout.
// Current connection status; a new record starts as CONNECTED.
59  ConnectionState state{ConnectionState::CONNECTED};
// Boolean flags, all defaulting to false. Their exact semantics are not visible in this
// listing - TODO confirm against the code that sets them.
60  bool nonCounting{false};
61  bool observer{false};
62  bool dynamic{false};
63  bool reentrant{false};
// Builds a record whose name is a copy of `fedname`; explicit, so a string_view does not
// convert implicitly into a BasicFedInfo.
64  explicit BasicFedInfo(std::string_view fedname): name(fedname) {}
65 };
66 
// Body of BasicBrokerInfo (the constructor below names the type); CoreBroker keeps these
// records in its `mBrokers` container.
// NOTE(review): line 68 of the original header, which should carry the opening
// `class BasicBrokerInfo {`, is absent from this listing - confirm against the original file.
69  public:
// The name of the broker. Declared const, so it is fixed at construction.
70  const std::string name;
71 
// NOTE(review): lines 72-75 are absent from this listing. The symbol index at the end of
// the listing attributes these members to them:
//   :72 GlobalBrokerId global_id - the global identifier for the broker
//   :73 route_id route           - the identifier for the route to take to the broker
//   :74 GlobalBrokerId parent    - (no description given in the index)
// Specifies the current status of the broker; a new record starts as CONNECTED.
76  ConnectionState state{ConnectionState::CONNECTED};
// No description is given in the index for this flag; by name it marks a time dependency -
// TODO confirm.
78  bool _hasTimeDependency{false};
// If set to true the entry is a core, false means it is a broker.
79  bool _core{false};
// Indicator that the broker has a subbroker as a parent.
80  bool _nonLocal{false};
// Indicator that the broker has a unique route id.
81  bool _route_key{false};
// Indicator that the disconnect ack has been sent.
82  bool _sent_disconnect_ack{false};
// Indicator that the broker doesn't respond to pings.
83  bool _disable_ping{false};
// Indicator that the broker is an observer.
84  bool _observer{false};
// Indicator that initIteration was requested.
85  bool initIterating{false};
// String describing the connection information for the route.
86  std::string routeInfo;
// Builds a record whose name is a copy of `brokerName`; explicit to block implicit
// conversion from a string_view.
87  explicit BasicBrokerInfo(std::string_view brokerName): name(brokerName) {}
88 };
89 
90 class TimeCoordinator;
91 class Logger;
92 class TimeoutMonitor;
93 
// CoreBroker: broker class deriving publicly from Broker and BrokerBase.
// It is abstract: brokerConnect(), brokerDisconnect(), both transmit() overloads, addRoute()
// and removeRoute() are pure virtual and must be supplied by a derived class.
// Most of the Broker-facing overrides are marked `final`, so derived classes cannot replace them.
// NOTE(review): this class uses std::mutex, std::chrono, std::string_view and fixed-width
// integer types, but <mutex>, <chrono>, <string_view> and <cstdint> are not among the direct
// includes shown in this listing - presumably they arrive transitively; consider including
// them explicitly.
98 class CoreBroker: public Broker, public BrokerBase {
99  protected:
// Set to true if this broker should act as a gateway.
100  bool _gateway = false;
101  private:
// Root-broker flag; atomic, and the value returned by isRoot().
102  std::atomic<bool> _isRoot{false};
// NOTE(review): second, non-atomic root flag; how it differs from _isRoot is not visible
// here - confirm in CoreBroker.cpp.
103  bool isRootc{false};
104  bool connectionEstablished{false};
105  bool initIterating{false};
106  int routeCount = 1;
// Federate records (BasicFedInfo), indexed with GlobalFederateId.
108  gmlc::containers::
109  DualStringMappedVector<BasicFedInfo, GlobalFederateId, reference_stability::unstable>
110  mFederates;
// Broker records (BasicBrokerInfo), indexed with GlobalBrokerId.
112  gmlc::containers::
113  DualStringMappedVector<BasicBrokerInfo, GlobalBrokerId, reference_stability::unstable>
114  mBrokers;
116  std::string mPreviousLocalBrokerIdentifier;
117 
118  HandleManager handles;
119  UnknownHandleManager unknownHandles;
// Pairs of (name, federate id) for dependencies that are held for later handling.
121  std::vector<std::pair<std::string, GlobalFederateId>> delayedDependencies;
// Lookup tables: global federate id -> local federate id, global federate id -> route,
// endpoint name -> route, and name -> value for globals.
123  std::unordered_map<GlobalFederateId, LocalFederateId> global_id_translation;
125  std::unordered_map<GlobalFederateId, route_id> routing_table;
127  std::unordered_map<std::string, route_id> knownExternalEndpoints;
128  std::unordered_map<std::string, std::string> global_values;
129  std::unordered_map<std::string, std::int64_t> renamers;
// NOTE(review): which members this mutex guards is not visible in the header - confirm.
130  std::mutex name_mutex_;
131  std::atomic<int> queryCounter{1}; // counter for active queries going to the local API
132  bool force_connection{false};
133  gmlc::concurrency::DelayedObjects<std::string> activeQueries;
// Each entry bundles a JSON map builder, a list of ActionMessages and a QueryReuse value.
135  std::vector<std::tuple<fileops::JsonMapBuilder, std::vector<ActionMessage>, QueryReuse>>
136  mapBuilders;
// Pairs of (int32 id, steady_clock time point) used for query timeouts.
138  std::deque<std::pair<int32_t, decltype(std::chrono::steady_clock::now())>> queryTimeouts;
139 
140  std::vector<ActionMessage> earlyMessages;
141  gmlc::concurrency::TriggerVariable disconnection;
143  std::unique_ptr<TimeoutMonitor> timeoutMon;
144  std::atomic<uint16_t> nextAirLock{0};
// Fixed set of three airlocks holding std::any payloads.
146  std::array<gmlc::containers::AirLock<std::any>, 3> dataAirlocks;
147  // variables for a time logging federate
148  std::string
149  mTimeMonitorFederate;
150  GlobalFederateId mTimeMonitorFederateId{};
152  GlobalFederateId mTimeMonitorLocalFederateId{};
153  Time mTimeMonitorPeriod{timeZero};
154  Time mTimeMonitorLastLogTime{Time::minVal()};
155  Time mTimeMonitorCurrentTime{Time::minVal()};
156  std::atomic<double> simTime{mInvalidSimulationTime};
// Starts at Time::maxVal().
157  Time mNextTimeBarrier{Time::maxVal()};
// Access is already private at this point; the specifier is repeated.
158  private:
// Command-processing overrides; both take the message by rvalue reference.
162  virtual void processCommand(ActionMessage&& command) override;
168  void processPriorityCommand(ActionMessage&& command) override;
169 
171  void processBrokerConfigureCommands(ActionMessage& cmd);
// Queue of messages whose transmission is delayed; sent by transmitDelayedMessages().
173  gmlc::containers::SimpleQueue<ActionMessage> delayTransmitQueue;
174  /* function to transmit the delayed messages*/
175  void transmitDelayedMessages();
// routeMessage overloads: with an explicit destination federate id, or with only the message.
179  void routeMessage(ActionMessage& cmd, GlobalFederateId dest);
180  void routeMessage(ActionMessage&& cmd, GlobalFederateId dest);
183  void routeMessage(const ActionMessage& cmd);
184  void routeMessage(ActionMessage&& cmd);
186  void transmitToParent(ActionMessage&& cmd);
188  void propagateError(ActionMessage&& cmd);
190  void broadcast(ActionMessage& cmd);
191 
192  route_id fillMessageRouteInformation(ActionMessage& mess);
193 
195  void executeInitializationOperations(bool iterating);
197  uint16_t getNextAirlockIndex();
// Broker-key verification, from a message or directly from a key string.
200  bool verifyBrokerKey(ActionMessage& mess) const;
203  bool verifyBrokerKey(std::string_view key) const;
204 
// Public connection/lifecycle interface; all but isOpenToNewFederates() are final.
205  public:
208  virtual bool connect() override final;
211  virtual void disconnect() override final;
213  void unregister();
218  virtual void processDisconnect(bool skipUnregister = false) override final;
220  virtual bool isConnected() const override final;
223  virtual void setAsRoot() override final;
// Returns the current value of the atomic _isRoot flag.
// NOTE(review): the ';' after the closing brace is redundant.
226  virtual bool isRoot() const override final { return _isRoot; };
227 
228  virtual bool isOpenToNewFederates() const override;
229 
// The callback receives an int and two string_views.
230  virtual void setLoggingCallback(
231  std::function<void(int, std::string_view, std::string_view)> logFunction) override final;
232 
// msToWait defaults to 0 milliseconds.
233  virtual bool waitForDisconnect(
234  std::chrono::milliseconds msToWait = std::chrono::milliseconds(0)) const override final;
235 
236  virtual void setTimeBarrier(Time barrierTime) override final;
237 
238  virtual void clearTimeBarrier() override final;
239 
240  virtual void globalError(int32_t errorCode, std::string_view errorString) override final;
241 
// Pure virtual connection hooks that a derived class must implement.
242  private:
245  virtual bool brokerConnect() = 0;
248  virtual void brokerDisconnect() = 0;
249 
// Pure virtual transport/route hooks that a derived class must implement.
250  protected:
// Send a command along a route; overloads for a const reference and for an rvalue.
257  virtual void transmit(route_id route, const ActionMessage& command) = 0;
265  virtual void transmit(route_id route, ActionMessage&& command) = 0;
274  virtual void addRoute(route_id rid, int interfaceId, std::string_view routeInfo) = 0;
278  virtual void removeRoute(route_id rid) = 0;
279 
280  public:
// Constructors are explicit; the bool form is noexcept and defaults to a non-root broker.
283  explicit CoreBroker(bool setAsRootBroker = false) noexcept;
285  explicit CoreBroker(std::string_view broker_name);
287  virtual ~CoreBroker();
// Configuration entry points: from a single string, from argc/argv, or from a vector of strings.
289  virtual void configure(std::string_view configureString) override final;
292  virtual void configureFromArgs(int argc, char* argv[]) override final;
294  virtual void configureFromVector(std::vector<std::string> args) override final;
295 
299  bool allInitReady() const;
// NOTE(review): lines 300-301 are absent from this listing. The symbol index lists
// `ConnectionState getAllConnectionState() const` (defined in CoreBroker.cpp) - presumably
// declared here; confirm against the original header.
302 
304  void setIdentifier(std::string_view name);
// Returns `identifier`, which the symbol index attributes to BrokerBase ("an identifier for
// the broker").
306  virtual const std::string& getIdentifier() const override final { return identifier; }
307  virtual const std::string& getAddress() const override final;
308  virtual void setLoggingLevel(int logLevel) override final;
309  virtual void setLogFile(std::string_view lfile) override final;
// `mode` defaults to HELICS_SEQUENCING_MODE_FAST for query(); sendCommand() has no default.
310  virtual std::string
311  query(std::string_view target,
312  std::string_view queryStr,
313  HelicsSequencingModes mode = HELICS_SEQUENCING_MODE_FAST) override final;
314  virtual void setGlobal(std::string_view valueName, std::string_view value) override final;
315  virtual void sendCommand(std::string_view target,
316  std::string_view commandStr,
317  HelicsSequencingModes mode) override final;
// Name-based connection and linking overrides.
318  virtual void makeConnections(const std::string& file) override final;
319  virtual void linkEndpoints(std::string_view source, std::string_view target) override final;
320  virtual void dataLink(std::string_view publication, std::string_view input) override final;
321 
322  virtual void addSourceFilterToEndpoint(std::string_view filter,
323  std::string_view endpoint) override final;
324 
325  virtual void addDestinationFilterToEndpoint(std::string_view filter,
326  std::string_view endpoint) override final;
327  virtual void addAlias(std::string_view interfaceKey, std::string_view alias) override final;
328 
// These two overrides are not final, so derived classes may override them further.
329  protected:
330  virtual std::shared_ptr<helicsCLI11App> generateCLI() override;
331 
332  virtual double getSimulationTime() const override;
333 
// Private helpers; the implementations are not part of this listing.
334  private:
335  int getCountableFederates() const;
337  void checkDependencies();
338 
// `actions` carries a pair of action codes.
// NOTE(review): presumably one per side of the connection - confirm in CoreBroker.cpp.
339  void connectInterfaces(
340  const BasicHandleInfo& origin,
341  uint32_t originFlags,
342  const BasicHandleInfo& target,
343  uint32_t targetFlags,
344  std::pair<action_message_def::action_t, action_message_def::action_t> actions);
345 
347  void findAndNotifyInputTargets(BasicHandleInfo& handleInfo, const std::string& key);
348  void findAndNotifyPublicationTargets(BasicHandleInfo& handleInfo, const std::string& key);
349 
350  void findAndNotifyFilterTargets(BasicHandleInfo& handleInfo, const std::string& key);
351  void findAndNotifyEndpointTargets(BasicHandleInfo& handleInfo, const std::string& key);
352 
353  void findRegexMatch(const std::string& target,
354  InterfaceType type,
355  GlobalHandle handle,
356  uint16_t flags);
// Disconnect and error handling helpers.
358  void processDisconnectCommand(ActionMessage& command);
360  void disconnectTiming(ActionMessage& command);
362  void processBrokerDisconnect(ActionMessage& command, BasicBrokerInfo* brk);
364  void processError(ActionMessage& command);
366  void disconnectBroker(BasicBrokerInfo& brk);
368  void markAsDisconnected(GlobalBrokerId brkid);
370  void checkInFlightQueries(GlobalBrokerId brkid);
372  void checkForNamedInterface(ActionMessage& command);
374  void removeNamedTarget(ActionMessage& command);
// Query handling helpers.
376  void processQueryCommand(ActionMessage& cmd);
378  void processQuery(ActionMessage& message);
380  void processInitCommand(ActionMessage& cmd);
382  void checkQueryTimeouts();
384  void processQueryResponse(const ActionMessage& message);
386  void processLocalQuery(const ActionMessage& message);
388  std::string generateQueryAnswer(std::string_view request, bool force_ordering);
390  std::string quickBrokerQueries(std::string_view request) const;
392  void processCommandInstruction(ActionMessage& message);
394  void processLocalCommandInstruction(ActionMessage& message);
396  std::string getNameList(std::string_view gidString) const;
398  route_id getRoute(GlobalFederateId fedid) const;
// Convenience overload: wraps the raw int32 in a GlobalFederateId and forwards.
400  route_id getRoute(int32_t fedid) const { return getRoute(GlobalFederateId(fedid)); }
401 
// Const and non-const lookup of a broker record by id; both return a pointer.
// NOTE(review): presumably nullptr when the id is unknown - confirm.
402  const BasicBrokerInfo* getBrokerById(GlobalBrokerId brokerid) const;
403 
404  BasicBrokerInfo* getBrokerById(GlobalBrokerId brokerid);
405 
// Interface registration helpers, one per interface kind.
406  void addLocalInfo(BasicHandleInfo& handleInfo, const ActionMessage& message);
407  void addPublication(ActionMessage& message);
408  void addInput(ActionMessage& message);
409  void addEndpoint(ActionMessage& message);
410  void addFilter(ActionMessage& message);
411  void addTranslator(ActionMessage& message);
412  void addDataSink(ActionMessage& message);
413 
414  bool checkInterfaceCreation(ActionMessage& message, InterfaceType type);
415  // Handle the registration of new brokers
416  void brokerRegistration(ActionMessage&& command);
420  void sendBrokerErrorAck(ActionMessage& command, std::int32_t errorCode);
421  // Helper function for linking interfaces
422  void linkInterfaces(ActionMessage& command);
423  // Handle the registration of new federates
424  void fedRegistration(ActionMessage&& command);
428  void sendFedErrorAck(ActionMessage& command, std::int32_t errorCode);
429  // bool updateSourceFilterOperator (ActionMessage &m);
431  void initializeMapBuilder(std::string_view request,
432  std::uint16_t index,
433  QueryReuse reuse,
434  bool force_ordering);
435 
436  std::string generateGlobalStatus(fileops::JsonMapBuilder& builder);
438  void sendErrorToImmediateBrokers(int errorCode);
440  void sendDisconnect(action_message_def::action_t disconnectType);
442  std::string generateFederationSummary() const;
444  void labelAsDisconnected(GlobalBrokerId brkid);
445 
// Time monitor and time barrier helpers.
447  void processTimeMonitorMessage(ActionMessage& message);
449  void loadTimeMonitor(bool firstLoad, std::string_view newFederate);
451  void generateTimeBarrier(ActionMessage& message);
452  int generateMapObjectCounter() const;
454  std::string generateRename(std::string_view name);
// TimeoutMonitor is granted access to the private members of this class.
455  friend class TimeoutMonitor;
456 };
457 
458 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: CoreBroker.hpp:68
bool _core
if set to true the broker is a core, false is a broker
Definition: CoreBroker.hpp:79
bool _sent_disconnect_ack
indicator that the disconnect ack has been sent
Definition: CoreBroker.hpp:82
bool _hasTimeDependency
Definition: CoreBroker.hpp:78
const std::string name
the name of the broker
Definition: CoreBroker.hpp:70
bool initIterating
indicator that initIteration was requested
Definition: CoreBroker.hpp:85
bool _observer
indicator that the broker is an observer
Definition: CoreBroker.hpp:84
GlobalBrokerId global_id
the global identifier for the broker
Definition: CoreBroker.hpp:72
bool _route_key
indicator that the broker has a unique route id
Definition: CoreBroker.hpp:81
std::string routeInfo
string describing the connection information for the route
Definition: CoreBroker.hpp:86
ConnectionState state
specify the current status of the broker
Definition: CoreBroker.hpp:76
bool _disable_ping
indicator that the broker doesn't respond to pings
Definition: CoreBroker.hpp:83
bool _nonLocal
indicator that the broker has a subbroker as a parent.
Definition: CoreBroker.hpp:80
GlobalBrokerId parent
Definition: CoreBroker.hpp:74
route_id route
the identifier for the route to take to the broker
Definition: CoreBroker.hpp:73
Definition: BasicHandleInfo.hpp:20
Definition: BrokerBase.hpp:42
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:70
Definition: core/Broker.hpp:18
Definition: CoreBroker.hpp:98
void setIdentifier(std::string_view name)
Definition: CoreBroker.cpp:68
virtual void globalError(int32_t errorCode, std::string_view errorString) override final
Definition: CoreBroker.cpp:2444
virtual void dataLink(std::string_view publication, std::string_view input) override final
Definition: CoreBroker.cpp:185
virtual void addSourceFilterToEndpoint(std::string_view filter, std::string_view endpoint) override final
Definition: CoreBroker.cpp:193
virtual void linkEndpoints(std::string_view source, std::string_view target) override final
Definition: CoreBroker.cpp:177
virtual void setGlobal(std::string_view valueName, std::string_view value) override final
Definition: CoreBroker.cpp:3545
virtual void addAlias(std::string_view interfaceKey, std::string_view alias) override final
Definition: CoreBroker.cpp:210
virtual void makeConnections(const std::string &file) override final
Definition: CoreBroker.cpp:168
virtual void setLogFile(std::string_view lfile) override final
Definition: CoreBroker.cpp:3448
virtual void processDisconnect(bool skipUnregister=false) override final
Definition: CoreBroker.cpp:2468
virtual bool isRoot() const override final
Definition: CoreBroker.hpp:226
virtual void addRoute(route_id rid, int interfaceId, std::string_view routeInfo)=0
virtual bool waitForDisconnect(std::chrono::milliseconds msToWait=std::chrono::milliseconds(0)) const override final
Definition: CoreBroker.cpp:2459
void unregister()
Definition: CoreBroker.cpp:2487
virtual void setTimeBarrier(Time barrierTime) override final
Definition: CoreBroker.cpp:2424
virtual const std::string & getAddress() const override final
Definition: CoreBroker.cpp:77
virtual void configureFromVector(std::vector< std::string > args) override final
Definition: CoreBroker.cpp:2332
virtual std::shared_ptr< helicsCLI11App > generateCLI() override
Definition: CoreBroker.cpp:2352
virtual void transmit(route_id route, const ActionMessage &command)=0
virtual void disconnect() override final
Definition: CoreBroker.cpp:2505
virtual bool connect() override final
Definition: CoreBroker.cpp:2377
virtual void configure(std::string_view configureString) override final
Definition: CoreBroker.cpp:2302
virtual void configureFromArgs(int argc, char *argv[]) override final
Definition: CoreBroker.cpp:2317
virtual const std::string & getIdentifier() const override final
Definition: CoreBroker.hpp:306
bool allInitReady() const
Definition: CoreBroker.cpp:4657
virtual void clearTimeBarrier() override final
Definition: CoreBroker.cpp:2435
virtual bool isConnected() const override final
Definition: CoreBroker.cpp:2453
virtual void setLoggingLevel(int logLevel) override final
Definition: CoreBroker.cpp:3439
virtual void addDestinationFilterToEndpoint(std::string_view filter, std::string_view endpoint) override final
Definition: CoreBroker.cpp:201
bool _gateway
set to true if this broker should act as a gateway.
Definition: CoreBroker.hpp:100
virtual void setAsRoot() override final
Definition: CoreBroker.cpp:2369
virtual void removeRoute(route_id rid)=0
virtual void setLoggingCallback(std::function< void(int, std::string_view, std::string_view)> logFunction) override final
Definition: CoreBroker.cpp:117
virtual void sendCommand(std::string_view target, std::string_view commandStr, HelicsSequencingModes mode) override final
Definition: CoreBroker.cpp:3554
virtual double getSimulationTime() const override
Definition: CoreBroker.cpp:2347
ConnectionState getAllConnectionState() const
Definition: CoreBroker.cpp:4630
virtual std::string query(std::string_view target, std::string_view queryStr, HelicsSequencingModes mode=HELICS_SEQUENCING_MODE_FAST) override final
Definition: CoreBroker.cpp:3458
virtual bool isOpenToNewFederates() const override
Definition: CoreBroker.cpp:233
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: GlobalFederateId.hpp:147
Definition: helicsCLI11.hpp:46
Definition: GlobalFederateId.hpp:187
HelicsSequencingModes
Definition: helics_enums.h:425
@ HELICS_SEQUENCING_MODE_FAST
Definition: helics_enums.h:427
action_t
Definition: ActionMessageDefintions.hpp:20
the main namespace for the helics co-simulation library. User functions will be in the helics namespace...
Definition: AsyncFedCallInfo.hpp:14
constexpr Time timeZero
Definition: helicsTime.hpp:31
ConnectionState
Definition: CoreBroker.hpp:41
InterfaceType
Definition: CoreTypes.hpp:111
QueryReuse
Enumeration of if query result is reusable.
Definition: queryHelpers.hpp:39
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
forward declaration of QueryReuse
Definition: CoreBroker.hpp:54
const std::string name
name of the federate
Definition: CoreBroker.hpp:55
GlobalBrokerId parent
the id of the parent broker/core
Definition: CoreBroker.hpp:58
route_id route
the routing information for data to be sent to the federate
Definition: CoreBroker.hpp:57
GlobalFederateId global_id
the identification code for the federate
Definition: CoreBroker.hpp:56