 |
helics
2.8.1
|
9 #include "../common/JsonBuilder.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
14 #include "HandleManager.hpp"
15 #include "TimeDependencies.hpp"
16 #include "UnknownHandleManager.hpp"
17 #include "federate_id_extra.hpp"
18 #include "gmlc/concurrency/DelayedObjects.hpp"
19 #include "gmlc/concurrency/TriggerVariable.hpp"
20 #include "gmlc/containers/AirLock.hpp"
21 #include "gmlc/containers/DualMappedVector.hpp"
22 #include "gmlc/containers/SimpleQueue.hpp"
23 #include "helics/external/any.hpp"
34 #include <unordered_map>
46 request_disconnect = 48,
58 bool nonCounting{
false};
59 explicit BasicFedInfo(
const std::string& fedname):
name(fedname) {}
72 connection_state::connected};
86 class TimeCoordinator;
98 std::atomic<bool> _isRoot{
false};
100 bool connectionEstablished{
false};
102 gmlc::containers::DualMappedVector<BasicFedInfo, std::string, global_federate_id>
104 gmlc::containers::DualMappedVector<BasicBrokerInfo, std::string, global_broker_id>
107 previous_local_broker_identifier;
109 HandleManager handles;
110 UnknownHandleManager unknownHandles;
111 std::vector<std::pair<std::string, global_federate_id>>
113 std::unordered_map<global_federate_id, local_federate_id>
114 global_id_translation;
115 std::unordered_map<global_federate_id, route_id>
117 std::unordered_map<std::string, route_id>
118 knownExternalEndpoints;
119 std::unordered_map<std::string, std::string> global_values;
121 std::mutex name_mutex_;
122 std::atomic<int> queryCounter{1};
123 gmlc::concurrency::DelayedObjects<std::string> activeQueries;
124 std::vector<std::tuple<JsonMapBuilder, std::vector<ActionMessage>,
bool>> mapBuilders;
127 std::deque<std::pair<int32_t, decltype(std::chrono::steady_clock::now())>> queryTimeouts;
129 std::vector<ActionMessage> earlyMessages;
130 gmlc::concurrency::TriggerVariable disconnection;
131 std::unique_ptr<TimeoutMonitor>
133 std::atomic<uint16_t> nextAirLock{0};
134 std::array<gmlc::containers::AirLock<stx::any>, 3>
140 virtual void processCommand(ActionMessage&& command)
override;
146 void processPriorityCommand(ActionMessage&& command)
override;
149 void processBrokerConfigureCommands(ActionMessage& cmd);
151 gmlc::containers::SimpleQueue<ActionMessage>
155 void transmitDelayedMessages();
159 void routeMessage(ActionMessage& cmd, global_federate_id dest);
160 void routeMessage(ActionMessage&& cmd, global_federate_id dest);
163 void routeMessage(
const ActionMessage& cmd);
164 void routeMessage(ActionMessage&& cmd);
166 void transmitToParent(ActionMessage&& cmd);
168 void propagateError(ActionMessage&& cmd);
170 void broadcast(ActionMessage& cmd);
172 route_id fillMessageRouteInformation(ActionMessage& mess);
175 void executeInitializationOperations();
177 uint16_t getNextAirlockIndex();
180 bool verifyBrokerKey(ActionMessage& mess)
const;
183 bool verifyBrokerKey(
const std::string& key)
const;
188 virtual bool connect() override final;
206 virtual
bool isRoot() const override final {
return _isRoot; };
211 setLoggingCallback(
const std::function<
void(
int,
const std::string&,
const std::string&)>&
212 logFunction)
override final;
215 std::chrono::milliseconds msToWait = std::chrono::milliseconds(0)) const override final;
221 virtual
void globalError(int32_t errorCode, const std::
string& errorString) override final;
226 virtual
bool brokerConnect() = 0;
229 virtual
void brokerDisconnect() = 0;
255 virtual
void addRoute(
route_id rid,
int interfaceId, const std::
string& routeInfo) = 0;
264 explicit
CoreBroker(
bool setAsRootBroker = false) noexcept;
266 explicit
CoreBroker(const std::
string& broker_name);
270 virtual
void configure(const std::
string& configureString) override final;
288 virtual const std::string&
getAddress() const override final;
290 virtual
void setLogFile(const std::
string& lfile) override final;
292 query(const std::
string& target,
293 const std::
string& queryStr,
295 virtual
void setGlobal(const std::
string& valueName, const std::
string& value) override final;
297 virtual
void dataLink(const std::
string& publication, const std::
string& input) override final;
300 const std::
string& endpoint) override final;
303 const std::
string& endpoint) override final;
309 int getCountableFederates() const;
311 void checkDependencies();
338 void checkQueryTimeouts();
344 std::
string generateQueryAnswer(const std::
string& request,
bool force_ordering);
346 std::
string getNameList(std::
string gidString) const;
352 const BasicBrokerInfo* getBrokerById(global_broker_id brokerid)
const;
354 BasicBrokerInfo* getBrokerById(global_broker_id brokerid);
356 void addLocalInfo(BasicHandleInfo& handleInfo,
const ActionMessage& m);
357 void addPublication(ActionMessage& m);
358 void addInput(ActionMessage& m);
359 void addEndpoint(ActionMessage& m);
360 void addFilter(ActionMessage& m);
364 void initializeMapBuilder(
const std::string& request,
367 bool force_ordering);
369 std::string generateGlobalStatus(JsonMapBuilder& builder);
372 void sendErrorToImmediateBrokers(
int errorCode);
374 void sendDisconnect();
376 std::string generateFederationSummary()
const;
378 void labelAsDisconnected(global_broker_id brkid);
381 void generateTimeBarrier(ActionMessage& m);
382 int generateMapObjectCounter()
const;
383 friend class TimeoutMonitor;
constexpr Time timeZero
Definition: helics-time.hpp:31
std::string generateJsonString(const Json::Value &block)
Definition: JsonProcessingFunctions.cpp:97
@ configuring
the broker is in the process of configuring
@ terminated
the termination process has completed
global_federate_id source_id
12 – for federate_id or route_id
Definition: ActionMessage.hpp:36
bool isRunning() const
Definition: BrokerBase.hpp:181
int32_t minBrokerCount
the minimum number of brokers that must connect before entering init mode
Definition: BrokerBase.hpp:47
@ errored
an error was encountered
constexpr uint16_t slow_responding_flag
overload of extra_flag4 indicating a federate, core or broker is slow responding
Definition: flagOperations.hpp:37
bool useJsonSerialization
Definition: BrokerBase.hpp:128
std::string brokerKey
Definition: BrokerBase.hpp:59
Time timeout
timeout to wait to establish a broker connection before giving up
Definition: BrokerBase.hpp:53
virtual void setLoggingCallback(const std::function< void(int, const std::string &, const std::string &)> &logFunction) override final
Definition: CoreBroker.cpp:110
virtual std::shared_ptr< helicsCLI11App > generateCLI() override
Definition: CoreBroker.cpp:1752
virtual void configureFromVector(std::vector< std::string > args) override final
Definition: CoreBroker.cpp:1737
bool enable_profiling
indicator that profiling is enabled
Definition: BrokerBase.hpp:129
virtual void setLogFile(const std::string &lfile) override final
Definition: CoreBroker.cpp:2512
virtual void addSourceFilterToEndpoint(const std::string &filter, const std::string &endpoint) override final
Definition: CoreBroker.cpp:178
constexpr identififier_base_type global_broker_id_shift
Definition: global_federate_id.hpp:22
Definition: global_federate_id.hpp:26
virtual void removeRoute(route_id rid)=0
void clearPublication(const std::string &newPublication)
Definition: UnknownHandleManager.cpp:263
bool hasTimeDependency
set to true if the broker has Time dependencies
Definition: BrokerBase.hpp:119
@ operating
normal operating conditions
base helics enumerations for C++ API's, a namespace wrapper for the definitions defined in helics_enu...
@ warning
print/log warning and errors
Definition: loggingHelper.hpp:23
Definition: global_federate_id.hpp:68
route_id route
the routing information for data to be sent to the federate
Definition: CoreBroker.hpp:55
bool _sent_disconnect_ack
indicator that the disconnect ack has been sent
Definition: CoreBroker.hpp:79
global_broker_id getGlobalId() const
Definition: BrokerBase.hpp:190
virtual void disconnect() override final
Definition: CoreBroker.cpp:1897
connection_state getAllConnectionState() const
Definition: CoreBroker.cpp:3400
@ terminating
the termination process has started
constexpr global_broker_id parent_broker_id
Definition: global_federate_id.hpp:60
std::vector< std::string > checkForLinks(const std::string &newSource) const
Definition: UnknownHandleManager.cpp:103
bool _disable_ping
indicator that the broker doesn't respond to pings
Definition: CoreBroker.hpp:80
virtual void configure(const std::string &configureString) override final
Definition: CoreBroker.cpp:1707
void setErrorState(int eCode, const std::string &estring)
Definition: BrokerBase.cpp:468
TimeRepresentation< count_time< 9 > > Time
Definition: helics-time.hpp:27
const std::string name
name of the federate
Definition: CoreBroker.hpp:53
Definition: ActionMessage.hpp:29
bool uuid_like
will be set to true if the name looks like a uuid
Definition: BrokerBase.hpp:126
Definition: CoreBroker.hpp:63
const std::string name
the name of the broker
Definition: CoreBroker.hpp:65
void processNonOptionalUnknowns(std::function< void(const std::string &name, char type, global_handle)> cfunc) const
Definition: UnknownHandleManager.cpp:200
int parseArgs(int argc, char *argv[])
Definition: BrokerBase.cpp:299
constexpr base_type baseValue() const
Definition: global_federate_id.hpp:34
std::atomic< bool > haltOperations
flag indicating that no further message should be processed
Definition: BrokerBase.hpp:73
bool hasUnknowns() const
Definition: UnknownHandleManager.cpp:134
virtual void setGlobal(const std::string &valueName, const std::string &value) override final
Definition: CoreBroker.cpp:2582
Definition: helicsCLI11.hpp:41
constexpr auto versionString
Definition: helicsVersion.hpp:16
virtual void configureBase()
Definition: BrokerBase.cpp:326
uint16_t counter
26 counter for filter tracking or message counter
Definition: ActionMessage.hpp:40
void processRequiredUnknowns(std::function< void(const std::string &name, char type, global_handle)> cfunc) const
Definition: UnknownHandleManager.cpp:230
std::unique_ptr< ForwardingTimeCoordinator > timeCoord
object managing the time control
Definition: BrokerBase.hpp:93
@ configured
the broker itself has been configured and is ready to connect
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:75
virtual void transmit(route_id route, const ActionMessage &command)=0
bool no_ping
indicator that the broker is not very responsive to ping requests
Definition: BrokerBase.hpp:125
bool _core
if set to true the broker is a core; if false it is a broker
Definition: CoreBroker.hpp:76
virtual const std::string & getIdentifier() const override final
Definition: CoreBroker.hpp:287
helics_sequencing_mode
Definition: helics_enums.h:333
void setIdentifier(const std::string &name)
Definition: CoreBroker.cpp:61
void unregisterBroker(const std::string &name)
Definition: BrokerFactory.cpp:295
virtual ~CoreBroker()
Definition: CoreBroker.cpp:55
Definition: core-exceptions.hpp:48
bool hasRequiredUnknowns() const
Definition: UnknownHandleManager.cpp:174
@ filter
handle to a filter
std::vector< targetInfo > checkForPublications(const std::string &newPublication) const
Definition: UnknownHandleManager.cpp:98
Definition: CoreBroker.hpp:51
virtual void setLoggingLevel(int logLevel) override final
Definition: CoreBroker.cpp:2503
virtual bool connect() override final
Definition: CoreBroker.cpp:1769
void addActionMessage(const ActionMessage &m)
Definition: BrokerBase.cpp:544
virtual std::string generateLocalAddressString() const =0
@ connecting
the connection process has started
void clearInput(const std::string &newInput)
Definition: UnknownHandleManager.cpp:257
constexpr uint16_t child_flag
overload of extra_flag4 indicating a message is from a child object
Definition: flagOperations.hpp:46
@ connected
the connection process has completed
virtual bool isRoot() const override final
Definition: CoreBroker.hpp:206
Definition: global_federate_id.hpp:171
virtual void clearTimeBarrier() override final
Definition: CoreBroker.cpp:1826
void clearEndpoint(const std::string &newEndpoint)
Definition: UnknownHandleManager.cpp:269
@ created
the broker has been created
std::string payload
Definition: ActionMessage.hpp:44
@ clone_flag
flag indicating the filter is a clone filter or the data needs to be cloned
Definition: flagOperations.hpp:25
std::vector< targetInfo > checkForFilters(const std::string &newFilter) const
Definition: UnknownHandleManager.cpp:117
virtual void setTimeBarrier(Time barrierTime) override final
Definition: CoreBroker.cpp:1815
std::shared_ptr< Broker > findBroker(const std::string &brokerName)
Definition: BrokerFactory.cpp:185
void clearActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:90
virtual void processDisconnect(bool skipUnregister=false) override final
Definition: CoreBroker.cpp:1859
std::atomic< global_broker_id > global_id
the unique identifier for the broker(core or broker)
Definition: BrokerBase.hpp:36
virtual bool isOpenToNewFederates() const override
Definition: CoreBroker.cpp:211
int32_t messageID
8 – message ID for a variety of purposes
Definition: ActionMessage.hpp:35
const friend std::string & brokerStateName(broker_state_t state)
Definition: BrokerBase.cpp:918
virtual bool isConnected() const override final
Definition: CoreBroker.cpp:1844
global_broker_id global_broker_id_local
Definition: BrokerBase.hpp:38
Json::Value loadJsonStr(const std::string &jsonString)
Definition: JsonProcessingFunctions.cpp:50
virtual std::string query(const std::string &target, const std::string &queryStr, helics_sequencing_mode mode=helics_sequencing_mode_fast) override final
Definition: CoreBroker.cpp:2518
std::size_t currentMessageCounter() const
Definition: BrokerBase.hpp:264
@ indicator_flag
flag used for setting values
Definition: flagOperations.hpp:21
Definition: BasicHandleInfo.hpp:36
virtual const std::string & getAddress() const override final
Definition: CoreBroker.cpp:70
@ use_json_serialization_flag
flag to indicate it should use the json packetization
Definition: flagOperations.hpp:22
bool allInitReady() const
Definition: CoreBroker.cpp:3427
@ fed
special logging command for messages coming from a fed
Definition: loggingHelper.hpp:32
std::string routeInfo
string describing the connection information for the route
Definition: CoreBroker.hpp:82
global_broker_id parent
the id of the parent broker/core
Definition: CoreBroker.hpp:69
bool enteredExecutionMode
flag indicating that the broker has entered execution mode
Definition: BrokerBase.hpp:120
std::vector< targetInfo > checkForEndpoints(const std::string &newEndpoint) const
Definition: UnknownHandleManager.cpp:110
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:67
constexpr uint16_t cancel_flag
overload of extra_flag3 indicating an operation is canceled
Definition: flagOperations.hpp:40
global_broker_id global_id
the global identifier for the broker
Definition: CoreBroker.hpp:67
BasicHandleInfo * getEndpoint(const std::string &name)
Definition: HandleManager.cpp:204
bool terminate_on_error
flag indicating that the federation should halt on any error
Definition: BrokerBase.hpp:77
virtual bool waitForDisconnect(std::chrono::milliseconds msToWait=std::chrono::milliseconds(0)) const override final
Definition: CoreBroker.cpp:1850
std::string prettyPrintString(const ActionMessage &command)
Definition: ActionMessage.cpp:861
@ empty_flag
flag indicating the interface is nameless
Definition: flagOperations.hpp:33
constexpr uint16_t parent_flag
overload of extra_flag3 indicating the message is from a parent object
Definition: flagOperations.hpp:43
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
constexpr uint16_t non_counting_flag
overload of nameless_interface_flag indicating that a federate should not count in any totals
Definition: flagOperations.hpp:49
@ input
handle to an input interface
void setLoggingFile(const std::string &lfile)
Definition: BrokerBase.cpp:486
bool _gateway
set to true if this broker should act as a gateway.
Definition: CoreBroker.hpp:96
Time actionTime
40 the time an action took place or will take place //32
Definition: ActionMessage.hpp:43
virtual void setAsRoot() override final
Definition: CoreBroker.cpp:1761
log_level
Definition: loggingHelper.hpp:20
bool _route_key
indicator that the broker has a unique route id
Definition: CoreBroker.hpp:78
@ destination_target
indicator that the target is a destination target
Definition: flagOperations.hpp:17
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
@ error
only print errors
Definition: loggingHelper.hpp:22
@ required_flag
flag indicating that an action or match is required
Definition: flagOperations.hpp:18
virtual void addRoute(route_id rid, int interfaceId, const std::string &routeInfo)=0
@ optional_flag
flag indicating that a connection is optional and may not be matched
Definition: flagOperations.hpp:24
void clearFilter(const std::string &newFilter)
Definition: UnknownHandleManager.cpp:275
connection_state state
specify the current status of the broker
Definition: CoreBroker.hpp:71
bool _nonLocal
indicator that the broker has a subbroker as a parent.
Definition: CoreBroker.hpp:77
global_federate_id global_id
the identification code for the federate
Definition: CoreBroker.hpp:54
Time queryTimeout
Definition: BrokerBase.hpp:55
@ core_flag
flag indicating that message comes from a core vs a broker
Definition: flagOperations.hpp:19
virtual void globalError(int32_t errorCode, const std::string &errorString) override final
Definition: CoreBroker.cpp:1835
std::vector< targetInfo > checkForInputs(const std::string &newInput) const
Definition: UnknownHandleManager.cpp:92
bool _hasTimeDependency
flag indicating that a broker has general endpoints it is coordinating
Definition: CoreBroker.hpp:74
virtual void dataLink(const std::string &publication, const std::string &input) override final
Definition: CoreBroker.cpp:170
void setTickForwarding(TickForwardingReasons reason, bool value=true)
Definition: BrokerBase.cpp:838
virtual void configureFromArgs(int argc, char *argv[]) override final
Definition: CoreBroker.cpp:1722
Definition: TimeoutMonitor.h:27
void setDestination(global_handle hand)
Definition: ActionMessage.hpp:100
virtual bool sendToLogger(global_federate_id federateID, int logLevel, const std::string &name, const std::string &message) const
Definition: BrokerBase.cpp:359
route_id route
the identifier for the route to take to the broker
Definition: CoreBroker.hpp:68
void logFlush()
Definition: BrokerBase.cpp:524
Definition: CoreBroker.hpp:94
@ endpoint
handle to an endpoint
void clearFederateUnknowns(global_federate_id id)
Definition: UnknownHandleManager.cpp:282
global_broker_id higher_broker_id
the id code of the broker one level above this broker
Definition: BrokerBase.hpp:40
Definition: core/Broker.hpp:18
global_federate_id dest_id
20 fed_id for a targeted message
Definition: ActionMessage.hpp:38
@ helics_sequencing_mode_fast
Definition: helics_enums.h:335
gmlc::containers::BlockingPriorityQueue< ActionMessage > actionQueue
primary routing queue
Definition: BrokerBase.hpp:94
virtual void addDestinationFilterToEndpoint(const std::string &filter, const std::string &endpoint) override final
Definition: CoreBroker.cpp:186
int32_t minFederateCount
the minimum number of federates that must connect before entering init mode
Definition: BrokerBase.hpp:45
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:58
connection_state
Definition: CoreBroker.hpp:41
constexpr global_broker_id root_broker_id
Definition: global_federate_id.hpp:62
Definition: BrokerBase.hpp:34
@ next_step
indicator that the iterations have completed
bool isPriorityCommand(const ActionMessage &command) noexcept
Definition: ActionMessage.hpp:232
@ publication
handle to output interface
const std::string & state_string(operation_state state)
Definition: CommonCore.cpp:47
global_broker_id parent
the id of the parent broker/core
Definition: CoreBroker.hpp:56
constexpr identififier_base_type global_federate_id_shift
Definition: global_federate_id.hpp:20
std::string address
network location of the broker
Definition: BrokerBase.hpp:64
std::string & name
alias payload to a name reference for registration functions
Definition: ActionMessage.hpp:46
virtual void makeConnections(const std::string &file) override final
Definition: CoreBroker.cpp:161
bool hasNonOptionalUnknowns() const
Definition: UnknownHandleManager.cpp:141
bool isValidIndex(sizeType testSize, const SizedDataType &vec)
Definition: core-data.hpp:249
void unregister()
Definition: CoreBroker.cpp:1879