helics  3.6.1
FederateState.hpp
1 /*
2 Copyright (c) 2017-2025,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/GuardedTypes.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
12 #include "CoreTypes.hpp"
13 #include "InterfaceInfo.hpp"
14 #include "core-data.hpp"
15 #include "gmlc/containers/BlockingQueue.hpp"
16 #include "helicsTime.hpp"
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <deque>
22 #include <map>
23 #include <memory>
24 #include <optional>
25 #include <string>
26 #include <thread>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30 
31 namespace helics {
32 class SubscriptionInfo;
33 class PublicationInfo;
34 class EndpointInfo;
35 class FilterInfo;
36 class CommonCore;
37 class CoreFederateInfo;
38 
39 class TimeCoordinator;
40 class MessageTimer;
41 class LogManager;
42 
43 constexpr Time startupTime = Time::minVal();
44 constexpr Time initialTime{-1000000.0};
45 
47 enum class TimeSynchronizationMethod : uint8_t { DISTRIBUTED = 0, GLOBAL = 1, ASYNC = 2 };
48 
51  public:
53  FederateState(const std::string& fedName, const CoreFederateInfo& fedInfo);
54  // the destructor is defined so some classes linked with unique ptrs don't have to be defined in
55  // the header
57  FederateState(const FederateState&) = delete;
58  FederateState& operator=(const FederateState&) = delete;
61 
62  private:
63  const std::string name;
65  std::unique_ptr<TimeCoordinator> timeCoord;
66 
67  public:
69  std::atomic<GlobalFederateId> global_id;
70  private:
72  std::atomic<FederateStates> state{FederateStates::CREATED};
73  bool only_transmit_on_change{false};
75  bool realtime{false};
76  bool observer{false};
77  bool reentrant{false};
78  bool mSourceOnly{false};
79  bool mCallbackBased{false};
81  bool strict_input_type_checking{false};
82  bool ignore_unit_mismatch{false};
84  bool mSlowResponding{false};
86  bool mAllowRemoteControl{true};
87  InterfaceInfo interfaceInformation;
88  std::unique_ptr<LogManager> mLogManager;
89  int maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
90 
91  public:
92  std::atomic<bool> init_transmitted{false};
94  int indexGroup{0};
95 
96  private:
98  bool wait_for_current_time{false};
100  bool ignore_time_mismatch_warnings{false};
102  bool mProfilerActive{false};
105  bool mLocalProfileCapture{false};
106  int errorCode{0};
107  CommonCore* mParent{nullptr};
108  std::string errorString;
110  decltype(std::chrono::steady_clock::now()) start_clock_time;
111  Time rt_lag{timeZero};
112  Time rt_lead{timeZero};
113  Time grantTimeOutPeriod{timeZero};
114  std::int32_t realTimeTimerIndex{-1};
115  std::int32_t grantTimeoutTimeIndex{-1};
116  public:
118  std::atomic<bool> initRequested{false};
119  // temporary
120  std::atomic<bool> requestingMode{false};
121 
122  std::atomic<bool> initIterating{false};
123 
124  private:
125  bool iterating{false};
126  bool timeGranted_mode{false};
128  bool terminate_on_error{false};
132  TimeSynchronizationMethod timeMethod{TimeSynchronizationMethod::DISTRIBUTED};
134  std::uint32_t mGrantCount{0}; // this is intended to allow wrapping
136  std::shared_ptr<MessageTimer> mTimer;
138  gmlc::containers::BlockingQueue<ActionMessage> queue;
140  gmlc::containers::BlockingQueue<std::pair<std::string, std::string>> commandQueue;
142  std::atomic<uint16_t> interfaceFlags{0};
144  std::map<GlobalFederateId, std::deque<ActionMessage>> delayQueues;
145  std::vector<InterfaceHandle> events;
146  std::vector<InterfaceHandle> eventMessages;
147  std::vector<GlobalFederateId> delayedFederates;
148  Time time_granted{startupTime};
149  Time allowed_send_time{startupTime};
150  Time minimumReceiveTime{startupTime};
151 
152 #if __cplusplus >= 201703L
153  mutable std::atomic_flag processing{};
154 #else
155  mutable std::atomic_flag processing = ATOMIC_FLAG_INIT;
156 #endif
157 
159  std::vector<std::function<std::string(std::string_view)>> queryCallbacks;
160  std::shared_ptr<FederateOperator> fedCallbacks;
161  std::vector<std::pair<std::string, std::string>> tags;
162  std::atomic<bool> queueProcessing{false};
164  Time nextValueTime() const;
166  Time nextMessageTime() const;
167 
169  void setState(FederateStates newState);
170 
172  bool messageShouldBeDelayed(const ActionMessage& cmd) const noexcept;
174  void addFederateToDelay(GlobalFederateId gid);
176  void generateConfig(nlohmann::json& base) const;
177 
178  public:
180  void reset(const CoreFederateInfo& fedInfo);
182  const std::string& getIdentifier() const { return name; }
184  FederateStates getState() const;
186  InterfaceInfo& interfaces() { return interfaceInformation; }
188  const InterfaceInfo& interfaces() const { return interfaceInformation; }
189 
191  uint64_t getQueueSize(InterfaceHandle hid) const;
194  uint64_t getQueueSize() const;
198  int32_t getCurrentIteration() const;
202  std::unique_ptr<Message> receive(InterfaceHandle hid);
205  std::unique_ptr<Message> receiveAny(InterfaceHandle& hid);
209  const std::shared_ptr<const SmallBuffer>& getValue(InterfaceHandle handle,
210  uint32_t* inputIndex);
211 
216  const std::vector<std::shared_ptr<const SmallBuffer>>& getAllValues(InterfaceHandle handle);
217 
219  std::pair<SmallBuffer, Time> getPublishedValue(InterfaceHandle handle);
221  void setParent(CommonCore* coreObject) { mParent = coreObject; }
226  void setProperties(const ActionMessage& cmd);
228  void setInterfaceProperty(const ActionMessage& cmd);
230  void setProperty(int timeProperty, Time propertyVal);
232  void setProperty(int intProperty, int propertyVal);
234  void setOptionFlag(int optionFlag, bool value);
236  Time getTimeProperty(int timeProperty) const;
238  bool getOptionFlag(int optionFlag) const;
240  int32_t getHandleOption(InterfaceHandle handle, char iType, int32_t option) const;
242  uint16_t getInterfaceFlags() const { return interfaceFlags.load(); }
244  int getIntegerProperty(int intProperty) const;
246  int publicationCount() const;
248  int endpointCount() const;
250  int inputCount() const;
252  void spinlock() const
253  {
254  while (processing.test_and_set()) {
255  ; // spin
256  }
257  }
259  void sleeplock() const
260  {
261  if (!processing.test_and_set()) {
262  return;
263  }
264  // spin for 10000 tries
265  for (int ii = 0; ii < 10000; ++ii) {
266  if (!processing.test_and_set()) {
267  return;
268  }
269  }
270  while (processing.test_and_set()) {
271  std::this_thread::yield();
272  }
273  }
275  void lock() { sleeplock(); }
276 
278  bool try_lock() const { return !processing.test_and_set(); }
280  void unlock() const { processing.clear(); }
282  int loggingLevel() const;
283 
285  void setTag(std::string_view tag, std::string_view value);
287  const std::string& getTag(std::string_view tag) const;
289  const std::pair<std::string, std::string>& getTagByIndex(size_t index) const
290  {
291  return tags[index];
292  }
294  auto tagCount() const { return tags.size(); }
296  bool isCallbackFederate() const { return mCallbackBased; }
297 
298  private:
308  MessageProcessingResult processQueue() noexcept;
309 
319  MessageProcessingResult processDelayQueue() noexcept;
320 
321  std::optional<MessageProcessingResult>
322  checkProcResult(std::tuple<FederateStates, MessageProcessingResult, bool>& proc_result,
323  ActionMessage& cmd);
327  MessageProcessingResult processActionMessage(ActionMessage& cmd);
328 
331  void processDataConnectionMessage(ActionMessage& cmd);
332 
335  void processDataMessage(ActionMessage& cmd);
337  void timeoutCheck(ActionMessage& cmd);
339  void processLoggingMessage(ActionMessage& cmd);
343  void fillEventVectorUpTo(Time currentTime);
347  void fillEventVectorInclusive(Time currentTime);
351  void fillEventVectorNextIteration(Time currentTime);
353  void addDependency(GlobalFederateId fedToDependOn);
355  void addDependent(GlobalFederateId fedThatDependsOnThis);
357  void resetDependency(GlobalFederateId gid);
358 
360  int checkInterfaces();
362  std::string processQueryActual(std::string_view query) const;
366  void generateProfilingMessage(bool enterHelicsCode);
368  void generateProfilingMarker();
370  void updateMaxLogLevel();
371 
373  void callbackProcessing() noexcept;
374  void callbackReturnResult(FederateStates lastState,
376  FederateStates newState) noexcept;
377  void initCallbackProcessing();
378  void execCallbackProcessing(IterationResult result);
380  void updateDataForExecEntry(MessageProcessingResult result, IterationRequest iterate);
382  void updateDataForTimeReturn(MessageProcessingResult result,
383  Time nextTime,
384  IterationRequest iterate);
385 
386  public:
388  Time grantedTime() const { return time_granted; }
390  Time nextAllowedSendTime() const { return allowed_send_time; }
393  const std::vector<InterfaceHandle>& getEvents() const;
396  std::vector<GlobalFederateId> getDependencies() const;
399  std::vector<GlobalFederateId> getDependents() const;
401  const std::string& lastErrorString() const { return errorString; }
403  int lastErrorCode() const noexcept { return errorCode; }
405  void setCoreObject(CommonCore* parent);
406  // the next 5 functions are the processing functions that actually process the queue
422  iteration_time enterExecutingMode(IterationRequest iterate, bool sendRequest = false);
430  iteration_time requestTime(Time nextTime, IterationRequest iterate, bool sendRequest = false);
434  std::vector<GlobalHandle> getSubscribers(InterfaceHandle handle);
435 
439  std::vector<std::pair<GlobalHandle, std::string_view>>
441 
448  void finalize();
450  void processCommunications(std::chrono::milliseconds period);
452  void addAction(const ActionMessage& action);
454  void addAction(ActionMessage&& action);
456  std::optional<ActionMessage> processPostTerminationAction(const ActionMessage& action);
457 
459  void forceProcessMessage(ActionMessage& action);
460 
468  void logMessage(int level,
469  std::string_view logMessageSource,
470  std::string_view message,
471  bool fromRemote = false) const;
472 
477  void setLogger(std::function<void(int, std::string_view, std::string_view)> logFunction);
478 
481  void setCallbackOperator(std::shared_ptr<FederateOperator> fed)
482  {
483  fedCallbacks = std::move(fed);
484  }
485 
489  void setQueryCallback(std::function<std::string(std::string_view)> queryCallbackFunction,
490  int order)
491  {
492  order = std::clamp(order, 1, 10);
493 
494  if (static_cast<int>(queryCallbacks.size()) < order) {
495  queryCallbacks.resize(order);
496  }
497  queryCallbacks[order - 1] = std::move(queryCallbackFunction);
498  }
504  std::string processQuery(std::string_view query, bool force_ordering = false) const;
512  bool checkAndSetValue(InterfaceHandle pub_id, const char* data, uint64_t len);
513 
515  void routeMessage(const ActionMessage& msg);
516 
518  void routeMessage(ActionMessage&& msg);
520  void createInterface(InterfaceType htype,
521  InterfaceHandle handle,
522  std::string_view key,
523  std::string_view type,
524  std::string_view units,
525  uint16_t flags);
527  void closeInterface(InterfaceHandle handle, InterfaceType type);
529  void sendCommand(ActionMessage& command);
530 
532  std::pair<std::string, std::string> getCommand();
534  std::pair<std::string, std::string> waitCommand();
535 };
536 
537 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: CommonCore.hpp:74
Definition: CoreFederateInfo.hpp:16
Definition: FederateState.hpp:50
void setQueryCallback(std::function< std::string(std::string_view)> queryCallbackFunction, int order)
Definition: FederateState.hpp:489
const std::string & getTag(std::string_view tag) const
Definition: FederateState.cpp:2880
void setInterfaceProperty(const ActionMessage &cmd)
Definition: FederateState.cpp:1969
std::atomic< bool > init_transmitted
Definition: FederateState.hpp:92
std::pair< SmallBuffer, Time > getPublishedValue(InterfaceHandle handle)
Definition: FederateState.cpp:344
void routeMessage(const ActionMessage &msg)
Definition: FederateState.cpp:353
void spinlock() const
Definition: FederateState.hpp:252
auto tagCount() const
Definition: FederateState.hpp:294
std::atomic< bool > initRequested
Definition: FederateState.hpp:118
void reset(const CoreFederateInfo &fedInfo)
Definition: FederateState.cpp:160
const std::pair< std::string, std::string > & getTagByIndex(size_t index) const
Definition: FederateState.hpp:289
std::optional< ActionMessage > processPostTerminationAction(const ActionMessage &action)
Definition: FederateState.cpp:481
const std::string & getIdentifier() const
Definition: FederateState.hpp:182
const std::vector< std::shared_ptr< const SmallBuffer > > & getAllValues(InterfaceHandle handle)
Definition: FederateState.cpp:339
bool isCallbackFederate() const
Definition: FederateState.hpp:296
void setCoreObject(CommonCore *parent)
Definition: FederateState.cpp:2420
int inputCount() const
Definition: FederateState.cpp:2340
std::vector< GlobalFederateId > getDependents() const
Definition: FederateState.cpp:2350
std::pair< std::string, std::string > getCommand()
Definition: FederateState.cpp:2604
void forceProcessMessage(ActionMessage &action)
Definition: FederateState.cpp:494
iteration_time enterExecutingMode(IterationRequest iterate, bool sendRequest=false)
Definition: FederateState.cpp:571
std::unique_ptr< Message > receive(InterfaceHandle hid)
Definition: FederateState.cpp:296
InterfaceInfo & interfaces()
Definition: FederateState.hpp:186
LocalFederateId local_id
id code for the local federate descriptor
Definition: FederateState.hpp:68
void closeInterface(InterfaceHandle handle, InterfaceType type)
Definition: FederateState.cpp:439
void setProperties(const ActionMessage &cmd)
Definition: FederateState.cpp:1928
Time nextAllowedSendTime() const
Definition: FederateState.hpp:390
uint16_t getInterfaceFlags() const
Definition: FederateState.hpp:242
MessageProcessingResult genericUnspecifiedQueueProcess(bool busyReturn)
Definition: FederateState.cpp:894
void unlock() const
Definition: FederateState.hpp:280
std::vector< GlobalFederateId > getDependencies() const
Definition: FederateState.cpp:2345
int publicationCount() const
Definition: FederateState.cpp:2330
void setOptionFlag(int optionFlag, bool value)
Definition: FederateState.cpp:2116
void addAction(const ActionMessage &action)
Definition: FederateState.cpp:383
void finalize()
Definition: FederateState.cpp:933
std::vector< std::pair< GlobalHandle, std::string_view > > getMessageDestinations(InterfaceHandle handle)
Definition: FederateState.cpp:685
IterationResult waitSetup()
Definition: FederateState.cpp:504
uint64_t getQueueSize() const
Definition: FederateState.cpp:281
int indexGroup
storage for index group location (this only matters on construction so can be public)
Definition: FederateState.hpp:94
bool getOptionFlag(int optionFlag) const
Definition: FederateState.cpp:2251
void logMessage(int level, std::string_view logMessageSource, std::string_view message, bool fromRemote=false) const
Definition: FederateState.cpp:2427
const std::vector< InterfaceHandle > & getEvents() const
Definition: FederateState.cpp:995
std::pair< std::string, std::string > waitCommand()
Definition: FederateState.cpp:2619
std::unique_ptr< Message > receiveAny(InterfaceHandle &hid)
Definition: FederateState.cpp:305
FederateState(const FederateState &)=delete
FederateStates getState() const
Definition: FederateState.cpp:235
iteration_time requestTime(Time nextTime, IterationRequest iterate, bool sendRequest=false)
Definition: FederateState.cpp:694
int lastErrorCode() const noexcept
Definition: FederateState.hpp:403
const std::string & lastErrorString() const
Definition: FederateState.hpp:401
void sleeplock() const
Definition: FederateState.hpp:259
int32_t getCurrentIteration() const
Definition: FederateState.cpp:240
void setCallbackOperator(std::shared_ptr< FederateOperator > fed)
Definition: FederateState.hpp:481
const std::shared_ptr< const SmallBuffer > & getValue(InterfaceHandle handle, uint32_t *inputIndex)
Definition: FederateState.cpp:332
void createInterface(InterfaceType htype, InterfaceHandle handle, std::string_view key, std::string_view type, std::string_view units, uint16_t flags)
Definition: FederateState.cpp:403
void sendCommand(ActionMessage &command)
Definition: FederateState.cpp:2485
void setParent(CommonCore *coreObject)
Definition: FederateState.hpp:221
void setProperty(int timeProperty, Time propertyVal)
Definition: FederateState.cpp:2035
IterationResult enterInitializingMode(IterationRequest request)
Definition: FederateState.cpp:533
void processCommunications(std::chrono::milliseconds period)
Definition: FederateState.cpp:969
int getIntegerProperty(int intProperty) const
Definition: FederateState.cpp:2312
void lock()
Definition: FederateState.hpp:275
int loggingLevel() const
Definition: FederateState.cpp:2861
std::vector< GlobalHandle > getSubscribers(InterfaceHandle handle)
Definition: FederateState.cpp:671
int32_t getHandleOption(InterfaceHandle handle, char iType, int32_t option) const
Definition: FederateState.cpp:2296
Time getTimeProperty(int timeProperty) const
Definition: FederateState.cpp:2236
bool try_lock() const
Definition: FederateState.hpp:278
const InterfaceInfo & interfaces() const
Definition: FederateState.hpp:188
void setLogger(std::function< void(int, std::string_view, std::string_view)> logFunction)
Definition: FederateState.cpp:290
Time grantedTime() const
Definition: FederateState.hpp:388
FederateState(const std::string &fedName, const CoreFederateInfo &fedInfo)
Definition: FederateState.cpp:110
bool checkAndSetValue(InterfaceHandle pub_id, const char *data, uint64_t len)
Definition: FederateState.cpp:245
std::string processQuery(std::string_view query, bool force_ordering=false) const
Definition: FederateState.cpp:2838
void setTag(std::string_view tag, std::string_view value)
Definition: FederateState.cpp:2866
std::atomic< GlobalFederateId > global_id
global id code, default to invalid
Definition: FederateState.hpp:69
int endpointCount() const
Definition: FederateState.cpp:2335
Definition: GlobalFederateId.hpp:75
Definition: LocalFederateId.hpp:65
Definition: InterfaceInfo.hpp:26
Definition: LocalFederateId.hpp:22
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:206
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
IterationResult
Definition: CoreTypes.hpp:94
FederateStates
Definition: CoreTypes.hpp:21
MessageProcessingResult
Definition: CoreTypes.hpp:75
constexpr Time timeZero
Definition: helicsTime.hpp:31
TimeSynchronizationMethod
enumeration of possible time coordination methods
Definition: FederateState.hpp:47
IterationRequest
Definition: CoreTypes.hpp:103
@ NO_ITERATIONS
indicator that the iterations have completed
InterfaceType
Definition: CoreTypes.hpp:112
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
Definition: helicsTime.hpp:43