helics  3.6.1
BrokerBase.hpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2025,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
14 #include "ActionMessage.hpp"
15 #include "FederateIdExtra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
17 
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <nlohmann/json_fwd.hpp>
22 #include <string>
23 #include <thread>
24 #include <utility>
25 #include <vector>
26 
27 namespace spdlog {
28 class logger;
29 }
30 
31 namespace helics {
32 class BaseTimeCoordinator;
33 class helicsCLI11App;
34 class ProfilerBuffer;
35 class LogBuffer;
36 class LogManager;
39 class BrokerBase {
40  protected:
41  static constexpr double mInvalidSimulationTime{-98763.2};
43  std::atomic<GlobalBrokerId> global_id{parent_broker_id};
48  std::atomic<int32_t> maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
49 
51  int32_t minFederateCount{1};
53  int32_t minBrokerCount{0};
54  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};
55  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};
57  int32_t minChildCount{0};
58  int32_t maxIterationCount{10000};
59  Time tickTimer{5.0};
60  Time timeout{30.0};
67  std::string identifier;
68  std::string brokerKey;
70  // address is mutable since during initial phases it may not be fixed so to maintain a
71  // consistent public interface for extracting it this variable may need to be updated in a
72  // constant function
73  mutable std::string address;
74 
75  std::thread queueProcessingThread;
77  std::atomic<bool> haltOperations{false};
81  bool terminate_on_error{false};
83  bool debugging{false};
85  bool observer{false};
87  bool globalTime{false};
89  bool asyncTime{false};
91  bool dynamicFederation{false};
93  bool disableDynamicSources{false};
94 
95  private:
97  std::atomic<bool> mainLoopIsRunning{false};
99  bool dumplog{false};
102  bool queueDisabled{false};
104  bool disable_timer{false};
106  std::atomic<std::size_t> messageCounter{0};
107 
108  protected:
109  std::unique_ptr<BaseTimeCoordinator> timeCoord;
110  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;
111  std::shared_ptr<LogManager> mLogManager;
113  enum class BrokerState : int16_t {
114  CREATED = -10,
115  CONFIGURING = -7,
116  CONFIGURED = -6,
117  CONNECTING = -4,
118  CONNECTED = -3,
119  INITIALIZING = -1,
120  OPERATING = 0,
121  CONNECTED_ERROR = 3,
122  TERMINATING = 4,
123  TERMINATING_ERROR = 5,
124  TERMINATED = 6,
125  ERRORED = 7,
126  };
127 
128  enum class TickForwardingReasons : uint32_t {
129  NONE = 0,
130  NO_COMMS = 0x01,
131  PING_RESPONSE = 0x02,
132  QUERY_TIMEOUT = 0x04,
133  GRANT_TIMEOUT = 0x08,
134  DISCONNECT_TIMEOUT = 0x10
135  };
136  bool noAutomaticID{false};
137  bool hasTimeDependency{false};
139  bool enteredExecutionMode{false};
141  bool hasFilters{false};
142 
143  bool no_ping{false};
144  bool uuid_like{false};
146  bool useJsonSerialization{false};
147  bool enable_profiling{false};
148  bool allowRemoteControl{true};
151  bool globalDisconnect{false};
154  decltype(std::chrono::steady_clock::now()) errorTimeStart;
156  decltype(std::chrono::steady_clock::now()) disconnectTime;
157  std::atomic<int> lastErrorCode{0};
158  std::string lastErrorString;
159  std::string configString;
160  bool fileInUse{false};
161 
162  private:
164  std::shared_ptr<ProfilerBuffer> prBuff;
165 
167  bool forwardTick{false};
169  uint32_t forwardingReasons{0U};
171  std::atomic<BrokerState> brokerState{BrokerState::CREATED};
172 
173  public:
174  explicit BrokerBase(bool DisableQueue = false) noexcept;
175  explicit BrokerBase(std::string_view broker_name, bool DisableQueue = false);
176 
177  virtual ~BrokerBase();
178 
182  void loadInfoFromToml(const std::string& toml, bool runArgParser = true);
183 
187  void loadInfoFromJson(const std::string& json, bool runArgParser = true);
188 
192  int parseArgs(int argc, char* argv[]);
196  int parseArgs(std::vector<std::string> args);
200  int parseArgs(std::string_view initializationString);
203  virtual void configureBase();
204 
206  void addActionMessage(const ActionMessage& message);
208  void addActionMessage(ActionMessage&& message);
209 
215  void setLoggerFunction(
216  std::function<void(int level, std::string_view identifier, std::string_view message)>
217  logFunction);
219  void logFlush();
221  bool isRunning() const { return mainLoopIsRunning.load(); }
223  void setLogLevel(int32_t level);
228  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
230  GlobalBrokerId getGlobalId() const { return global_id.load(); }
231 
232  private:
234  void queueProcessingLoop();
237  action_message_def::action_t commandProcessor(ActionMessage& command);
238 
240  std::shared_ptr<helicsCLI11App> generateBaseCLI();
242  void baseConfigure(ActionMessage& command);
243 
246  void addActionMessage(ActionMessage&& message) const;
247 
248  protected:
250  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
251  {
252  return ((static_cast<std::uint32_t>(reason) & code) != 0);
253  }
255  void setTickForwarding(TickForwardingReasons reason, bool value = true);
256  BrokerState getBrokerState() const { return brokerState.load(); }
257  bool setBrokerState(BrokerState newState);
258  bool transitionBrokerState(BrokerState expectedState, BrokerState newState);
260  virtual void processDisconnect(bool skipUnregister = false) = 0;
263  virtual bool tryReconnect() = 0;
266  virtual void processCommand(ActionMessage&& cmd) = 0;
272  virtual void processPriorityCommand(ActionMessage&& command) = 0;
273 
278  bool sendToLogger(GlobalFederateId federateID,
279  int logLevel,
280  std::string_view name,
281  std::string_view message,
282  bool fromRemote = false) const;
284  void saveProfilingData(std::string_view message);
286  void writeProfilingData();
288  void generateNewIdentifier();
290  virtual std::string generateLocalAddressString() const = 0;
292  virtual std::shared_ptr<helicsCLI11App> generateCLI();
294  void setErrorState(int eCode, std::string_view estring);
296  void setLoggingFile(std::string_view lfile);
298  bool getFlagValue(int32_t flag) const;
300  virtual double getSimulationTime() const { return mInvalidSimulationTime; }
302  std::pair<bool, std::vector<std::string_view>> processBaseCommands(ActionMessage& command);
304  void addBaseInformation(nlohmann::json& base, bool hasParent) const;
305 
306  public:
308  std::function<void(int, std::string_view, std::string_view)> getLoggingCallback() const;
310  void joinAllThreads();
312  std::size_t currentMessageCounter() const
313  {
314  return messageCounter.load(std::memory_order_acquire);
315  }
316  friend class TimeoutMonitor;
317  friend const std::string& brokerStateName(BrokerState state);
318 };
319 
322 const std::string& brokerStateName(BrokerBase::BrokerState state);
323 
324 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: BrokerBase.hpp:39
std::thread queueProcessingThread
Definition: BrokerBase.hpp:75
void setLoggingFile(std::string_view lfile)
Definition: BrokerBase.cpp:640
decltype(std::chrono::steady_clock::now()) disconnectTime
time when the disconnect started
Definition: BrokerBase.hpp:156
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:83
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:766
friend const std::string & brokerStateName(BrokerState state)
Definition: BrokerBase.cpp:1186
std::string configString
storage for a config file location
Definition: BrokerBase.hpp:159
void setLoggerFunction(std::function< void(int level, std::string_view identifier, std::string_view message)> logFunction)
Definition: BrokerBase.cpp:747
virtual void configureBase()
Definition: BrokerBase.cpp:510
bool observer
flag indicating that the broker is an observer only
Definition: BrokerBase.hpp:85
int32_t minFederateCount
Definition: BrokerBase.hpp:51
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:67
void joinAllThreads()
Definition: BrokerBase.cpp:97
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:157
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:250
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:143
GlobalBrokerId higher_broker_id
the id code of the broker 1 level about this broker
Definition: BrokerBase.hpp:46
BrokerState
Definition: BrokerBase.hpp:113
@ CONFIGURED
the broker itself has been configured and is ready to connect
@ TERMINATING_ERROR
the termination process has started while in an error state
@ OPERATING
normal operating conditions
@ CONNECTED_ERROR
error state but still connected
@ INITIALIZING
the enter initialization process has started
@ CREATED
the broker has been created
@ CONNECTING
the connection process has started
@ TERMINATED
the termination process has started
@ CONNECTED
the connection process has completed
@ CONFIGURING
the broker is in the processing of configuring
@ TERMINATING
the termination process has started
@ ERRORED
an error was encountered
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:60
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:1079
void saveProfilingData(std::string_view message)
Definition: BrokerBase.cpp:591
std::string brokerKey
Definition: BrokerBase.hpp:68
bool isRunning() const
Definition: BrokerBase.hpp:221
bool sendToLogger(GlobalFederateId federateID, int logLevel, std::string_view name, std::string_view message, bool fromRemote=false) const
Definition: BrokerBase.cpp:558
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:147
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:61
std::atomic< int32_t > maxLogLevel
Definition: BrokerBase.hpp:48
virtual void processCommand(ActionMessage &&cmd)=0
bool errorOnUnmatchedConnections
error if there are unmatched connections on init
Definition: BrokerBase.hpp:150
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:312
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:77
std::shared_ptr< LogManager > mLogManager
object to handle the logging considerations
Definition: BrokerBase.hpp:111
GlobalBrokerId getGlobalId() const
Definition: BrokerBase.hpp:230
int32_t minChildCount
Definition: BrokerBase.hpp:57
std::unique_ptr< BaseTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:109
void loadInfoFromJson(const std::string &json, bool runArgParser=true)
Definition: BrokerBase.cpp:394
virtual std::string generateLocalAddressString() const =0
void logFlush()
Definition: BrokerBase.cpp:758
std::function< void(int, std::string_view, std::string_view)> getLoggingCallback() const
Definition: BrokerBase.cpp:90
bool asyncTime
flag indicating the use of async time keeping
Definition: BrokerBase.hpp:89
std::string address
network location of the broker
Definition: BrokerBase.hpp:73
Time queryTimeout
Definition: BrokerBase.hpp:62
bool disableDynamicSources
flag disabling dynamic data sources
Definition: BrokerBase.hpp:93
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:81
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:79
Time maxCoSimDuration
the maximum lifetime (wall clock time) of the co-simulation
Definition: BrokerBase.hpp:66
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:105
bool hasTimeDependency
Definition: BrokerBase.hpp:137
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:136
bool allowRemoteControl
Definition: BrokerBase.hpp:148
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:144
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:139
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:865
void addBaseInformation(nlohmann::json &base, bool hasParent) const
Definition: BrokerBase.cpp:731
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:64
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:140
void setErrorState(int eCode, std::string_view estring)
Definition: BrokerBase.cpp:615
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:158
void writeProfilingData()
Definition: BrokerBase.cpp:600
virtual double getSimulationTime() const
Definition: BrokerBase.hpp:300
bool useJsonSerialization
Definition: BrokerBase.hpp:146
void generateNewIdentifier()
Definition: BrokerBase.cpp:585
int32_t minBrokerCount
Definition: BrokerBase.hpp:53
std::pair< bool, std::vector< std::string_view > > processBaseCommands(ActionMessage &command)
Definition: BrokerBase.cpp:658
virtual void processPriorityCommand(ActionMessage &&command)=0
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:645
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:110
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:141
std::atomic< GlobalBrokerId > global_id
Definition: BrokerBase.hpp:43
void loadInfoFromToml(const std::string &toml, bool runArgParser=true)
Definition: BrokerBase.cpp:476
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:312
Time grantTimeout
timeout for triggering diagnostic action waiting for a time grant
Definition: BrokerBase.hpp:65
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:753
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:59
bool dynamicFederation
flag indicating that the broker supports dynamic federates
Definition: BrokerBase.hpp:91
virtual void processDisconnect(bool skipUnregister=false)=0
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:58
bool globalDisconnect
Definition: BrokerBase.hpp:151
GlobalBrokerId global_broker_id_local
Definition: BrokerBase.hpp:44
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started; related to the errorDelay
Definition: BrokerBase.hpp:154
void addActionMessage(const ActionMessage &message)
Definition: BrokerBase.cpp:772
bool globalTime
flag indicating that the broker should use a global time coordinator
Definition: BrokerBase.hpp:87
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: TimeoutMonitor.h:27
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:206
action_t
Definition: ActionMessageDefintions.hpp:20
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:67
const std::string & brokerStateName(BrokerBase::BrokerState state)
Definition: BrokerBase.cpp:1186
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27