helics  3.0.1
BrokerBase.hpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
14 #include "ActionMessage.hpp"
15 #include "FederateIdExtra.hpp"
16 #include "gmlc/containers/BlockingPriorityQueue.hpp"
17 
18 #include <atomic>
19 #include <limits>
20 #include <memory>
21 #include <string>
22 #include <thread>
23 #include <vector>
24 
25 namespace spdlog {
26 class logger;
27 }
28 namespace helics {
29 class ForwardingTimeCoordinator;
30 class helicsCLI11App;
31 class ProfilerBuffer;
34 class BrokerBase {
35  protected:
36  static constexpr double mInvalidSimulationTime{-98763.2};
38  std::atomic<GlobalBrokerId> global_id{parent_broker_id};
42 
43  std::atomic<int32_t> maxLogLevel{HELICS_LOG_LEVEL_WARNING};
46 
47  int32_t minFederateCount{1};
49  int32_t minBrokerCount{0};
50  int32_t maxFederateCount{(std::numeric_limits<int32_t>::max)()};
51  int32_t maxBrokerCount{(std::numeric_limits<int32_t>::max)()};
53  int32_t minChildCount{0};
54  int32_t maxIterationCount{10000};
55  Time tickTimer{5.0};
56  Time timeout{30.0};
59  Time errorDelay{10.0};
62  std::string identifier;
63  std::string brokerKey;
64  // address is mutable since during initial phases it may not be fixed so to maintain a
66  // consistent public interface for extracting it this variable may need to be updated in a
67  // constant function
68  mutable std::string address;
69  std::shared_ptr<spdlog::logger>
71  std::shared_ptr<spdlog::logger>
73  std::thread queueProcessingThread;
74 
75  std::function<void(int, std::string_view, std::string_view)> loggerFunction;
77  std::atomic<bool> haltOperations{false};
81  bool terminate_on_error{false};
83  bool debugging{false};
85  bool observer{false};
86 
87  private:
88  std::atomic<bool> mainLoopIsRunning{
89  false};
90  bool dumplog{false};
91  std::atomic<bool> forceLoggingFlush{false};
92  bool queueDisabled{
93  false};
94  bool disable_timer{false};
96  std::atomic<std::size_t> messageCounter{0};
98 
99  protected:
100  std::string logFile;
101  std::unique_ptr<ForwardingTimeCoordinator> timeCoord;
102  gmlc::containers::BlockingPriorityQueue<ActionMessage> actionQueue;
103  // time coordinator for managing filters
104  // std::unique_ptr<TimeCoordinator> filterTimeCoord;
105  // global_federate_id filterFedID;
107  enum class BrokerState : int16_t {
108  created = -6,
109  configuring = -5,
110  configured = -4,
111  connecting = -3,
112  connected = -2,
113  initializing = -1,
114  operating = 0,
115  terminating = 1,
116  terminated = 3,
117  errored = 7,
118  };
119 
120  enum class TickForwardingReasons : uint32_t {
121  NONE = 0,
122  NO_COMMS = 0x01,
123  PING_RESPONSE = 0x02,
124  QUERY_TIMEOUT = 0x04
125  };
126  bool noAutomaticID{false};
127  bool hasTimeDependency{false};
129  false};
131  bool hasFilters{false};
132 
133  bool no_ping{false};
134  bool uuid_like{false};
135 
136  bool useJsonSerialization{false};
137  bool enable_profiling{false};
138  decltype(std::chrono::steady_clock::now())
140  std::atomic<int> lastErrorCode{0};
141  std::string lastErrorString;
142  private:
144  std::shared_ptr<ProfilerBuffer> prBuff;
145 
147  bool forwardTick{false};
149  uint32_t forwardingReasons{0U};
151  std::atomic<BrokerState> brokerState{BrokerState::created};
152 
153  public:
154  explicit BrokerBase(bool DisableQueue = false) noexcept;
155  explicit BrokerBase(const std::string& broker_name, bool DisableQueue = false);
156 
157  virtual ~BrokerBase();
161  int parseArgs(int argc, char* argv[]);
165  int parseArgs(std::vector<std::string> args);
169  int parseArgs(const std::string& initializationString);
172  virtual void configureBase();
173 
175  void addActionMessage(const ActionMessage& m);
177  void addActionMessage(ActionMessage&& m);
178 
184  void setLoggerFunction(
185  std::function<void(int level, std::string_view identifier, std::string_view message)>
186  logFunction);
188  void logFlush();
190  bool isRunning() const { return mainLoopIsRunning.load(); }
192  void setLogLevel(int32_t level);
197  void setLogLevels(int32_t consoleLevel, int32_t fileLevel);
199  GlobalBrokerId getGlobalId() const { return global_id.load(); }
200 
201  private:
203  void queueProcessingLoop();
206  action_message_def::action_t commandProcessor(ActionMessage& command);
207 
209  std::shared_ptr<helicsCLI11App> generateBaseCLI();
211  void generateLoggers();
213  void baseConfigure(ActionMessage& command);
214 
215  protected:
217  static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
218  {
219  return ((static_cast<std::uint32_t>(reason) & code) != 0);
220  }
222  void setTickForwarding(TickForwardingReasons reason, bool value = true);
223  BrokerState getBrokerState() const { return brokerState.load(); }
224  bool setBrokerState(BrokerState newState);
225  bool transitionBrokerState(BrokerState expectedState, BrokerState newState);
227  virtual void processDisconnect(bool skipUnregister = false) = 0;
230  virtual bool tryReconnect() = 0;
233  virtual void processCommand(ActionMessage&& cmd) = 0;
239  virtual void processPriorityCommand(ActionMessage&& command) = 0;
240 
244  virtual bool sendToLogger(GlobalFederateId federateID,
245  int logLevel,
246  std::string_view name,
247  std::string_view message) const;
249  void saveProfilingData(std::string_view message);
251  void writeProfilingData();
253  void generateNewIdentifier();
255  virtual std::string generateLocalAddressString() const = 0;
257  virtual std::shared_ptr<helicsCLI11App> generateCLI();
259  void setErrorState(int eCode, std::string_view estring);
261  void setLoggingFile(const std::string& lfile);
263  bool getFlagValue(int32_t flag) const;
265  virtual double getSimulationTime() const { return mInvalidSimulationTime; }
266 
267  public:
269  std::function<void(int, const std::string&, const std::string&)> getLoggingCallback() const;
271  void joinAllThreads();
273  std::size_t currentMessageCounter() const
274  {
275  return messageCounter.load(std::memory_order_acquire);
276  }
277  friend class TimeoutMonitor;
278  friend const std::string& brokerStateName(BrokerState state);
279 };
280 
283 const std::string& brokerStateName(BrokerBase::BrokerState state);
284 
285 } // namespace helics
helics::timeZero
constexpr Time timeZero
Definition: helicsTime.hpp:31
SUMMARY
@ SUMMARY
print/log summary information
Definition: loggingHelper.hpp:25
helics::BrokerBase::joinAllThreads
void joinAllThreads()
Definition: BrokerBase.cpp:91
TIMING
@ TIMING
print interfaces+ timing(exec/grant/disconnect)
Definition: loggingHelper.hpp:30
helics::BrokerBase::setLoggerFunction
void setLoggerFunction(std::function< void(int level, std::string_view identifier, std::string_view message)> logFunction)
Definition: BrokerBase.cpp:541
helics::BrokerBase::isRunning
bool isRunning() const
Definition: BrokerBase.hpp:190
helics::GlobalFederateId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:80
helics::BrokerBase::minBrokerCount
int32_t minBrokerCount
Definition: BrokerBase.hpp:49
helics::BrokerBase::higher_broker_id
GlobalBrokerId higher_broker_id
the id code of the broker 1 level about this broker
Definition: BrokerBase.hpp:41
helics::BrokerBase::useJsonSerialization
bool useJsonSerialization
Definition: BrokerBase.hpp:136
helics::BrokerBase::brokerKey
std::string brokerKey
Definition: BrokerBase.hpp:63
helics::BrokerBase::timeout
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:56
helics::BrokerBase::sendToLogger
virtual bool sendToLogger(GlobalFederateId federateID, int logLevel, std::string_view name, std::string_view message) const
Definition: BrokerBase.cpp:396
AsioContextManager::getContextPointer
static std::shared_ptr< AsioContextManager > getContextPointer(const std::string &contextName=std::string())
Definition: AsioContextManager.cpp:38
helics::BrokerBase::noAutomaticID
bool noAutomaticID
the broker should not automatically generate an ID
Definition: BrokerBase.hpp:126
helics::BrokerBase::enable_profiling
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:137
helics::action_message_def::action_t
action_t
Definition: ActionMessageDefintions.hpp:20
HELICS_LOG_LEVEL_NO_PRINT
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:179
HELICS_LOG_LEVEL_CONNECTIONS
@ HELICS_LOG_LEVEL_CONNECTIONS
Definition: helics_enums.h:190
helics::BrokerBase::tryReconnect
virtual bool tryReconnect()=0
Definition: BrokerBase.cpp:664
helics::BrokerBase::getGlobalId
GlobalBrokerId getGlobalId() const
Definition: BrokerBase.hpp:199
helics::BrokerBase::hasTimeDependency
bool hasTimeDependency
set to true if the broker has Time dependencies
Definition: BrokerBase.hpp:127
helics::BrokerBase::getFlagValue
bool getFlagValue(int32_t flag) const
Definition: BrokerBase.cpp:529
HELICS_LOG_LEVEL_WARNING
@ HELICS_LOG_LEVEL_WARNING
Definition: helics_enums.h:185
helics::BrokerBase::queueProcessingThread
std::thread queueProcessingThread
thread for running the broker
Definition: BrokerBase.hpp:73
FED
@ FED
special logging command for message coming from a fed
Definition: loggingHelper.hpp:34
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
helics::BrokerBase::hasFilters
bool hasFilters
flag indicating filters come through the broker
Definition: BrokerBase.hpp:131
loggingHelper.hpp
helics::BrokerBase::global_id
std::atomic< GlobalBrokerId > global_id
Definition: BrokerBase.hpp:38
helics::GlobalFederateId
Definition: GlobalFederateId.hpp:68
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::brokerStateName
const std::string & brokerStateName(BrokerBase::BrokerState state)
Definition: BrokerBase.cpp:946
helics::BrokerBase::maxIterationCount
int32_t maxIterationCount
the maximum number of iterative loops that are allowed
Definition: BrokerBase.hpp:54
helics::BrokerBase::uuid_like
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:134
helics::parent_broker_id
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:60
helics::BrokerBase::fileLogLevel
int32_t fileLogLevel
the logging level for logging to a file
Definition: BrokerBase.hpp:45
helics::BrokerBase::getSimulationTime
virtual double getSimulationTime() const
Definition: BrokerBase.hpp:265
helics::BrokerBase::brokerStateName
const friend std::string & brokerStateName(BrokerState state)
Definition: BrokerBase.cpp:946
helics::BrokerBase::parseArgs
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:312
helics::BrokerBase::haltOperations
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:77
helics::BrokerBase::maxLogLevel
std::atomic< int32_t > maxLogLevel
Definition: BrokerBase.hpp:43
helics::BrokerBase::configureBase
virtual void configureBase()
Definition: BrokerBase.cpp:339
ERROR_LEVEL
@ ERROR_LEVEL
only print errors
Definition: loggingHelper.hpp:22
helics::BrokerBase::timeCoord
std::unique_ptr< ForwardingTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:101
checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:85
helics::BrokerBase::minChildCount
int32_t minChildCount
Definition: BrokerBase.hpp:53
helics::BrokerBase::no_ping
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:133
helics::BrokerBase::generateCLI
virtual std::shared_ptr< helicsCLI11App > generateCLI()
Definition: BrokerBase.cpp:99
HELICS_LOG_LEVEL_DEBUG
@ HELICS_LOG_LEVEL_DEBUG
Definition: helics_enums.h:198
helics::BrokerBase::lastErrorString
std::string lastErrorString
storage for last error string
Definition: BrokerBase.hpp:141
helics::BrokerBase::setLogLevel
void setLogLevel(int32_t level)
Definition: BrokerBase.cpp:547
helics::BrokerBase::addActionMessage
void addActionMessage(const ActionMessage &m)
Definition: BrokerBase.cpp:572
helics::BrokerBase::errorDelay
Time errorDelay
time to delay before terminating after error state
Definition: BrokerBase.hpp:60
helics::BrokerBase::getLoggingCallback
std::function< void(int, const std::string &, const std::string &)> getLoggingCallback() const
Definition: BrokerBase.cpp:84
helics::BrokerBase::generateLocalAddressString
virtual std::string generateLocalAddressString() const =0
helics::BrokerBase::BrokerState
BrokerState
Definition: BrokerBase.hpp:107
helics::BrokerBase::processCommand
virtual void processCommand(ActionMessage &&cmd)=0
helics::BrokerBase::currentMessageCounter
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:273
indicator_flag
@ indicator_flag
flag used for setting values
Definition: flagOperations.hpp:21
helics::BrokerBase::fileLogger
std::shared_ptr< spdlog::logger > fileLogger
default logging object to use if the logging callback is not specified
Definition: BrokerBase.hpp:72
helics::BrokerBase::enteredExecutionMode
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:128
helics::GlobalBrokerId
Definition: GlobalFederateId.hpp:26
setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:77
helics::BrokerBase::terminate_on_error
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:81
helics::BrokerBase::lastErrorCode
std::atomic< int > lastErrorCode
storage for last error code
Definition: BrokerBase.hpp:140
helics::BrokerBase::networkTimeout
Time networkTimeout
timeout to establish a socket connection before giving up
Definition: BrokerBase.hpp:57
HELICS_FLAG_FORCE_LOGGING_FLUSH
@ HELICS_FLAG_FORCE_LOGGING_FLUSH
Definition: helics_enums.h:166
helics::prettyPrintString
std::string prettyPrintString(const ActionMessage &command)
Definition: ActionMessage.cpp:841
HELICS_LOG_LEVEL_DATA
@ HELICS_LOG_LEVEL_DATA
Definition: helics_enums.h:196
helics::isDisconnectCommand
bool isDisconnectCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:276
helics::BrokerBase::generateNewIdentifier
void generateNewIdentifier()
Definition: BrokerBase.cpp:470
PROFILING
@ PROFILING
profiling log level
Definition: loggingHelper.hpp:23
helics::BrokerBase::processPriorityCommand
virtual void processPriorityCommand(ActionMessage &&command)=0
helics::BrokerBase::writeProfilingData
void writeProfilingData()
Definition: BrokerBase.cpp:485
helics::BrokerBase::BrokerState::configured
@ configured
the broker itself has been configured and is ready to connect
helics::BrokerBase::loggerFunction
std::function< void(int, std::string_view, std::string_view)> loggerFunction
Definition: BrokerBase.hpp:75
error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
helics::BrokerBase::saveProfilingData
void saveProfilingData(std::string_view message)
Definition: BrokerBase.cpp:476
helics::BrokerBase::setLoggingFile
void setLoggingFile(const std::string &lfile)
Definition: BrokerBase.cpp:514
helics::BrokerBase::BrokerState::configuring
@ configuring
the broker is in the processing of configuring
helics::BrokerBase::setLogLevels
void setLogLevels(int32_t consoleLevel, int32_t fileLevel)
Definition: BrokerBase.cpp:565
helics::BrokerBase::tickTimer
Time tickTimer
the length of each heartbeat tick
Definition: BrokerBase.hpp:55
helics::BrokerBase::processDisconnect
virtual void processDisconnect(bool skipUnregister=false)=0
helics::BrokerBase::BrokerState::terminated
@ terminated
the termination process has started
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::BrokerBase::debugging
bool debugging
flag indicating operation in a user debugging mode
Definition: BrokerBase.hpp:83
helics::BrokerBase::BrokerState::connecting
@ connecting
the connection process has started
HELICS_LOG_LEVEL_SUMMARY
@ HELICS_LOG_LEVEL_SUMMARY
Definition: helics_enums.h:187
helics::BrokerBase::queryTimeout
Time queryTimeout
Definition: BrokerBase.hpp:58
helics::BrokerBase::consoleLogLevel
int32_t consoleLogLevel
the logging level for console display
Definition: BrokerBase.hpp:44
helics::BrokerBase::BrokerState::terminating
@ terminating
the termination process has started
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:181
helics::BrokerBase::BrokerState::connected
@ connected
the connection process has completed
helics::BrokerBase::setTickForwarding
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:866
helics::BrokerBase::restrictive_time_policy
bool restrictive_time_policy
flag indicating the broker should use a conservative time policy
Definition: BrokerBase.hpp:79
helics::BrokerBase::waitingForBrokerPingReply
bool waitingForBrokerPingReply
flag indicating we are waiting for a ping reply
Definition: BrokerBase.hpp:130
helics::TimeoutMonitor
Definition: TimeoutMonitor.h:27
helics::BrokerBase::grantTimeout
Time grantTimeout
timeout for triggering diagnostic action waiting for a time grant
Definition: BrokerBase.hpp:61
helics::BrokerBase::logFlush
void logFlush()
Definition: BrokerBase.cpp:552
helics::BrokerBase::isReasonForTick
static bool isReasonForTick(std::uint32_t code, TickForwardingReasons reason)
Definition: BrokerBase.hpp:217
helics::BrokerBase::consoleLogger
std::shared_ptr< spdlog::logger > consoleLogger
default logging object to use if the logging callback is not specified
Definition: BrokerBase.hpp:70
helics::BrokerBase::observer
bool observer
flag indicating that the broker is an observer only
Definition: BrokerBase.hpp:85
TRACE
@ TRACE
trace level printing (all processed messages)
Definition: loggingHelper.hpp:33
helics::BrokerBase::BrokerState::operating
@ operating
normal operating conditions
WARNING
@ WARNING
print/log warning and errors
Definition: loggingHelper.hpp:24
helics::BrokerBase::actionQueue
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:102
HELICS_LOG_LEVEL_TRACE
@ HELICS_LOG_LEVEL_TRACE
Definition: helics_enums.h:200
helics::BrokerBase::minFederateCount
int32_t minFederateCount
Definition: BrokerBase.hpp:47
flagOperations.hpp
helics::BrokerBase::BrokerState::initializing
@ initializing
the enter initialization process has started
helics::BrokerBase::identifier
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:62
HELICS_LOG_LEVEL_TIMING
@ HELICS_LOG_LEVEL_TIMING
Definition: helics_enums.h:194
HELICS_LOG_LEVEL_INTERFACES
@ HELICS_LOG_LEVEL_INTERFACES
Definition: helics_enums.h:192
helics::addJsonConfig
HelicsConfigJSON * addJsonConfig(CLI::App *app)
Add the HELICS JSON configuration processor to the app.
Definition: helicsCLI11JsonConfig.cpp:96
helics::BrokerBase
Definition: BrokerBase.hpp:34
helics::BrokerBase::setErrorState
void setErrorState(int eCode, std::string_view estring)
Definition: BrokerBase.cpp:496
helics::isPriorityCommand
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
helics::BrokerBase::logFile
std::string logFile
the file to log message to
Definition: BrokerBase.hpp:100
helics::BrokerBase::BrokerState::errored
@ errored
an error was encountered
helics::BrokerBase::BrokerState::created
@ created
the broker has been created
BrokerBase.hpp
helics::BrokerBase::address
std::string address
network location of the broker
Definition: BrokerBase.hpp:68
HELICS_FLAG_DUMPLOG
@ HELICS_FLAG_DUMPLOG
Definition: helics_enums.h:168
helics::BrokerBase::global_broker_id_local
GlobalBrokerId global_broker_id_local
Definition: BrokerBase.hpp:39
helics::BrokerBase::errorTimeStart
decltype(std::chrono::steady_clock::now()) errorTimeStart
time when the error condition started related to the errorDelay
Definition: BrokerBase.hpp:139