helics  2.8.1
BrokerBase.hpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
14 #include "ActionMessage.hpp"
15 #include "federate_id_extra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
17 
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <vector>
24 
25 namespace spdlog {
26 class logger;
27 }
28 namespace helics {
29 class ForwardingTimeCoordinator;
30 class helicsCLI11App;
31 class ProfilerBuffer;
/** Base class holding the configuration, logging state, broker state and action-message queue
 * shared by broker-like objects. Derived classes must implement the pure virtual hooks declared
 * below (processDisconnect, tryReconnect, processCommand, processPriorityCommand,
 * generateLocalAddressString).
 * NOTE(review): this listing omits some declaration lines (the embedded line numbers skip);
 * comments on those fragments are marked as assumptions to confirm against the real header.
 */
34 class BrokerBase {
35  protected:
36  std::atomic<global_broker_id> global_id{  //!< the unique identifier for the broker (core or broker)
41  std::atomic<int32_t> maxLogLevel{
42  1};  //!< the logging level to use; levels >= this will be ignored
43  int32_t consoleLogLevel{1};  //!< the logging level for console display
44  int32_t fileLogLevel{1};  //!< the logging level for logging to a file
 // NOTE(review): the next line is the tail of an initializer whose declaration (listing line 45)
 // is elided; the index attributes line 45 to minFederateCount -- confirm
46  1};
47  int32_t minBrokerCount{
48  0};  //!< the minimum number of brokers that must connect before entering init mode
49  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};  //!< defaults to the largest int32_t value
50  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};  //!< defaults to the largest int32_t value
51  int32_t maxIterationCount{10000};  //!< the maximum number of iterative loops that are allowed
52  Time tickTimer{5.0};  //!< the length of each heartbeat tick
53  Time timeout{30.0};  //!< timeout to wait to establish a broker connection before giving up
56  Time errorDelay{10.0};  //!< time to delay before terminating after error state
58  std::string identifier;  //!< an identifier for the broker
59  std::string brokerKey;
60  // address is mutable since during initial phases it may not be fixed so to maintain a
62  // consistent public interface for extracting it this variable may need to be updated in a
63  // constant function
64  mutable std::string address;  //!< network location of the broker
 // NOTE(review): the member names for the two logger pointers sit on elided lines 66 and 68;
 // the index attributes them to consoleLogger and fileLogger respectively -- confirm
65  std::shared_ptr<spdlog::logger>
67  std::shared_ptr<spdlog::logger>
69  std::thread queueProcessingThread;  //!< thread for running the broker
70 
 // user-supplied logging callback; see setLoggerFunction() / getLoggingCallback()
71  std::function<void(int, const std::string&, const std::string&)> loggerFunction;
72 
73  std::atomic<bool> haltOperations{
74  false};  //!< flag indicating that no further message should be processed
 // NOTE(review): the next two lines are initializer tails of declarations on elided lines 75 and
 // 77; the index attributes those to restrictive_time_policy and terminate_on_error -- confirm
76  false};
78  false};
79  bool debugging{false};  //!< flag indicating operation in a user debugging mode
80  private:
81  std::atomic<bool> mainLoopIsRunning{
82  false};  //!< value reported by isRunning()
83  bool dumplog{false};
84  std::atomic<bool> forceLoggingFlush{false};
85  bool queueDisabled{
86  false};
87  bool disable_timer{false};
89  std::atomic<std::size_t> messageCounter{
90  0};  //!< value reported by currentMessageCounter()
91  protected:
92  std::string logFile;  //!< the file to log messages to
93  std::unique_ptr<ForwardingTimeCoordinator> timeCoord;  //!< object managing the time control
94  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;  //!< primary routing queue
95  // time coordinator for managing filters
96  // std::unique_ptr<TimeCoordinator> filterTimeCoord;
97  // global_federate_id filterFedID;
 /** Lifecycle states of the broker; values below 0 precede `operating`, values above 0 follow it. */
99  enum class broker_state_t : int16_t {
100  created = -6,  //!< the broker has been created
101  configuring = -5,  //!< the broker is in the process of configuring
102  configured = -4,  //!< the broker itself has been configured and is ready to connect
103  connecting = -3,  //!< the connection process has started
104  connected = -2,  //!< the connection process has completed
105  initializing = -1,  //!< the enter initialization process has started
106  operating = 0,  //!< normal operating conditions
107  terminating = 1,  //!< the termination process has started
108  terminated = 3,  //!< presumably: the termination process has completed -- TODO confirm
109  errored = 7,  //!< an error was encountered
110  };
111 
 /** Reasons for forwarding a tick; the values are distinct bits and are tested with a bitwise
  * AND in isReasonForTick(). */
112  enum class TickForwardingReasons : uint32_t {
113  none = 0,
114  no_comms = 0x01,
115  ping_response = 0x02,
116  query_timeout = 0x04
117  };
118  bool noAutomaticID{false};  //!< the broker should not automatically generate an ID
119  bool hasTimeDependency{false};  //!< set to true if the broker has time dependencies
 // NOTE(review): initializer tail of a declaration on elided line 120; the index attributes
 // line 120 to enteredExecutionMode -- confirm
121  false};
123  bool hasFilters{false};  //!< flag indicating filters come through the broker
124 
125  bool no_ping{false};  //!< indicator that the broker is not very responsive to ping requests
126  bool uuid_like{false};  //!< will be set to true if the name looks like a uuid
127 
128  bool useJsonSerialization{false};
129  bool enable_profiling{false};  //!< indicator that profiling is enabled
 // NOTE(review): the member name for this decltype sits on elided line 131; the index attributes
 // it to errorTimeStart (time when the error condition started, related to errorDelay) -- confirm
130  decltype(std::chrono::steady_clock::now())
132  std::atomic<int> lastErrorCode{0};  //!< storage for last error code
133  std::string lastErrorString;  //!< storage for last error string
134  private:
136  std::shared_ptr<ProfilerBuffer> prBuff;
137 
139  bool forwardTick{false};
 // presumably a bitmask of TickForwardingReasons values (see setTickForwarding) -- TODO confirm
141  uint32_t forwardingReasons{0U};
143  std::atomic<broker_state_t> brokerState{broker_state_t::created};  //!< starts in the `created` state
144 
145  public:
 // constructors: optionally disable the queue; the second overload also takes a broker name
146  explicit BrokerBase(bool DisableQueue = false) noexcept;
147  explicit BrokerBase(const std::string& broker_name, bool DisableQueue = false);
148 
149  virtual ~BrokerBase();
 // parseArgs overloads: argc/argv form, vector-of-strings form, and single-string form
153  int parseArgs(int argc, char* argv[]);
157  int parseArgs(std::vector<std::string> args);
161  int parseArgs(const std::string& initializationString);
164  virtual void configureBase();
165 
 // addActionMessage overloads: copy (const&) and move (&&) variants
167  void addActionMessage(const ActionMessage& m);
169  void addActionMessage(ActionMessage&& m);
170 
176  void setLoggerFunction(
177  std::function<void(int, const std::string&, const std::string&)> logFunction);
179  void logFlush();
 // returns the current value of the mainLoopIsRunning atomic
181  bool isRunning() const { return mainLoopIsRunning.load(); }
183  void setLogLevel(int32_t level);
188  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
 // returns the current value of the global_id atomic
190  global_broker_id getGlobalId() const { return global_id.load(); }
191 
192  private:
194  void queueProcessingLoop();
197  action_message_def::action_t commandProcessor(ActionMessage& command);
198 
200  std::shared_ptr<helicsCLI11App> generateBaseCLI();
202  void generateLoggers();
204  void baseConfigure(ActionMessage& command);
205 
206  protected:
 /** @return true when at least one bit of `reason` is also set in `code` */
208  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
209  {
210  return ((static_cast<std::uint32_t>(reason) & code) != 0);
211  }
213  void setTickForwarding(TickForwardingReasons reason, bool value = true);
 // returns the current value of the brokerState atomic
214  broker_state_t getBrokerState() const { return brokerState.load(); }
215  bool setBrokerState(broker_state_t newState);
216  bool transitionBrokerState(broker_state_t expectedState, broker_state_t newState);
 // pure virtual hooks: derived classes must provide these
218  virtual void processDisconnect(bool skipUnregister = false) = 0;
221  virtual bool tryReconnect() = 0;
224  virtual void processCommand(ActionMessage&& cmd) = 0;
230  virtual void processPriorityCommand(ActionMessage&& command) = 0;
231 
235  virtual bool sendToLogger(global_federate_id federateID,
236  int logLevel,
237  const std::string& name,
238  const std::string& message) const;
239 
241  void saveProfilingData(const std::string& message);
243  void writeProfilingData();
244 
246  void generateNewIdentifier();
 // pure virtual: derived classes must provide this
248  virtual std::string generateLocalAddressString() const = 0;
250  virtual std::shared_ptr<helicsCLI11App> generateCLI();
252  void setErrorState(int eCode, const std::string& estring);
254  void setLoggingFile(const std::string& lfile);
256  bool getFlagValue(int32_t flag) const;
257 
258  public:
260  std::function<void(int, const std::string&, const std::string&)> getLoggingCallback() const;
262  void joinAllThreads();
 /** @return the current value of messageCounter, loaded with acquire ordering */
264  std::size_t currentMessageCounter() const
265  {
266  return messageCounter.load(std::memory_order_acquire);
267  }
268  friend class TimeoutMonitor;
269  friend const std::string& brokerStateName(broker_state_t state);
270 };
271 
274 const std::string& brokerStateName(BrokerBase::broker_state_t state);
275 } // namespace helics
helics::timeZero
constexpr Time timeZero
Definition: helics-time.hpp:31
helics::BrokerBase::joinAllThreads
void joinAllThreads()
Definition: BrokerBase.cpp:91
helics::BrokerBase::broker_state_t::configuring
@ configuring
the broker is in the process of configuring
helics::BrokerBase::broker_state_t::terminated
@ terminated
the termination process has completed
helics::BrokerBase::isRunning
bool isRunning() const
Definition: BrokerBase.hpp:181
helics::BrokerBase::minBrokerCount
int32_t minBrokerCount
the minimum number of brokers that must connect before entering init mode
Definition: BrokerBase.hpp:47
helics::BrokerBase::setLoggerFunction
void setLoggerFunction(std::function< void(int, const std::string &, const std::string &)> logFunction)
Definition: BrokerBase.cpp:513
helics::BrokerBase::broker_state_t::errored
@ errored
an error was encountered
helics::BrokerBase::useJsonSerialization
bool useJsonSerialization
Definition: BrokerBase.hpp:128
helics::BrokerBase::brokerKey
std::string brokerKey
Definition: BrokerBase.hpp:59
helics::BrokerBase::timeout
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:53
AsioContextManager::getContextPointer
static std::shared_ptr< AsioContextManager > getContextPointer(const std::string &contextName=std::string())
Definition: AsioContextManager.cpp:38
helics::BrokerBase::noAutomaticID
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:118
helics::BrokerBase::enable_profiling
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:129
helics::action_message_def::action_t
action_t
Definition: ActionMessageDefintions.hpp:20
helics::BrokerBase::tryReconnect
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:636
helics::global_broker_id
Definition: global_federate_id.hpp:26
helics::BrokerBase::hasTimeDependency
bool hasTimeDependency
set to true if the broker has Time dependencies
Definition: BrokerBase.hpp:119
helics_log_level_summary
@ helics_log_level_summary
Definition: helics_enums.h:171
helics::BrokerBase::broker_state_t::operating
@ operating
normal operating conditions
helics::BrokerBase::getFlagValue
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:501
warning
@ warning
print/log warning and errors
Definition: loggingHelper.hpp:23
helics::global_federate_id
Definition: global_federate_id.hpp:68
helics::BrokerBase::queueProcessingThread
std::thread queueProcessingThread
thread for running the broker
Definition: BrokerBase.hpp:69
helics::BrokerBase::getGlobalId
global_broker_id getGlobalId() const
Definition: BrokerBase.hpp:190
helics::BrokerBase::broker_state_t::terminating
@ terminating
the termination process has started
helics::parent_broker_id
constexpr global_broker_id parent_broker_id
Definition: global_federate_id.hpp:60
helics::BrokerBase::setErrorState
void setErrorState(int eCode, const std::string &estring)
Definition: BrokerBase.cpp:468
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helics-time.hpp:27
helics::BrokerBase::hasFilters
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:123
loggingHelper.hpp
helics::BrokerBase::broker_state_t
broker_state_t
Definition: BrokerBase.hpp:99
helics::ActionMessage
Definition: ActionMessage.hpp:29
helics::BrokerBase::maxIterationCount
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:51
helics::BrokerBase::uuid_like
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:126
helics::BrokerBase::fileLogLevel
int32_t fileLogLevel
the logging level for logging to a file
Definition: BrokerBase.hpp:44
helics::BrokerBase::parseArgs
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:299
helics_flag_force_logging_flush
@ helics_flag_force_logging_flush
Definition: helics_enums.h:155
helics::BrokerBase::haltOperations
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:73
helics::BrokerBase::maxLogLevel
std::atomic< int32_t > maxLogLevel
the logging level to use; levels >= this will be ignored
Definition: BrokerBase.hpp:41
helics_log_level_warning
@ helics_log_level_warning
Definition: helics_enums.h:169
helics_flag_dumplog
@ helics_flag_dumplog
Definition: helics_enums.h:157
helics::BrokerBase::configureBase
virtual void configureBase()
Definition: BrokerBase.cpp:326
helics_log_level_trace
@ helics_log_level_trace
Definition: helics_enums.h:182
helics::BrokerBase::timeCoord
std::unique_ptr< ForwardingTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:93
helics::BrokerBase::broker_state_t::configured
@ configured
the broker itself has been configured and is ready to connect
checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:75
helics::BrokerBase::no_ping
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:125
helics::BrokerBase::generateCLI
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:99
helics::BrokerBase::lastErrorString
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:133
helics_log_level_connections
@ helics_log_level_connections
Definition: helics_enums.h:174
helics_log_level_error
@ helics_log_level_error
Definition: helics_enums.h:167
helics::BrokerBase::setLogLevel
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:519
helics::BrokerBase::addActionMessage
void addActionMessage(const ActionMessage &m)
Definition: BrokerBase.cpp:544
helics::BrokerBase::errorDelay
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:57
helics::BrokerBase::getLoggingCallback
std::function< void(int, const std::string &, const std::string &)> getLoggingCallback() const
Definition: BrokerBase.cpp:84
helics::BrokerBase::generateLocalAddressString
virtual std::string generateLocalAddressString() const =0
helics::BrokerBase::broker_state_t::connecting
@ connecting
the connection process has started
helics::BrokerBase::broker_state_t::connected
@ connected
the connection process has completed
helics::BrokerBase::processCommand
virtual void processCommand(ActionMessage &&cmd)=0
helics_log_level_interfaces
@ helics_log_level_interfaces
Definition: helics_enums.h:176
helics::BrokerBase::broker_state_t::created
@ created
the broker has been created
helics::global_federate_id::baseValue
constexpr base_type baseValue() const
Definition: global_federate_id.hpp:83
helics::BrokerBase::global_id
std::atomic< global_broker_id > global_id
the unique identifier for the broker(core or broker)
Definition: BrokerBase.hpp:36
helics::BrokerBase::broker_state_t::initializing
@ initializing
the process of entering initialization has started
helics::BrokerBase::brokerStateName
const friend std::string & brokerStateName(broker_state_t state)
Definition: BrokerBase.cpp:918
helics::BrokerBase::global_broker_id_local
global_broker_id global_broker_id_local
Definition: BrokerBase.hpp:38
helics::BrokerBase::currentMessageCounter
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:264
indicator_flag
@ indicator_flag
flag used for setting values
Definition: flagOperations.hpp:21
helics_log_level_data
@ helics_log_level_data
Definition: helics_enums.h:180
fed
@ fed
special logging command for message coming from a fed
Definition: loggingHelper.hpp:32
helics::BrokerBase::fileLogger
std::shared_ptr< spdlog::logger > fileLogger
default logging object to use if the logging callback is not specified
Definition: BrokerBase.hpp:68
helics::BrokerBase::enteredExecutionMode
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:120
setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:67
helics::BrokerBase::terminate_on_error
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:77
helics::BrokerBase::lastErrorCode
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:132
helics::BrokerBase::networkTimeout
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:54
helics::prettyPrintString
std::string prettyPrintString(const ActionMessage &command)
Definition: ActionMessage.cpp:861
helics::brokerStateName
const std::string & brokerStateName(BrokerBase::broker_state_t state)
Definition: BrokerBase.cpp:918
helics::isDisconnectCommand
bool isDisconnectCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:276
trace
@ trace
trace level printing (all processed messages)
Definition: loggingHelper.hpp:31
helics::BrokerBase::generateNewIdentifier
void generateNewIdentifier()
Definition: BrokerBase.cpp:442
helics::BrokerBase::processPriorityCommand
virtual void processPriorityCommand(ActionMessage &&command)=0
helics::BrokerBase::writeProfilingData
void writeProfilingData()
Definition: BrokerBase.cpp:457
error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
helics_log_level_no_print
@ helics_log_level_no_print
Definition: helics_enums.h:165
helics::BrokerBase::setLoggingFile
void setLoggingFile(const std::string &lfile)
Definition: BrokerBase.cpp:486
helics::BrokerBase::setLogLevels
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:537
helics_log_level_timing
@ helics_log_level_timing
Definition: helics_enums.h:178
helics::BrokerBase::tickTimer
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:52
helics::BrokerBase::processDisconnect
virtual void processDisconnect(bool skipUnregister=false)=0
helics
the main namespace for the helics co-simulation library. User functions will be in the helics namespace...
Definition: AsyncFedCallInfo.hpp:14
helics::BrokerBase::debugging
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:79
helics::BrokerBase::queryTimeout
Time queryTimeout
Definition: BrokerBase.hpp:55
helics::BrokerBase::consoleLogLevel
int32_t consoleLogLevel
the logging level for console display
Definition: BrokerBase.hpp:43
helics::BrokerBase::loggerFunction
std::function< void(int, const std::string &, const std::string &)> loggerFunction
Definition: BrokerBase.hpp:71
helics::BrokerBase::setTickForwarding
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:838
helics::BrokerBase::restrictive_time_policy
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:75
helics::BrokerBase::waitingForBrokerPingReply
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:122
helics::TimeoutMonitor
Definition: TimeoutMonitor.h:27
helics::BrokerBase::sendToLogger
virtual bool sendToLogger(global_federate_id federateID, int logLevel, const std::string &name, const std::string &message) const
Definition: BrokerBase.cpp:359
helics::BrokerBase::logFlush
void logFlush()
Definition: BrokerBase.cpp:524
helics::BrokerBase::isReasonForTick
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:208
helics::BrokerBase::consoleLogger
std::shared_ptr< spdlog::logger > consoleLogger
default logging object to use if the logging callback is not specified
Definition: BrokerBase.hpp:66
helics::BrokerBase::higher_broker_id
global_broker_id higher_broker_id
the id code of the broker 1 level above this broker
Definition: BrokerBase.hpp:40
helics::BrokerBase::saveProfilingData
void saveProfilingData(const std::string &message)
Definition: BrokerBase.cpp:448
helics::BrokerBase::actionQueue
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:94
helics::BrokerBase::minFederateCount
int32_t minFederateCount
the minimum number of federates that must connect before entering init mode
Definition: BrokerBase.hpp:45
flagOperations.hpp
helics::BrokerBase::identifier
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:58
helics::addJsonConfig
HelicsConfigJSON * addJsonConfig(CLI::App *app)
Add the HELICS JSON configuration processor to the app.
Definition: helicsCLI11JsonConfig.cpp:96
helics::BrokerBase
Definition: BrokerBase.hpp:34
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
helics::BrokerBase::logFile
std::string logFile
the file to log messages to
Definition: BrokerBase.hpp:92
BrokerBase.hpp
helics::BrokerBase::address
std::string address
network location of the broker
Definition: BrokerBase.hpp:64
helics::BrokerBase::errorTimeStart
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started related to the errorDelay
Definition: BrokerBase.hpp:131