helics  3.3.0
BrokerBase.hpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2022,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
14 #include "ActionMessage.hpp"
15 #include "FederateIdExtra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
17 
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
25 
26 namespace spdlog {
27 class logger;
28 }
30 namespace Json {
31 class Value;
32 }
33 
34 namespace helics {
35 class BaseTimeCoordinator;
36 class helicsCLI11App;
37 class ProfilerBuffer;
38 class LogBuffer;
39 class LogManager;
42 class BrokerBase {
43  protected:
44  static constexpr double mInvalidSimulationTime{-98763.2};
46  std::atomic<GlobalBrokerId> global_id{parent_broker_id};
50 
51  std::atomic<int32_t> maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
52 
54  int32_t minFederateCount{1};
56  int32_t minBrokerCount{0};
57  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};
58  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};
60  int32_t minChildCount{0};
61  int32_t maxIterationCount{10000};
62  Time tickTimer{5.0};
63  Time timeout{30.0};
66  Time errorDelay{0.0};
70  std::string identifier;
71  std::string brokerKey;
72  // address is mutable since during initial phases it may not be fixed so to maintain a
74  // consistent public interface for extracting it this variable may need to be updated in a
75  // constant function
76  mutable std::string address;
77 
78  std::thread queueProcessingThread;
79  std::atomic<bool> haltOperations{false};
84  bool terminate_on_error{false};
86  bool debugging{false};
88  bool observer{false};
90  bool globalTime{false};
92  bool asyncTime{false};
93 
94  private:
96  std::atomic<bool> mainLoopIsRunning{false};
98  bool dumplog{false};
101  bool queueDisabled{false};
103  bool disable_timer{false};
105  std::atomic<std::size_t> messageCounter{0};
106 
107  protected:
108  std::unique_ptr<BaseTimeCoordinator> timeCoord;
109  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;
110  std::shared_ptr<LogManager> mLogManager;
111 
112  enum class BrokerState : int16_t {
113  CREATED = -10,
114  CONFIGURING = -7,
115  CONFIGURED = -6,
116  CONNECTING = -4,
117  CONNECTED = -3,
118  INITIALIZING = -1,
119  OPERATING = 0,
120  CONNECTED_ERROR = 3,
121  TERMINATING = 4,
122  TERMINATING_ERROR = 5,
123  TERMINATED = 6,
124  ERRORED = 7,
125  };
126 
127  enum class TickForwardingReasons : uint32_t {
128  NONE = 0,
129  NO_COMMS = 0x01,
130  PING_RESPONSE = 0x02,
131  QUERY_TIMEOUT = 0x04,
132  GRANT_TIMEOUT = 0x08,
133  DISCONNECT_TIMEOUT = 0x10
134  };
135  bool noAutomaticID{false};
136  bool hasTimeDependency{false};
137  bool enteredExecutionMode{false};
140  bool hasFilters{false};
141 
142  bool no_ping{false};
143  bool uuid_like{false};
144 
145  bool useJsonSerialization{false};
146  bool enable_profiling{false};
147  decltype(std::chrono::steady_clock::now()) errorTimeStart;
150  decltype(std::chrono::steady_clock::now()) disconnectTime;
151  std::atomic<int> lastErrorCode{0};
152  std::string lastErrorString;
153  private:
155  std::shared_ptr<ProfilerBuffer> prBuff;
156 
158  bool forwardTick{false};
160  uint32_t forwardingReasons{0U};
162  std::atomic<BrokerState> brokerState{BrokerState::CREATED};
163 
164  public:
165  explicit BrokerBase(bool DisableQueue = false) noexcept;
166  explicit BrokerBase(std::string_view broker_name, bool DisableQueue = false);
167 
168  virtual ~BrokerBase();
172  int parseArgs(int argc, char* argv[]);
176  int parseArgs(std::vector<std::string> args);
180  int parseArgs(std::string_view initializationString);
183  virtual void configureBase();
184 
186  void addActionMessage(const ActionMessage& m);
188  void addActionMessage(ActionMessage&& m);
189 
195  void setLoggerFunction(
196  std::function<void(int level, std::string_view identifier, std::string_view message)>
197  logFunction);
199  void logFlush();
201  bool isRunning() const { return mainLoopIsRunning.load(); }
203  void setLogLevel(int32_t level);
208  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
210  GlobalBrokerId getGlobalId() const { return global_id.load(); }
211 
212  private:
214  void queueProcessingLoop();
217  action_message_def::action_t commandProcessor(ActionMessage& command);
218 
220  std::shared_ptr<helicsCLI11App> generateBaseCLI();
222  void baseConfigure(ActionMessage& command);
223 
226  void addActionMessage(ActionMessage&& m) const;
227 
228  protected:
230  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
231  {
232  return ((static_cast<std::uint32_t>(reason) & code) != 0);
233  }
235  void setTickForwarding(TickForwardingReasons reason, bool value = true);
236  BrokerState getBrokerState() const { return brokerState.load(); }
237  bool setBrokerState(BrokerState newState);
238  bool transitionBrokerState(BrokerState expectedState, BrokerState newState);
240  virtual void processDisconnect(bool skipUnregister = false) = 0;
243  virtual bool tryReconnect() = 0;
246  virtual void processCommand(ActionMessage&& cmd) = 0;
252  virtual void processPriorityCommand(ActionMessage&& command) = 0;
253 
258  bool sendToLogger(GlobalFederateId federateID,
259  int logLevel,
260  std::string_view name,
261  std::string_view message,
262  bool fromRemote = false) const;
264  void saveProfilingData(std::string_view message);
266  void writeProfilingData();
268  void generateNewIdentifier();
270  virtual std::string generateLocalAddressString() const = 0;
272  virtual std::shared_ptr<helicsCLI11App> generateCLI();
274  void setErrorState(int eCode, std::string_view estring);
276  void setLoggingFile(std::string_view lfile);
278  bool getFlagValue(int32_t flag) const;
280  virtual double getSimulationTime() const { return mInvalidSimulationTime; }
282  std::pair<bool, std::vector<std::string_view>> processBaseCommands(ActionMessage& command);
284  void addBaseInformation(Json::Value& base, bool hasParent) const;
285 
286  public:
288  std::function<void(int, std::string_view, std::string_view)> getLoggingCallback() const;
290  void joinAllThreads();
292  std::size_t currentMessageCounter() const
293  {
294  return messageCounter.load(std::memory_order_acquire);
295  }
296  friend class TimeoutMonitor;
297  friend const std::string& brokerStateName(BrokerState state);
298 };
299 
302 const std::string& brokerStateName(BrokerBase::BrokerState state);
303 
304 } // namespace helics
helics::timeZero
constexpr Time timeZero
Definition: helicsTime.hpp:31
helics::BrokerBase::joinAllThreads
void joinAllThreads()
Definition: BrokerBase.cpp:91
helics::BrokerBase::BrokerState::CONNECTED
@ CONNECTED
the connection process has completed
helics::BrokerBase::maxCoSimDuration
Time maxCoSimDuration
the maximum lifetime (wall clock time) of the co-simulation
Definition: BrokerBase.hpp:69
HELICS_ERROR_USER_ABORT
@ HELICS_ERROR_USER_ABORT
Definition: helics_enums.h:246
helics::BrokerBase::setLoggerFunction
void setLoggerFunction(std::function< void(int level, std::string_view identifier, std::string_view message)> logFunction)
Definition: BrokerBase.cpp:526
helics::BrokerBase::isRunning
bool isRunning() const
Definition: BrokerBase.hpp:201
helics::indicator_flag
@ indicator_flag
flag used for setting values
Definition: flagOperations.hpp:18
helics::GlobalFederateId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:84
helics::BrokerBase::minBrokerCount
int32_t minBrokerCount
Definition: BrokerBase.hpp:56
helics::BrokerBase::BrokerState::CONFIGURED
@ CONFIGURED
the broker itself has been configured and is ready to connect
helics::BrokerBase::higher_broker_id
GlobalBrokerId higher_broker_id
the id code of the broker 1 level about this broker
Definition: BrokerBase.hpp:49
helics::BrokerBase::useJsonSerialization
bool useJsonSerialization
Definition: BrokerBase.hpp:145
helics::BrokerBase::brokerKey
std::string brokerKey
Definition: BrokerBase.hpp:71
helics::BrokerBase::timeout
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:63
helics::BrokerBase::asyncTime
bool asyncTime
flag indicating the use of async time keeping
Definition: BrokerBase.hpp:92
helics::BrokerBase::noAutomaticID
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:135
helics::BrokerBase::enable_profiling
bool enable_profiling
Definition: BrokerBase.hpp:146
helics::action_message_def::action_t
action_t
Definition: ActionMessageDefintions.hpp:20
HELICS_LOG_LEVEL_NO_PRINT
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:186
helics::BrokerBase::tryReconnect
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:644
helics::BrokerBase::BrokerState::TERMINATING
@ TERMINATING
the termination process has started
helics::BrokerBase::getGlobalId
GlobalBrokerId getGlobalId() const
Definition: BrokerBase.hpp:210
helics::BrokerBase::hasTimeDependency
bool hasTimeDependency
Definition: BrokerBase.hpp:136
helics::BrokerBase::getFlagValue
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:426
helics::BrokerBase::queueProcessingThread
std::thread queueProcessingThread
Definition: BrokerBase.hpp:78
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
helics::BrokerBase::hasFilters
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:140
helics::GlobalBrokerId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:35
loggingHelper.hpp
helics::BrokerBase::sendToLogger
bool sendToLogger(GlobalFederateId federateID, int logLevel, std::string_view name, std::string_view message, bool fromRemote=false) const
Definition: BrokerBase.cpp:343
LogManager.hpp
helics::BrokerBase::global_id
std::atomic< GlobalBrokerId > global_id
Definition: BrokerBase.hpp:46
helics::BrokerBase::BrokerState::CONNECTING
@ CONNECTING
the connection process has started
helics::GlobalFederateId
Definition: GlobalFederateId.hpp:72
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::brokerStateName
const std::string & brokerStateName(BrokerBase::BrokerState state)
Definition: BrokerBase.cpp:965
helics::BrokerBase::maxIterationCount
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:61
helics::BrokerBase::uuid_like
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:143
helics::parent_broker_id
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:64
helics::BrokerBase::getSimulationTime
virtual double getSimulationTime() const
Definition: BrokerBase.hpp:280
helics::BrokerBase::brokerStateName
const friend std::string & brokerStateName(BrokerState state)
Definition: BrokerBase.cpp:965
helics::BrokerBase::setLoggingFile
void setLoggingFile(std::string_view lfile)
Definition: BrokerBase.cpp:421
helics::BrokerBase::parseArgs
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:268
helics::BrokerBase::haltOperations
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:80
helics::BrokerBase::mLogManager
std::shared_ptr< LogManager > mLogManager
object to handle the logging considerations
Definition: BrokerBase.hpp:110
helics::BrokerBase::maxLogLevel
std::atomic< int32_t > maxLogLevel
Definition: BrokerBase.hpp:51
HELICS_LOG_LEVEL_DUMPLOG
@ HELICS_LOG_LEVEL_DUMPLOG
Definition: helics_enums.h:184
helics::BrokerBase::configureBase
virtual void configureBase()
Definition: BrokerBase.cpp:295
helics::BrokerBase::minChildCount
int32_t minChildCount
Definition: BrokerBase.hpp:60
helics::BrokerBase::no_ping
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:142
helics::BrokerBase::generateCLI
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:99
helics::BrokerBase::lastErrorString
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:152
helics::BrokerBase::globalTime
bool globalTime
flag indicating that the broker should use a global time coordinator
Definition: BrokerBase.hpp:90
helics::BrokerBase::setLogLevel
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:532
helics::BrokerBase::addActionMessage
void addActionMessage(const ActionMessage &m)
Definition: BrokerBase.cpp:551
helics::BrokerBase::errorDelay
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:67
helics::BrokerBase::generateLocalAddressString
virtual std::string generateLocalAddressString() const =0
helics::BrokerBase::BrokerState
BrokerState
Definition: BrokerBase.hpp:112
helics::BrokerBase::processCommand
virtual void processCommand(ActionMessage &&cmd)=0
helics::BrokerBase::timeCoord
std::unique_ptr< BaseTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:108
helics::BrokerBase::BrokerState::INITIALIZING
@ INITIALIZING
the enter initialization process has started
helics::setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:130
helics::BrokerBase::BrokerState::CONFIGURING
@ CONFIGURING
the broker is in the processing of configuring
helics::BrokerBase::currentMessageCounter
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:292
helics::BrokerBase::disconnectTime
decltype(std::chrono::steady_clock::now()) disconnectTime
time when the disconnect started
Definition: BrokerBase.hpp:150
helics::BrokerBase::BrokerState::CONNECTED_ERROR
@ CONNECTED_ERROR
error state but still connected
helics::BrokerBase::enteredExecutionMode
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:138
helics::GlobalBrokerId
Definition: GlobalFederateId.hpp:27
helics::BrokerBase::terminate_on_error
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:84
helics::BrokerBase::lastErrorCode
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:151
helics::BrokerBase::networkTimeout
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:64
HELICS_FLAG_FORCE_LOGGING_FLUSH
@ HELICS_FLAG_FORCE_LOGGING_FLUSH
Definition: helics_enums.h:171
helics::prettyPrintString
std::string prettyPrintString(const ActionMessage &command)
Definition: ActionMessage.cpp:846
helics::BrokerBase::processBaseCommands
std::pair< bool, std::vector< std::string_view > > processBaseCommands(ActionMessage &command)
Definition: BrokerBase.cpp:439
helics::isDisconnectCommand
bool isDisconnectCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:277
helics::BrokerBase::generateNewIdentifier
void generateNewIdentifier()
Definition: BrokerBase.cpp:370
helics::BrokerBase::processPriorityCommand
virtual void processPriorityCommand(ActionMessage &&command)=0
helics::BrokerBase::writeProfilingData
void writeProfilingData()
Definition: BrokerBase.cpp:385
helics::BrokerBase::addBaseInformation
void addBaseInformation(Json::Value &base, bool hasParent) const
Definition: BrokerBase.cpp:510
helics::BrokerBase::BrokerState::CREATED
@ CREATED
the broker has been created
helics::BrokerBase::saveProfilingData
void saveProfilingData(std::string_view message)
Definition: BrokerBase.cpp:376
helics::BrokerBase::setLogLevels
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:545
helics::BrokerBase::BrokerState::TERMINATED
@ TERMINATED
the termination process has started
helics::BrokerBase::BrokerState::ERRORED
@ ERRORED
an error was encountered
helics::BrokerBase::tickTimer
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:62
helics::BrokerBase::processDisconnect
virtual void processDisconnect(bool skipUnregister=false)=0
HELICS_ERROR_TERMINATED
@ HELICS_ERROR_TERMINATED
Definition: helics_enums.h:248
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::BrokerBase::debugging
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:86
helics::BrokerBase::queryTimeout
Time queryTimeout
Definition: BrokerBase.hpp:65
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:188
helics::checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:138
helics::WARNING
@ WARNING
print/log warning and errors
Definition: logging.hpp:25
helics::BrokerBase::setTickForwarding
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:858
helics::BrokerBase::restrictive_time_policy
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:82
helics::BrokerBase::waitingForBrokerPingReply
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:139
helics::BrokerBase::getLoggingCallback
std::function< void(int, std::string_view, std::string_view)> getLoggingCallback() const
Definition: BrokerBase.cpp:84
helics::TimeoutMonitor
Definition: TimeoutMonitor.h:27
helics::BrokerBase::grantTimeout
Time grantTimeout
timeout for triggering diagnostic action waiting for a time grant
Definition: BrokerBase.hpp:68
helics::BrokerBase::logFlush
void logFlush()
Definition: BrokerBase.cpp:537
helics::PROFILING
@ PROFILING
profiling log level
Definition: logging.hpp:24
helics::BrokerBase::isReasonForTick
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:230
helics::BrokerBase::BrokerState::OPERATING
@ OPERATING
normal operating conditions
helics::BrokerBase::observer
bool observer
flag indicating that the broker is an observer only
Definition: BrokerBase.hpp:88
helics::BrokerBase::actionQueue
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:109
helics::error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:17
helics::BrokerBase::minFederateCount
int32_t minFederateCount
Definition: BrokerBase.hpp:54
helics::BrokerBase::BrokerState::TERMINATING_ERROR
@ TERMINATING_ERROR
the termination process has started while in an error state
flagOperations.hpp
helics::BrokerBase::identifier
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:70
helics::addJsonConfig
HelicsConfigJSON * addJsonConfig(CLI::App *app)
Add the HELICS JSON configuration processor to the app.
Definition: helicsCLI11JsonConfig.cpp:96
helics::BrokerBase
Definition: BrokerBase.hpp:42
helics::BrokerBase::setErrorState
void setErrorState(int eCode, std::string_view estring)
Definition: BrokerBase.cpp:396
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
BrokerBase.hpp
helics::BrokerBase::address
std::string address
network location of the broker
Definition: BrokerBase.hpp:76
HELICS_FLAG_DUMPLOG
@ HELICS_FLAG_DUMPLOG
Definition: helics_enums.h:173
helics::BrokerBase::global_broker_id_local
GlobalBrokerId global_broker_id_local
Definition: BrokerBase.hpp:47
helics::BrokerBase::errorTimeStart
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started; related to the errorDelay
Definition: BrokerBase.hpp:148