9 #include "../common/JsonBuilder.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
14 #include "FederateIdExtra.hpp"
15 #include "HandleManager.hpp"
16 #include "TimeDependencies.hpp"
17 #include "UnknownHandleManager.hpp"
18 #include "gmlc/concurrency/DelayedObjects.hpp"
19 #include "gmlc/concurrency/TriggerVariable.hpp"
20 #include "gmlc/containers/AirLock.hpp"
21 #include "gmlc/containers/DualStringMappedVector.hpp"
22 #include "gmlc/containers/SimpleQueue.hpp"
34 #include <unordered_map>
46 REQUEST_DISCONNECT = 48,
60 bool nonCounting{
false};
63 bool reentrant{
false};
64 explicit BasicFedInfo(std::string_view fedname): name(fedname) {}
90 class TimeCoordinator;
102 std::atomic<bool> _isRoot{
false};
104 bool connectionEstablished{
false};
105 bool initIterating{
false};
109 DualStringMappedVector<BasicFedInfo, GlobalFederateId, reference_stability::unstable>
113 DualStringMappedVector<BasicBrokerInfo, GlobalBrokerId, reference_stability::unstable>
116 std::string mPreviousLocalBrokerIdentifier;
118 HandleManager handles;
119 UnknownHandleManager unknownHandles;
121 std::vector<std::pair<std::string, GlobalFederateId>> delayedDependencies;
123 std::unordered_map<GlobalFederateId, LocalFederateId> global_id_translation;
125 std::unordered_map<GlobalFederateId, route_id> routing_table;
127 std::unordered_map<std::string, route_id> knownExternalEndpoints;
128 std::unordered_map<std::string, std::string> global_values;
129 std::unordered_map<std::string, std::int64_t> renamers;
130 std::mutex name_mutex_;
131 std::atomic<int> queryCounter{1};
132 bool force_connection{
false};
133 gmlc::concurrency::DelayedObjects<std::string> activeQueries;
135 std::vector<std::tuple<fileops::JsonMapBuilder, std::vector<ActionMessage>,
QueryReuse>>
138 std::deque<std::pair<int32_t, decltype(std::chrono::steady_clock::now())>> queryTimeouts;
140 std::vector<ActionMessage> earlyMessages;
141 gmlc::concurrency::TriggerVariable disconnection;
143 std::unique_ptr<TimeoutMonitor> timeoutMon;
144 std::atomic<uint16_t> nextAirLock{0};
146 std::array<gmlc::containers::AirLock<std::any>, 3> dataAirlocks;
149 mTimeMonitorFederate;
150 GlobalFederateId mTimeMonitorFederateId{};
152 GlobalFederateId mTimeMonitorLocalFederateId{};
154 Time mTimeMonitorLastLogTime{Time::minVal()};
155 Time mTimeMonitorCurrentTime{Time::minVal()};
156 std::atomic<double> simTime{mInvalidSimulationTime};
157 Time mNextTimeBarrier{Time::maxVal()};
162 virtual void processCommand(ActionMessage&& command)
override;
168 void processPriorityCommand(ActionMessage&& command)
override;
171 void processBrokerConfigureCommands(ActionMessage& cmd);
173 gmlc::containers::SimpleQueue<ActionMessage> delayTransmitQueue;
175 void transmitDelayedMessages();
179 void routeMessage(ActionMessage& cmd, GlobalFederateId dest);
180 void routeMessage(ActionMessage&& cmd, GlobalFederateId dest);
183 void routeMessage(
const ActionMessage& cmd);
184 void routeMessage(ActionMessage&& cmd);
186 void transmitToParent(ActionMessage&& cmd);
188 void propagateError(ActionMessage&& cmd);
190 void broadcast(ActionMessage& cmd);
192 route_id fillMessageRouteInformation(ActionMessage& mess);
195 void executeInitializationOperations(
bool iterating);
197 uint16_t getNextAirlockIndex();
200 bool verifyBrokerKey(ActionMessage& mess)
const;
203 bool verifyBrokerKey(std::string_view key)
const;
208 virtual bool connect() override final;
226 virtual
bool isRoot() const override final {
return _isRoot; };
231 std::function<
void(
int, std::string_view, std::string_view)> logFunction)
override final;
234 std::chrono::milliseconds msToWait = std::chrono::milliseconds(0)) const override final;
240 virtual
void globalError(int32_t errorCode, std::string_view errorString) override final;
245 virtual
bool brokerConnect() = 0;
248 virtual
void brokerDisconnect() = 0;
283 explicit
CoreBroker(
bool setAsRootBroker = false) noexcept;
285 explicit
CoreBroker(std::string_view broker_name);
289 virtual
void configure(std::string_view configureString) override final;
307 virtual const std::string&
getAddress() const override final;
309 virtual
void setLogFile(std::string_view lfile) override final;
311 query(std::string_view target,
312 std::string_view queryStr,
314 virtual
void setGlobal(std::string_view valueName, std::string_view value) override final;
316 std::string_view commandStr,
319 virtual
void linkEndpoints(std::string_view source, std::string_view target) override final;
320 virtual
void dataLink(std::string_view publication, std::string_view input) override final;
323 std::string_view endpoint) override final;
326 std::string_view endpoint) override final;
327 virtual
void addAlias(std::string_view interfaceKey, std::string_view alias) override final;
335 int getCountableFederates() const;
337 void checkDependencies();
339 void connectInterfaces(
341 uint32_t originFlags,
343 uint32_t targetFlags,
344 std::pair<action_message_def::action_t, action_message_def::action_t> actions);
347 void findAndNotifyInputTargets(
BasicHandleInfo& handleInfo, const std::
string& key);
348 void findAndNotifyPublicationTargets(
BasicHandleInfo& handleInfo, const std::
string& key);
350 void findAndNotifyFilterTargets(
BasicHandleInfo& handleInfo, const std::
string& key);
351 void findAndNotifyEndpointTargets(
BasicHandleInfo& handleInfo, const std::
string& key);
353 void findRegexMatch(const std::
string& target,
382 void checkQueryTimeouts();
388 std::
string generateQueryAnswer(std::string_view request,
bool force_ordering);
390 std::
string quickBrokerQueries(std::string_view request) const;
396 std::
string getNameList(std::string_view gidString) const;
402 const BasicBrokerInfo* getBrokerById(GlobalBrokerId brokerid)
const;
404 BasicBrokerInfo* getBrokerById(GlobalBrokerId brokerid);
406 void addLocalInfo(BasicHandleInfo& handleInfo,
const ActionMessage& message);
407 void addPublication(ActionMessage& message);
408 void addInput(ActionMessage& message);
409 void addEndpoint(ActionMessage& message);
410 void addFilter(ActionMessage& message);
411 void addTranslator(ActionMessage& message);
412 void addDataSink(ActionMessage& message);
414 bool checkInterfaceCreation(ActionMessage& message,
InterfaceType type);
416 void brokerRegistration(ActionMessage&& command);
420 void sendBrokerErrorAck(ActionMessage& command, std::int32_t errorCode);
422 void linkInterfaces(ActionMessage& command);
424 void fedRegistration(ActionMessage&& command);
428 void sendFedErrorAck(ActionMessage& command, std::int32_t errorCode);
431 void initializeMapBuilder(std::string_view request,
434 bool force_ordering);
436 std::string generateGlobalStatus(fileops::JsonMapBuilder& builder);
438 void sendErrorToImmediateBrokers(
int errorCode);
442 std::string generateFederationSummary()
const;
444 void labelAsDisconnected(GlobalBrokerId brkid);
447 void processTimeMonitorMessage(ActionMessage& message);
449 void loadTimeMonitor(
bool firstLoad, std::string_view newFederate);
451 void generateTimeBarrier(ActionMessage& message);
452 int generateMapObjectCounter()
const;
454 std::string generateRename(std::string_view name);
455 friend class TimeoutMonitor;
Definition: ActionMessage.hpp:30
Definition: CoreBroker.hpp:68
bool _core
if set to true the broker is a core, false is a broker
Definition: CoreBroker.hpp:79
bool _sent_disconnect_ack
indicator that the disconnect ack has been sent
Definition: CoreBroker.hpp:82
bool _hasTimeDependency
Definition: CoreBroker.hpp:78
const std::string name
the name of the broker
Definition: CoreBroker.hpp:70
bool initIterating
indicator that initIteration was requested
Definition: CoreBroker.hpp:85
bool _observer
indicator that the broker is an observer
Definition: CoreBroker.hpp:84
GlobalBrokerId global_id
the global identifier for the broker
Definition: CoreBroker.hpp:72
bool _route_key
indicator that the broker has a unique route id
Definition: CoreBroker.hpp:81
std::string routeInfo
string describing the connection information for the route
Definition: CoreBroker.hpp:86
ConnectionState state
specify the current status of the broker
Definition: CoreBroker.hpp:76
bool _disable_ping
indicator that the broker doesn't respond to pings
Definition: CoreBroker.hpp:83
bool _nonLocal
indicator that the broker has a subbroker as a parent.
Definition: CoreBroker.hpp:80
GlobalBrokerId parent
Definition: CoreBroker.hpp:74
route_id route
the identifier for the route to take to the broker
Definition: CoreBroker.hpp:73
Definition: BasicHandleInfo.hpp:20
Definition: BrokerBase.hpp:42
std::string identifier
an identifier for the broker
Definition: BrokerBase.hpp:70
Definition: core/Broker.hpp:18
Definition: CoreBroker.hpp:98
void setIdentifier(std::string_view name)
Definition: CoreBroker.cpp:68
virtual void globalError(int32_t errorCode, std::string_view errorString) override final
Definition: CoreBroker.cpp:2444
virtual void dataLink(std::string_view publication, std::string_view input) override final
Definition: CoreBroker.cpp:185
virtual void addSourceFilterToEndpoint(std::string_view filter, std::string_view endpoint) override final
Definition: CoreBroker.cpp:193
virtual void linkEndpoints(std::string_view source, std::string_view target) override final
Definition: CoreBroker.cpp:177
virtual void setGlobal(std::string_view valueName, std::string_view value) override final
Definition: CoreBroker.cpp:3545
virtual void addAlias(std::string_view interfaceKey, std::string_view alias) override final
Definition: CoreBroker.cpp:210
virtual void makeConnections(const std::string &file) override final
Definition: CoreBroker.cpp:168
virtual void setLogFile(std::string_view lfile) override final
Definition: CoreBroker.cpp:3448
virtual void processDisconnect(bool skipUnregister=false) override final
Definition: CoreBroker.cpp:2468
virtual bool isRoot() const override final
Definition: CoreBroker.hpp:226
virtual void addRoute(route_id rid, int interfaceId, std::string_view routeInfo)=0
virtual bool waitForDisconnect(std::chrono::milliseconds msToWait=std::chrono::milliseconds(0)) const override final
Definition: CoreBroker.cpp:2459
void unregister()
Definition: CoreBroker.cpp:2487
virtual void setTimeBarrier(Time barrierTime) override final
Definition: CoreBroker.cpp:2424
virtual const std::string & getAddress() const override final
Definition: CoreBroker.cpp:77
virtual void configureFromVector(std::vector< std::string > args) override final
Definition: CoreBroker.cpp:2332
virtual std::shared_ptr< helicsCLI11App > generateCLI() override
Definition: CoreBroker.cpp:2352
virtual void transmit(route_id route, const ActionMessage &command)=0
virtual void disconnect() override final
Definition: CoreBroker.cpp:2505
virtual bool connect() override final
Definition: CoreBroker.cpp:2377
virtual void configure(std::string_view configureString) override final
Definition: CoreBroker.cpp:2302
virtual void configureFromArgs(int argc, char *argv[]) override final
Definition: CoreBroker.cpp:2317
virtual const std::string & getIdentifier() const override final
Definition: CoreBroker.hpp:306
bool allInitReady() const
Definition: CoreBroker.cpp:4657
virtual void clearTimeBarrier() override final
Definition: CoreBroker.cpp:2435
virtual bool isConnected() const override final
Definition: CoreBroker.cpp:2453
virtual void setLoggingLevel(int logLevel) override final
Definition: CoreBroker.cpp:3439
virtual void addDestinationFilterToEndpoint(std::string_view filter, std::string_view endpoint) override final
Definition: CoreBroker.cpp:201
bool _gateway
set to true if this broker should act as a gateway.
Definition: CoreBroker.hpp:100
virtual void setAsRoot() override final
Definition: CoreBroker.cpp:2369
virtual void removeRoute(route_id rid)=0
virtual void setLoggingCallback(std::function< void(int, std::string_view, std::string_view)> logFunction) override final
Definition: CoreBroker.cpp:117
virtual void sendCommand(std::string_view target, std::string_view commandStr, HelicsSequencingModes mode) override final
Definition: CoreBroker.cpp:3554
virtual double getSimulationTime() const override
Definition: CoreBroker.cpp:2347
ConnectionState getAllConnectionState() const
Definition: CoreBroker.cpp:4630
virtual std::string query(std::string_view target, std::string_view queryStr, HelicsSequencingModes mode=HELICS_SEQUENCING_MODE_FAST) override final
Definition: CoreBroker.cpp:3458
virtual bool isOpenToNewFederates() const override
Definition: CoreBroker.cpp:233
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: GlobalFederateId.hpp:147
Definition: helicsCLI11.hpp:46
Definition: GlobalFederateId.hpp:187
HelicsSequencingModes
Definition: helics_enums.h:425
@ HELICS_SEQUENCING_MODE_FAST
Definition: helics_enums.h:427
action_t
Definition: ActionMessageDefintions.hpp:20
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
constexpr Time timeZero
Definition: helicsTime.hpp:31
ConnectionState
Definition: CoreBroker.hpp:41
InterfaceType
Definition: CoreTypes.hpp:111
QueryReuse
Enumeration of if query result is reusable.
Definition: queryHelpers.hpp:39
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
forward declaration of QueryReuse
Definition: CoreBroker.hpp:54
const std::string name
name of the federate
Definition: CoreBroker.hpp:55
GlobalBrokerId parent
the id of the parent broker/core
Definition: CoreBroker.hpp:58
route_id route
the routing information for data to be sent to the federate
Definition: CoreBroker.hpp:57
GlobalFederateId global_id
the identification code for the federate
Definition: CoreBroker.hpp:56