helics  3.5.2
BrokerBase.hpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
14 #include "ActionMessage.hpp"
15 #include "FederateIdExtra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
17 
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
25 
26 namespace spdlog {
27 class logger;
28 }
30 namespace Json {
31 class Value;
32 }
33 
34 namespace helics {
35 class BaseTimeCoordinator;
36 class helicsCLI11App;
37 class ProfilerBuffer;
38 class LogBuffer;
39 class LogManager;
42 class BrokerBase {
43  protected:
44  static constexpr double mInvalidSimulationTime{-98763.2};
46  std::atomic<GlobalBrokerId> global_id{parent_broker_id};
51  std::atomic<int32_t> maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
52 
54  int32_t minFederateCount{1};
56  int32_t minBrokerCount{0};
57  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};
58  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};
60  int32_t minChildCount{0};
61  int32_t maxIterationCount{10000};
62  Time tickTimer{5.0};
63  Time timeout{30.0};
70  std::string identifier;
71  std::string brokerKey;
73  // address is mutable since during initial phases it may not be fixed so to maintain a
74  // consistent public interface for extracting it this variable may need to be updated in a
75  // constant function
76  mutable std::string address;
77 
78  std::thread queueProcessingThread;
80  std::atomic<bool> haltOperations{false};
84  bool terminate_on_error{false};
86  bool debugging{false};
88  bool observer{false};
90  bool globalTime{false};
92  bool asyncTime{false};
94  bool dynamicFederation{false};
96  bool disableDynamicSources{false};
97 
98  private:
100  std::atomic<bool> mainLoopIsRunning{false};
102  bool dumplog{false};
105  bool queueDisabled{false};
107  bool disable_timer{false};
109  std::atomic<std::size_t> messageCounter{0};
110 
111  protected:
112  std::unique_ptr<BaseTimeCoordinator> timeCoord;
113  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;
114  std::shared_ptr<LogManager> mLogManager;
116  enum class BrokerState : int16_t {
117  CREATED = -10,
118  CONFIGURING = -7,
119  CONFIGURED = -6,
120  CONNECTING = -4,
121  CONNECTED = -3,
122  INITIALIZING = -1,
123  OPERATING = 0,
124  CONNECTED_ERROR = 3,
125  TERMINATING = 4,
126  TERMINATING_ERROR = 5,
127  TERMINATED = 6,
128  ERRORED = 7,
129  };
130 
131  enum class TickForwardingReasons : uint32_t {
132  NONE = 0,
133  NO_COMMS = 0x01,
134  PING_RESPONSE = 0x02,
135  QUERY_TIMEOUT = 0x04,
136  GRANT_TIMEOUT = 0x08,
137  DISCONNECT_TIMEOUT = 0x10
138  };
139  bool noAutomaticID{false};
140  bool hasTimeDependency{false};
142  bool enteredExecutionMode{false};
144  bool hasFilters{false};
145 
146  bool no_ping{false};
147  bool uuid_like{false};
149  bool useJsonSerialization{false};
150  bool enable_profiling{false};
151  bool allowRemoteControl{true};
155  decltype(std::chrono::steady_clock::now()) errorTimeStart;
157  decltype(std::chrono::steady_clock::now()) disconnectTime;
158  std::atomic<int> lastErrorCode{0};
159  std::string lastErrorString;
160  private:
162  std::shared_ptr<ProfilerBuffer> prBuff;
163 
165  bool forwardTick{false};
167  uint32_t forwardingReasons{0U};
169  std::atomic<BrokerState> brokerState{BrokerState::CREATED};
170 
171  public:
172  explicit BrokerBase(bool DisableQueue = false) noexcept;
173  explicit BrokerBase(std::string_view broker_name, bool DisableQueue = false);
174 
175  virtual ~BrokerBase();
179  int parseArgs(int argc, char* argv[]);
183  int parseArgs(std::vector<std::string> args);
187  int parseArgs(std::string_view initializationString);
190  virtual void configureBase();
191 
193  void addActionMessage(const ActionMessage& message);
195  void addActionMessage(ActionMessage&& message);
196 
202  void setLoggerFunction(
203  std::function<void(int level, std::string_view identifier, std::string_view message)>
204  logFunction);
206  void logFlush();
208  bool isRunning() const { return mainLoopIsRunning.load(); }
210  void setLogLevel(int32_t level);
215  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
217  GlobalBrokerId getGlobalId() const { return global_id.load(); }
218 
219  private:
221  void queueProcessingLoop();
224  action_message_def::action_t commandProcessor(ActionMessage& command);
225 
227  std::shared_ptr<helicsCLI11App> generateBaseCLI();
229  void baseConfigure(ActionMessage& command);
230 
233  void addActionMessage(ActionMessage&& message) const;
234 
235  protected:
237  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
238  {
239  return ((static_cast<std::uint32_t>(reason) & code) != 0);
240  }
242  void setTickForwarding(TickForwardingReasons reason, bool value = true);
243  BrokerState getBrokerState() const { return brokerState.load(); }
244  bool setBrokerState(BrokerState newState);
245  bool transitionBrokerState(BrokerState expectedState, BrokerState newState);
247  virtual void processDisconnect(bool skipUnregister = false) = 0;
250  virtual bool tryReconnect() = 0;
253  virtual void processCommand(ActionMessage&& cmd) = 0;
259  virtual void processPriorityCommand(ActionMessage&& command) = 0;
260 
265  bool sendToLogger(GlobalFederateId federateID,
266  int logLevel,
267  std::string_view name,
268  std::string_view message,
269  bool fromRemote = false) const;
271  void saveProfilingData(std::string_view message);
273  void writeProfilingData();
275  void generateNewIdentifier();
277  virtual std::string generateLocalAddressString() const = 0;
279  virtual std::shared_ptr<helicsCLI11App> generateCLI();
281  void setErrorState(int eCode, std::string_view estring);
283  void setLoggingFile(std::string_view lfile);
285  bool getFlagValue(int32_t flag) const;
287  virtual double getSimulationTime() const { return mInvalidSimulationTime; }
289  std::pair<bool, std::vector<std::string_view>> processBaseCommands(ActionMessage& command);
291  void addBaseInformation(Json::Value& base, bool hasParent) const;
292 
293  public:
295  std::function<void(int, std::string_view, std::string_view)> getLoggingCallback() const;
297  void joinAllThreads();
299  std::size_t currentMessageCounter() const
300  {
301  return messageCounter.load(std::memory_order_acquire);
302  }
303  friend class TimeoutMonitor;
304  friend const std::string& brokerStateName(BrokerState state);
305 };
306 
309 const std::string& brokerStateName(BrokerBase::BrokerState state);
310 
311 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: BrokerBase.hpp:42
std::thread queueProcessingThread
Definition: BrokerBase.hpp:78
void setLoggingFile(std::string_view lfile)
Definition: BrokerBase.cpp:458
decltype(std::chrono::steady_clock::now()) disconnectTime
time when the disconnect started
Definition: BrokerBase.hpp:157
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:86
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:584
friend const std::string & brokerStateName(BrokerState state)
Definition: BrokerBase.cpp:1004
void setLoggerFunction(std::function< void(int level, std::string_view identifier, std::string_view message)> logFunction)
Definition: BrokerBase.cpp:565
virtual void configureBase()
Definition: BrokerBase.cpp:332
void addBaseInformation(Json::Value &base, bool hasParent) const
Definition: BrokerBase.cpp:549
bool observer
flag indicating that the broker is an observer only
Definition: BrokerBase.hpp:88
int32_t minFederateCount
Definition: BrokerBase.hpp:54
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:70
void joinAllThreads()
Definition: BrokerBase.cpp:93
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:158
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:237
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:146
GlobalBrokerId higher_broker_id
the id code of the broker 1 level about this broker
Definition: BrokerBase.hpp:49
BrokerState
Definition: BrokerBase.hpp:116
@ CONFIGURED
the broker itself has been configured and is ready to connect
@ TERMINATING_ERROR
the termination process has started while in an error state
@ OPERATING
normal operating conditions
@ CONNECTED_ERROR
error state but still connected
@ INITIALIZING
the enter initialization process has started
@ CREATED
the broker has been created
@ CONNECTING
the connection process has started
@ TERMINATED
the termination process has started
@ CONNECTED
the connection process has completed
@ CONFIGURING
the broker is in the processing of configuring
@ TERMINATING
the termination process has started
@ ERRORED
an error was encountered
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:63
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:897
void saveProfilingData(std::string_view message)
Definition: BrokerBase.cpp:413
std::string brokerKey
Definition: BrokerBase.hpp:71
bool isRunning() const
Definition: BrokerBase.hpp:208
bool sendToLogger(GlobalFederateId federateID, int logLevel, std::string_view name, std::string_view message, bool fromRemote=false) const
Definition: BrokerBase.cpp:380
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:150
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:64
std::atomic< int32_t > maxLogLevel
Definition: BrokerBase.hpp:51
virtual void processCommand(ActionMessage &&cmd)=0
bool errorOnUnmatchedConnections
error if there are unmatched connections on init
Definition: BrokerBase.hpp:153
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:299
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:80
std::shared_ptr< LogManager > mLogManager
object to handle the logging considerations
Definition: BrokerBase.hpp:114
GlobalBrokerId getGlobalId() const
Definition: BrokerBase.hpp:217
int32_t minChildCount
Definition: BrokerBase.hpp:60
std::unique_ptr< BaseTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:112
virtual std::string generateLocalAddressString() const =0
void logFlush()
Definition: BrokerBase.cpp:576
std::function< void(int, std::string_view, std::string_view)> getLoggingCallback() const
Definition: BrokerBase.cpp:86
bool asyncTime
flag indicating the use of async time keeping
Definition: BrokerBase.hpp:92
std::string address
network location of the broker
Definition: BrokerBase.hpp:76
Time queryTimeout
Definition: BrokerBase.hpp:65
bool disableDynamicSources
flag disabling dynamic data sources
Definition: BrokerBase.hpp:96
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:84
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:82
Time maxCoSimDuration
the maximum lifetime (wall clock time) of the co-simulation
Definition: BrokerBase.hpp:69
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:101
bool hasTimeDependency
Definition: BrokerBase.hpp:140
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:139
bool allowRemoteControl
Definition: BrokerBase.hpp:151
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:147
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:142
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:683
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:67
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:143
void setErrorState(int eCode, std::string_view estring)
Definition: BrokerBase.cpp:433
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:159
void writeProfilingData()
Definition: BrokerBase.cpp:422
virtual double getSimulationTime() const
Definition: BrokerBase.hpp:287
bool useJsonSerialization
Definition: BrokerBase.hpp:149
void generateNewIdentifier()
Definition: BrokerBase.cpp:407
int32_t minBrokerCount
Definition: BrokerBase.hpp:56
std::pair< bool, std::vector< std::string_view > > processBaseCommands(ActionMessage &command)
Definition: BrokerBase.cpp:476
virtual void processPriorityCommand(ActionMessage &&command)=0
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:463
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:113
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:144
std::atomic< GlobalBrokerId > global_id
Definition: BrokerBase.hpp:46
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:305
Time grantTimeout
timeout for triggering diagnostic action waiting for a time grant
Definition: BrokerBase.hpp:68
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:571
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:62
bool dynamicFederation
flag indicating that the broker supports dynamic federates
Definition: BrokerBase.hpp:94
virtual void processDisconnect(bool skipUnregister=false)=0
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:61
GlobalBrokerId global_broker_id_local
Definition: BrokerBase.hpp:47
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started; related to the errorDelay
Definition: BrokerBase.hpp:155
void addActionMessage(const ActionMessage &message)
Definition: BrokerBase.cpp:590
bool globalTime
flag indicating that the broker should use a global time coordinator
Definition: BrokerBase.hpp:90
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: TimeoutMonitor.h:27
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:204
action_t
Definition: ActionMessageDefintions.hpp:20
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:67
const std::string & brokerStateName(BrokerBase::BrokerState state)
Definition: BrokerBase.cpp:1004
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27