helics  3.5.2
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
14 #include "ActionMessage.hpp"
15 #include "FederateIdExtra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
// Forward declaration of spdlog::logger, so the type can be named without including the spdlog
// headers here. NOTE(review): the type is not referenced in the visible part of this listing.
26 namespace spdlog {
27 class logger;
28 }
// Forward declaration of Json::Value; this header only uses it by reference
// (see BrokerBase::addBaseInformation), so the complete type is not needed here.
30 namespace Json {
31 class Value;
32 }
34 namespace helics {
// Forward declarations of helics types that BrokerBase only holds through smart pointers:
// BaseTimeCoordinator (timeCoord), helicsCLI11App (generateBaseCLI/generateCLI results),
// ProfilerBuffer (prBuff) and LogManager (mLogManager).
// NOTE(review): LogBuffer is not referenced in the visible part of this listing.
35 class BaseTimeCoordinator;
36 class helicsCLI11App;
37 class ProfilerBuffer;
38 class LogBuffer;
39 class LogManager;
// BrokerBase: abstract base class (it declares pure virtual members) that stores broker
// configuration limits, status flags, logging/profiling handles, the ActionMessage queue and the
// queue processing thread.
// NOTE(review): the leading number on each line is the line number of the original header. The
// numbering has gaps, so some original lines are not shown in this listing; for example the index
// further down refers to members such as networkTimeout (BrokerBase.hpp:64) that do not appear
// here.
42 class BrokerBase {
43  protected:
    // value returned by the default getSimulationTime() implementation below
44  static constexpr double mInvalidSimulationTime{-98763.2};
    // atomic id of this broker, initialized to parent_broker_id; read through getGlobalId()
46  std::atomic<GlobalBrokerId> global_id{parent_broker_id};
    // atomic log level limit, initialized to HELICS_LOG_LEVEL_NO_PRINT
51  std::atomic<int32_t> maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
    // count limits; the maxima default to the largest int32_t value. The parentheses around
    // std::numeric_limits<int32_t>::max keep a function-like max() macro from expanding.
54  int32_t minFederateCount{1};
56  int32_t minBrokerCount{0};
57  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};
58  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};
60  int32_t minChildCount{0};
    // the maximum number of iterative loops that are allowed
61  int32_t maxIterationCount{10000};
    // the length of each heartbeat tick (units not visible here; presumably seconds -- confirm)
62  Time tickTimer{5.0};
    // timeout to wait to establish a broker connection before giving up
63  Time timeout{30.0};
    // an identifier for the broker
70  std::string identifier;
71  std::string brokerKey;
73  // address is mutable because it may not be fixed during the initial phases; to keep a
74  // consistent public interface for extracting it, this variable may need to be updated from a
75  // const member function
    // network location of the broker
76  mutable std::string address;
    // NOTE(review): presumably the thread that runs queueProcessingLoop() -- confirm in
    // BrokerBase.cpp
78  std::thread queueProcessingThread;
    // flag indicating that no further message should be processed
80  std::atomic<bool> haltOperations{false};
    // flag indicating that the federation should halt on any error
84  bool terminate_on_error{false};
    // flag indicating operation in a user debugging mode
86  bool debugging{false};
    // flag indicating that the broker is an observer only
88  bool observer{false};
    // flag indicating that the broker should use a global time coordinator
90  bool globalTime{false};
    // flag indicating the use of async time keeping
92  bool asyncTime{false};
    // flag indicating that the broker supports dynamic federates
94  bool dynamicFederation{false};
    // flag disabling dynamic data sources
96  bool disableDynamicSources{false};
98  private:
    // atomic flag reported by isRunning()
100  std::atomic<bool> mainLoopIsRunning{false};
102  bool dumplog{false};
    // initial value is false; the constructors take a DisableQueue argument
    // (NOTE(review): presumably stored here -- confirm in BrokerBase.cpp)
105  bool queueDisabled{false};
107  bool disable_timer{false};
    // atomic counter exposed through currentMessageCounter()
109  std::atomic<std::size_t> messageCounter{0};
111  protected:
    // object managing the time control
112  std::unique_ptr<BaseTimeCoordinator> timeCoord;
    // primary routing queue
113  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;
    // object to handle the logging considerations
114  std::shared_ptr<LogManager> mLogManager;
    // Broker lifecycle states stored in brokerState. In the visible enumerators the values before
    // OPERATING are negative and TERMINATED/ERRORED are positive.
    // NOTE(review): the listing numbers skip between enumerators, so further states may exist in
    // the original header.
116  enum class BrokerState : int16_t {
117  CREATED = -10,
118  CONFIGURING = -7,
119  CONFIGURED = -6,
120  CONNECTING = -4,
121  CONNECTED = -3,
123  OPERATING = 0,
127  TERMINATED = 6,
128  ERRORED = 7,
129  };
    // Reasons for tick forwarding. Each non-NONE value is a distinct single bit, which is what
    // isReasonForTick() tests with a bitwise AND.
131  enum class TickForwardingReasons : uint32_t {
132  NONE = 0,
133  NO_COMMS = 0x01,
134  PING_RESPONSE = 0x02,
135  QUERY_TIMEOUT = 0x04,
136  GRANT_TIMEOUT = 0x08,
138  };
    // the broker should not automatically generate an ID
139  bool noAutomaticID{false};
140  bool hasTimeDependency{false};
    // flag indicating that the broker has entered execution mode
142  bool enteredExecutionMode{false};
    // flag indicating filters come through the broker
144  bool hasFilters{false};
    // indicator that the broker is not very responsive to ping requests
146  bool no_ping{false};
    // will be set to true if the name looks like a uuid
147  bool uuid_like{false};
149  bool useJsonSerialization{false};
    // indicator that profiling is enabled
150  bool enable_profiling{false};
    // note: this is the only flag in this group that defaults to true
151  bool allowRemoteControl{true};
    // time when the error condition started; related to the errorDelay
155  decltype(std::chrono::steady_clock::now()) errorTimeStart;
    // time when the disconnect started
157  decltype(std::chrono::steady_clock::now()) disconnectTime;
    // storage for last error code
158  std::atomic<int> lastErrorCode{0};
    // storage for last error string
159  std::string lastErrorString;
160  private:
162  std::shared_ptr<ProfilerBuffer> prBuff;
165  bool forwardTick{false};
    // NOTE(review): presumably a bitwise OR of TickForwardingReasons values maintained by
    // setTickForwarding() -- confirm in BrokerBase.cpp
167  uint32_t forwardingReasons{0U};
    // current lifecycle state, starts as CREATED; read through getBrokerState()
169  std::atomic<BrokerState> brokerState{BrokerState::CREATED};
171  public:
    // Constructors: both are explicit and accept a DisableQueue flag (default false); the second
    // also takes a broker name. Only the first is declared noexcept.
172  explicit BrokerBase(bool DisableQueue = false) noexcept;
173  explicit BrokerBase(std::string_view broker_name, bool DisableQueue = false);
    // virtual destructor, defined out of line
175  virtual ~BrokerBase();
    // parseArgs overloads: arguments may be supplied as argc/argv, as a vector of strings, or as
    // a single initialization string. The meaning of the int result is defined in BrokerBase.cpp.
179  int parseArgs(int argc, char* argv[]);
183  int parseArgs(std::vector<std::string> args);
187  int parseArgs(std::string_view initializationString);
    // virtual, so derived classes may override it; defined in BrokerBase.cpp
190  virtual void configureBase();
    // copy and move overloads for submitting an ActionMessage
193  void addActionMessage(const ActionMessage& message);
195  void addActionMessage(ActionMessage&& message);
    // set the logging callback; the callback receives (level, identifier, message)
202  void setLoggerFunction(
203  std::function<void(int level, std::string_view identifier, std::string_view message)>
204  logFunction);
206  void logFlush();
    // returns the current value of the atomic mainLoopIsRunning flag
208  bool isRunning() const { return mainLoopIsRunning.load(); }
210  void setLogLevel(int32_t level);
    // set the console and file log levels separately
215  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
    // returns the current value of the atomic global_id
217  GlobalBrokerId getGlobalId() const { return global_id.load(); }
219  private:
221  void queueProcessingLoop();
224  action_message_def::action_t commandProcessor(ActionMessage& command);
227  std::shared_ptr<helicsCLI11App> generateBaseCLI();
229  void baseConfigure(ActionMessage& command);
    // private const-qualified overload taking an rvalue ActionMessage
233  void addActionMessage(ActionMessage&& message) const;
235  protected:
    // returns true when the bit(s) of `reason` are set in `code`
237  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
238  {
239  return ((static_cast<std::uint32_t>(reason) & code) != 0);
240  }
242  void setTickForwarding(TickForwardingReasons reason, bool value = true);
    // returns the current value of the atomic brokerState
243  BrokerState getBrokerState() const { return brokerState.load(); }
    // NOTE(review): what the bool results of the next two functions signal is not visible here;
    // see BrokerBase.cpp
244  bool setBrokerState(BrokerState newState);
245  bool transitionBrokerState(BrokerState expectedState, BrokerState newState);
    // The following four functions are pure virtual and must be implemented by derived classes.
247  virtual void processDisconnect(bool skipUnregister = false) = 0;
250  virtual bool tryReconnect() = 0;
253  virtual void processCommand(ActionMessage&& cmd) = 0;
259  virtual void processPriorityCommand(ActionMessage&& command) = 0;
    // const member taking the federate id, log level, name and message; fromRemote defaults to
    // false. The meaning of the bool result is defined in BrokerBase.cpp.
265  bool sendToLogger(GlobalFederateId federateID,
266  int logLevel,
267  std::string_view name,
268  std::string_view message,
269  bool fromRemote = false) const;
271  void saveProfilingData(std::string_view message);
273  void writeProfilingData();
275  void generateNewIdentifier();
    // pure virtual: derived classes supply the local address string
277  virtual std::string generateLocalAddressString() const = 0;
    // virtual with a base implementation in BrokerBase.cpp
279  virtual std::shared_ptr<helicsCLI11App> generateCLI();
281  void setErrorState(int eCode, std::string_view estring);
283  void setLoggingFile(std::string_view lfile);
285  bool getFlagValue(int32_t flag) const;
    // default implementation returns mInvalidSimulationTime; virtual so it can be overridden
287  virtual double getSimulationTime() const { return mInvalidSimulationTime; }
289  std::pair<bool, std::vector<std::string_view>> processBaseCommands(ActionMessage& command);
    // const member that receives the Json::Value `base` by non-const reference
291  void addBaseInformation(Json::Value& base, bool hasParent) const;
293  public:
    // returns a callable with the same (int, string_view, string_view) signature that
    // setLoggerFunction() accepts
295  std::function<void(int, std::string_view, std::string_view)> getLoggingCallback() const;
297  void joinAllThreads();
    // returns the atomic messageCounter, loaded with acquire memory ordering
299  std::size_t currentMessageCounter() const
300  {
301  return messageCounter.load(std::memory_order_acquire);
302  }
    // friends: TimeoutMonitor, and the free function brokerStateName (declared after the class)
303  friend class TimeoutMonitor;
304  friend const std::string& brokerStateName(BrokerState state);
305 };
// Free function taking a BrokerBase::BrokerState and returning a const reference to a string.
// BrokerBase names it as a friend; BrokerState is declared in the protected section of the class.
// The index below places the definition in BrokerBase.cpp.
309 const std::string& brokerStateName(BrokerBase::BrokerState state);
311 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: BrokerBase.hpp:42
std::thread queueProcessingThread
Definition: BrokerBase.hpp:78
void setLoggingFile(std::string_view lfile)
Definition: BrokerBase.cpp:458
decltype(std::chrono::steady_clock::now()) disconnectTime
time when the disconnect started
Definition: BrokerBase.hpp:157
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:86
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:584
friend const std::string & brokerStateName(BrokerState state)
Definition: BrokerBase.cpp:1004
void setLoggerFunction(std::function< void(int level, std::string_view identifier, std::string_view message)> logFunction)
Definition: BrokerBase.cpp:565
virtual void configureBase()
Definition: BrokerBase.cpp:332
void addBaseInformation(Json::Value &base, bool hasParent) const
Definition: BrokerBase.cpp:549
bool observer
flag indicating that the broker is an observer only
Definition: BrokerBase.hpp:88
int32_t minFederateCount
Definition: BrokerBase.hpp:54
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:70
void joinAllThreads()
Definition: BrokerBase.cpp:93
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:158
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:237
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:146
GlobalBrokerId higher_broker_id
the id code of the broker 1 level above this broker
Definition: BrokerBase.hpp:49
Definition: BrokerBase.hpp:116
the broker itself has been configured and is ready to connect
the termination process has started while in an error state
normal operating conditions
error state but still connected
the enter initialization process has started
the broker has been created
the connection process has started
the termination process has started
the connection process has completed
the broker is in the process of configuring
the termination process has started
an error was encountered
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:63
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:897
void saveProfilingData(std::string_view message)
Definition: BrokerBase.cpp:413
std::string brokerKey
Definition: BrokerBase.hpp:71
bool isRunning() const
Definition: BrokerBase.hpp:208
bool sendToLogger(GlobalFederateId federateID, int logLevel, std::string_view name, std::string_view message, bool fromRemote=false) const
Definition: BrokerBase.cpp:380
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:150
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:64
std::atomic< int32_t > maxLogLevel
Definition: BrokerBase.hpp:51
virtual void processCommand(ActionMessage &&cmd)=0
bool errorOnUnmatchedConnections
error if there are unmatched connections on init
Definition: BrokerBase.hpp:153
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:299
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:80
std::shared_ptr< LogManager > mLogManager
object to handle the logging considerations
Definition: BrokerBase.hpp:114
GlobalBrokerId getGlobalId() const
Definition: BrokerBase.hpp:217
int32_t minChildCount
Definition: BrokerBase.hpp:60
std::unique_ptr< BaseTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:112
virtual std::string generateLocalAddressString() const =0
void logFlush()
Definition: BrokerBase.cpp:576
std::function< void(int, std::string_view, std::string_view)> getLoggingCallback() const
Definition: BrokerBase.cpp:86
bool asyncTime
flag indicating the use of async time keeping
Definition: BrokerBase.hpp:92
std::string address
network location of the broker
Definition: BrokerBase.hpp:76
Time queryTimeout
Definition: BrokerBase.hpp:65
bool disableDynamicSources
flag disabling dynamic data sources
Definition: BrokerBase.hpp:96
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:84
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:82
Time maxCoSimDuration
the maximum lifetime (wall clock time) of the co-simulation
Definition: BrokerBase.hpp:69
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:101
bool hasTimeDependency
Definition: BrokerBase.hpp:140
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:139
bool allowRemoteControl
Definition: BrokerBase.hpp:151
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:147
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:142
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:683
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:67
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:143
void setErrorState(int eCode, std::string_view estring)
Definition: BrokerBase.cpp:433
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:159
void writeProfilingData()
Definition: BrokerBase.cpp:422
virtual double getSimulationTime() const
Definition: BrokerBase.hpp:287
bool useJsonSerialization
Definition: BrokerBase.hpp:149
void generateNewIdentifier()
Definition: BrokerBase.cpp:407
int32_t minBrokerCount
Definition: BrokerBase.hpp:56
std::pair< bool, std::vector< std::string_view > > processBaseCommands(ActionMessage &command)
Definition: BrokerBase.cpp:476
virtual void processPriorityCommand(ActionMessage &&command)=0
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:463
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:113
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:144
std::atomic< GlobalBrokerId > global_id
Definition: BrokerBase.hpp:46
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:305
Time grantTimeout
timeout for triggering diagnostic action waiting for a time grant
Definition: BrokerBase.hpp:68
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:571
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:62
bool dynamicFederation
flag indicating that the broker supports dynamic federates
Definition: BrokerBase.hpp:94
virtual void processDisconnect(bool skipUnregister=false)=0
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:61
GlobalBrokerId global_broker_id_local
Definition: BrokerBase.hpp:47
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started; related to the errorDelay
Definition: BrokerBase.hpp:155
void addActionMessage(const ActionMessage &message)
Definition: BrokerBase.cpp:590
bool globalTime
flag indicating that the broker should use a global time coordinator
Definition: BrokerBase.hpp:90
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: TimeoutMonitor.h:27
Definition: helics_enums.h:204
Definition: ActionMessageDefintions.hpp:20
the main namespace for the helics co-simulation library; user functions will be in the helics namespace
Definition: AsyncFedCallInfo.hpp:14
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:67
const std::string & brokerStateName(BrokerBase::BrokerState state)
Definition: BrokerBase.cpp:1004
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27