helics  3.5.2
FederateState.hpp
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/GuardedTypes.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
12 #include "CoreTypes.hpp"
13 #include "InterfaceInfo.hpp"
14 #include "core-data.hpp"
15 #include "gmlc/containers/BlockingQueue.hpp"
16 #include "helicsTime.hpp"
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <deque>
22 #include <map>
23 #include <memory>
24 #include <optional>
25 #include <string>
26 #include <thread>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30 
31 namespace helics {
32 class SubscriptionInfo;
33 class PublicationInfo;
34 class EndpointInfo;
35 class FilterInfo;
36 class CommonCore;
37 class CoreFederateInfo;
38 
39 class TimeCoordinator;
40 class MessageTimer;
41 class LogManager;
42 
43 constexpr Time startupTime = Time::minVal();
44 constexpr Time initialTime{-1000000.0};
45 
47 enum class TimeSynchronizationMethod : uint8_t { DISTRIBUTED = 0, GLOBAL = 1, ASYNC = 2 };
48 
51  public:
53  FederateState(const std::string& fedName, const CoreFederateInfo& fedInfo);
54  // the destructor is defined so some classes linked with unique ptrs don't have to be defined in
55  // the header
57  FederateState(const FederateState&) = delete;
58  FederateState& operator=(const FederateState&) = delete;
61 
62  private:
63  const std::string name;
65  std::unique_ptr<TimeCoordinator> timeCoord;
66 
67  public:
69  std::atomic<GlobalFederateId> global_id;
70  private:
72  std::atomic<FederateStates> state{FederateStates::CREATED};
73  bool only_transmit_on_change{false};
75  bool realtime{false};
76  bool observer{false};
77  bool reentrant{false};
78  bool mSourceOnly{false};
79  bool mCallbackBased{false};
81  bool strict_input_type_checking{false};
82  bool ignore_unit_mismatch{false};
84  bool mSlowResponding{false};
86  bool mAllowRemoteControl{true};
87  InterfaceInfo interfaceInformation;
88  std::unique_ptr<LogManager> mLogManager;
89  int maxLogLevel{HELICS_LOG_LEVEL_NO_PRINT};
90 
91  public:
92  std::atomic<bool> init_transmitted{false};
94  int indexGroup{0};
95 
96  private:
98  bool wait_for_current_time{false};
100  bool ignore_time_mismatch_warnings{false};
102  bool mProfilerActive{false};
105  bool mLocalProfileCapture{false};
106  int errorCode{0};
107  CommonCore* mParent{nullptr};
108  std::string errorString;
110  decltype(std::chrono::steady_clock::now()) start_clock_time;
111  Time rt_lag{timeZero};
112  Time rt_lead{timeZero};
113  Time grantTimeOutPeriod{timeZero};
114  std::int32_t realTimeTimerIndex{-1};
115  std::int32_t grantTimeoutTimeIndex{-1};
116  public:
118  std::atomic<bool> initRequested{false};
119  // temporary
120  std::atomic<bool> requestingMode{false};
121 
122  std::atomic<bool> initIterating{false};
123 
124  private:
125  bool iterating{false};
126  bool timeGranted_mode{false};
128  bool terminate_on_error{false};
132  TimeSynchronizationMethod timeMethod{TimeSynchronizationMethod::DISTRIBUTED};
134  std::uint32_t mGrantCount{0}; // this is intended to allow wrapping
136  std::shared_ptr<MessageTimer> mTimer;
138  gmlc::containers::BlockingQueue<ActionMessage> queue;
140  gmlc::containers::BlockingQueue<std::pair<std::string, std::string>> commandQueue;
142  std::atomic<uint16_t> interfaceFlags{0};
144  std::map<GlobalFederateId, std::deque<ActionMessage>> delayQueues;
145  std::vector<InterfaceHandle> events;
146  std::vector<InterfaceHandle> eventMessages;
147  std::vector<GlobalFederateId> delayedFederates;
148  Time time_granted{startupTime};
149  Time allowed_send_time{startupTime};
150  Time minimumReceiveTime{startupTime};
151  mutable std::atomic_flag processing = ATOMIC_FLAG_INIT;
152 
154  std::vector<std::function<std::string(std::string_view)>> queryCallbacks;
155  std::shared_ptr<FederateOperator> fedCallbacks;
156  std::vector<std::pair<std::string, std::string>> tags;
157  std::atomic<bool> queueProcessing{false};
159  Time nextValueTime() const;
161  Time nextMessageTime() const;
162 
164  void setState(FederateStates newState);
165 
167  bool messageShouldBeDelayed(const ActionMessage& cmd) const noexcept;
169  void addFederateToDelay(GlobalFederateId gid);
171  void generateConfig(Json::Value& base) const;
172 
173  public:
175  void reset(const CoreFederateInfo& fedInfo);
177  const std::string& getIdentifier() const { return name; }
179  FederateStates getState() const;
181  InterfaceInfo& interfaces() { return interfaceInformation; }
183  const InterfaceInfo& interfaces() const { return interfaceInformation; }
184 
186  uint64_t getQueueSize(InterfaceHandle hid) const;
189  uint64_t getQueueSize() const;
193  int32_t getCurrentIteration() const;
197  std::unique_ptr<Message> receive(InterfaceHandle hid);
200  std::unique_ptr<Message> receiveAny(InterfaceHandle& hid);
204  const std::shared_ptr<const SmallBuffer>& getValue(InterfaceHandle handle,
205  uint32_t* inputIndex);
206 
211  const std::vector<std::shared_ptr<const SmallBuffer>>& getAllValues(InterfaceHandle handle);
212 
214  std::pair<SmallBuffer, Time> getPublishedValue(InterfaceHandle handle);
216  void setParent(CommonCore* coreObject) { mParent = coreObject; }
221  void setProperties(const ActionMessage& cmd);
223  void setInterfaceProperty(const ActionMessage& cmd);
225  void setProperty(int timeProperty, Time propertyVal);
227  void setProperty(int intProperty, int propertyVal);
229  void setOptionFlag(int optionFlag, bool value);
231  Time getTimeProperty(int timeProperty) const;
233  bool getOptionFlag(int optionFlag) const;
235  int32_t getHandleOption(InterfaceHandle handle, char iType, int32_t option) const;
237  uint16_t getInterfaceFlags() const { return interfaceFlags.load(); }
239  int getIntegerProperty(int intProperty) const;
241  int publicationCount() const;
243  int endpointCount() const;
245  int inputCount() const;
247  void spinlock() const
248  {
249  while (processing.test_and_set()) {
250  ; // spin
251  }
252  }
254  void sleeplock() const
255  {
256  if (!processing.test_and_set()) {
257  return;
258  }
259  // spin for 10000 tries
260  for (int ii = 0; ii < 10000; ++ii) {
261  if (!processing.test_and_set()) {
262  return;
263  }
264  }
265  while (processing.test_and_set()) {
266  std::this_thread::yield();
267  }
268  }
270  void lock() { sleeplock(); }
271 
273  bool try_lock() const { return !processing.test_and_set(); }
275  void unlock() const { processing.clear(); }
277  int loggingLevel() const;
278 
280  void setTag(std::string_view tag, std::string_view value);
282  const std::string& getTag(std::string_view tag) const;
284  const std::pair<std::string, std::string>& getTagByIndex(size_t index) const
285  {
286  return tags[index];
287  }
289  auto tagCount() const { return tags.size(); }
291  bool isCallbackFederate() const { return mCallbackBased; }
292 
293  private:
303  MessageProcessingResult processQueue() noexcept;
304 
314  MessageProcessingResult processDelayQueue() noexcept;
315 
316  std::optional<MessageProcessingResult>
317  checkProcResult(std::tuple<FederateStates, MessageProcessingResult, bool>& proc_result,
318  ActionMessage& cmd);
322  MessageProcessingResult processActionMessage(ActionMessage& cmd);
323 
326  void processDataConnectionMessage(ActionMessage& cmd);
327 
330  void processDataMessage(ActionMessage& cmd);
332  void timeoutCheck(ActionMessage& cmd);
334  void processLoggingMessage(ActionMessage& cmd);
338  void fillEventVectorUpTo(Time currentTime);
342  void fillEventVectorInclusive(Time currentTime);
346  void fillEventVectorNextIteration(Time currentTime);
348  void addDependency(GlobalFederateId fedToDependOn);
350  void addDependent(GlobalFederateId fedThatDependsOnThis);
352  void resetDependency(GlobalFederateId gid);
353 
355  int checkInterfaces();
357  std::string processQueryActual(std::string_view query) const;
361  void generateProfilingMessage(bool enterHelicsCode);
363  void generateProfilingMarker();
365  void updateMaxLogLevel();
366 
368  void callbackProcessing() noexcept;
369  void callbackReturnResult(FederateStates lastState,
371  FederateStates newState) noexcept;
372  void initCallbackProcessing();
373  void execCallbackProcessing(IterationResult result);
375  void updateDataForExecEntry(MessageProcessingResult result, IterationRequest iterate);
377  void updateDataForTimeReturn(MessageProcessingResult result,
378  Time nextTime,
379  IterationRequest iterate);
380 
381  public:
383  Time grantedTime() const { return time_granted; }
385  Time nextAllowedSendTime() const { return allowed_send_time; }
388  const std::vector<InterfaceHandle>& getEvents() const;
391  std::vector<GlobalFederateId> getDependencies() const;
394  std::vector<GlobalFederateId> getDependents() const;
396  const std::string& lastErrorString() const { return errorString; }
398  int lastErrorCode() const noexcept { return errorCode; }
400  void setCoreObject(CommonCore* parent);
401  // the next 5 functions are the processing functions that actually process the queue
417  iteration_time enterExecutingMode(IterationRequest iterate, bool sendRequest = false);
425  iteration_time requestTime(Time nextTime, IterationRequest iterate, bool sendRequest = false);
429  std::vector<GlobalHandle> getSubscribers(InterfaceHandle handle);
430 
434  std::vector<std::pair<GlobalHandle, std::string_view>>
436 
443  void finalize();
445  void processCommunications(std::chrono::milliseconds period);
447  void addAction(const ActionMessage& action);
449  void addAction(ActionMessage&& action);
451  std::optional<ActionMessage> processPostTerminationAction(const ActionMessage& action);
452 
454  void forceProcessMessage(ActionMessage& action);
455 
463  void logMessage(int level,
464  std::string_view logMessageSource,
465  std::string_view message,
466  bool fromRemote = false) const;
467 
472  void setLogger(std::function<void(int, std::string_view, std::string_view)> logFunction);
473 
476  void setCallbackOperator(std::shared_ptr<FederateOperator> fed)
477  {
478  fedCallbacks = std::move(fed);
479  }
480 
484  void setQueryCallback(std::function<std::string(std::string_view)> queryCallbackFunction,
485  int order)
486  {
487  order = std::clamp(order, 1, 10);
488 
489  if (static_cast<int>(queryCallbacks.size()) < order) {
490  queryCallbacks.resize(order);
491  }
492  queryCallbacks[order - 1] = std::move(queryCallbackFunction);
493  }
499  std::string processQuery(std::string_view query, bool force_ordering = false) const;
507  bool checkAndSetValue(InterfaceHandle pub_id, const char* data, uint64_t len);
508 
510  void routeMessage(const ActionMessage& msg);
511 
513  void routeMessage(ActionMessage&& msg);
515  void createInterface(InterfaceType htype,
516  InterfaceHandle handle,
517  std::string_view key,
518  std::string_view type,
519  std::string_view units,
520  uint16_t flags);
522  void closeInterface(InterfaceHandle handle, InterfaceType type);
524  void sendCommand(ActionMessage& command);
525 
527  std::pair<std::string, std::string> getCommand();
529  std::pair<std::string, std::string> waitCommand();
530 };
531 
532 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: CommonCore.hpp:75
Definition: CoreFederateInfo.hpp:16
Definition: FederateState.hpp:50
void setQueryCallback(std::function< std::string(std::string_view)> queryCallbackFunction, int order)
Definition: FederateState.hpp:484
const std::string & getTag(std::string_view tag) const
Definition: FederateState.cpp:2877
void setInterfaceProperty(const ActionMessage &cmd)
Definition: FederateState.cpp:1966
std::atomic< bool > init_transmitted
Definition: FederateState.hpp:92
std::pair< SmallBuffer, Time > getPublishedValue(InterfaceHandle handle)
Definition: FederateState.cpp:341
void routeMessage(const ActionMessage &msg)
Definition: FederateState.cpp:350
void spinlock() const
Definition: FederateState.hpp:247
auto tagCount() const
Definition: FederateState.hpp:289
std::atomic< bool > initRequested
Definition: FederateState.hpp:118
void reset(const CoreFederateInfo &fedInfo)
Definition: FederateState.cpp:157
const std::pair< std::string, std::string > & getTagByIndex(size_t index) const
Definition: FederateState.hpp:284
std::optional< ActionMessage > processPostTerminationAction(const ActionMessage &action)
Definition: FederateState.cpp:478
const std::string & getIdentifier() const
Definition: FederateState.hpp:177
const std::vector< std::shared_ptr< const SmallBuffer > > & getAllValues(InterfaceHandle handle)
Definition: FederateState.cpp:336
bool isCallbackFederate() const
Definition: FederateState.hpp:291
void setCoreObject(CommonCore *parent)
Definition: FederateState.cpp:2417
int inputCount() const
Definition: FederateState.cpp:2337
std::vector< GlobalFederateId > getDependents() const
Definition: FederateState.cpp:2347
std::pair< std::string, std::string > getCommand()
Definition: FederateState.cpp:2601
void forceProcessMessage(ActionMessage &action)
Definition: FederateState.cpp:491
iteration_time enterExecutingMode(IterationRequest iterate, bool sendRequest=false)
Definition: FederateState.cpp:568
std::unique_ptr< Message > receive(InterfaceHandle hid)
Definition: FederateState.cpp:293
InterfaceInfo & interfaces()
Definition: FederateState.hpp:181
LocalFederateId local_id
id code for the local federate descriptor
Definition: FederateState.hpp:68
void closeInterface(InterfaceHandle handle, InterfaceType type)
Definition: FederateState.cpp:436
void setProperties(const ActionMessage &cmd)
Definition: FederateState.cpp:1925
Time nextAllowedSendTime() const
Definition: FederateState.hpp:385
uint16_t getInterfaceFlags() const
Definition: FederateState.hpp:237
MessageProcessingResult genericUnspecifiedQueueProcess(bool busyReturn)
Definition: FederateState.cpp:891
void unlock() const
Definition: FederateState.hpp:275
std::vector< GlobalFederateId > getDependencies() const
Definition: FederateState.cpp:2342
int publicationCount() const
Definition: FederateState.cpp:2327
void setOptionFlag(int optionFlag, bool value)
Definition: FederateState.cpp:2113
void addAction(const ActionMessage &action)
Definition: FederateState.cpp:380
void finalize()
Definition: FederateState.cpp:930
std::vector< std::pair< GlobalHandle, std::string_view > > getMessageDestinations(InterfaceHandle handle)
Definition: FederateState.cpp:682
IterationResult waitSetup()
Definition: FederateState.cpp:501
uint64_t getQueueSize() const
Definition: FederateState.cpp:278
int indexGroup
storage for index group location (this only matters on construction so can be public)
Definition: FederateState.hpp:94
bool getOptionFlag(int optionFlag) const
Definition: FederateState.cpp:2248
void logMessage(int level, std::string_view logMessageSource, std::string_view message, bool fromRemote=false) const
Definition: FederateState.cpp:2424
const std::vector< InterfaceHandle > & getEvents() const
Definition: FederateState.cpp:992
std::pair< std::string, std::string > waitCommand()
Definition: FederateState.cpp:2616
std::unique_ptr< Message > receiveAny(InterfaceHandle &hid)
Definition: FederateState.cpp:302
FederateState(const FederateState &)=delete
FederateStates getState() const
Definition: FederateState.cpp:232
iteration_time requestTime(Time nextTime, IterationRequest iterate, bool sendRequest=false)
Definition: FederateState.cpp:691
int lastErrorCode() const noexcept
Definition: FederateState.hpp:398
const std::string & lastErrorString() const
Definition: FederateState.hpp:396
void sleeplock() const
Definition: FederateState.hpp:254
int32_t getCurrentIteration() const
Definition: FederateState.cpp:237
void setCallbackOperator(std::shared_ptr< FederateOperator > fed)
Definition: FederateState.hpp:476
const std::shared_ptr< const SmallBuffer > & getValue(InterfaceHandle handle, uint32_t *inputIndex)
Definition: FederateState.cpp:329
void createInterface(InterfaceType htype, InterfaceHandle handle, std::string_view key, std::string_view type, std::string_view units, uint16_t flags)
Definition: FederateState.cpp:400
void sendCommand(ActionMessage &command)
Definition: FederateState.cpp:2482
void setParent(CommonCore *coreObject)
Definition: FederateState.hpp:216
void setProperty(int timeProperty, Time propertyVal)
Definition: FederateState.cpp:2032
IterationResult enterInitializingMode(IterationRequest request)
Definition: FederateState.cpp:530
void processCommunications(std::chrono::milliseconds period)
Definition: FederateState.cpp:966
int getIntegerProperty(int intProperty) const
Definition: FederateState.cpp:2309
void lock()
Definition: FederateState.hpp:270
int loggingLevel() const
Definition: FederateState.cpp:2858
std::vector< GlobalHandle > getSubscribers(InterfaceHandle handle)
Definition: FederateState.cpp:668
int32_t getHandleOption(InterfaceHandle handle, char iType, int32_t option) const
Definition: FederateState.cpp:2293
Time getTimeProperty(int timeProperty) const
Definition: FederateState.cpp:2233
bool try_lock() const
Definition: FederateState.hpp:273
const InterfaceInfo & interfaces() const
Definition: FederateState.hpp:183
void setLogger(std::function< void(int, std::string_view, std::string_view)> logFunction)
Definition: FederateState.cpp:287
Time grantedTime() const
Definition: FederateState.hpp:383
FederateState(const std::string &fedName, const CoreFederateInfo &fedInfo)
Definition: FederateState.cpp:107
bool checkAndSetValue(InterfaceHandle pub_id, const char *data, uint64_t len)
Definition: FederateState.cpp:242
std::string processQuery(std::string_view query, bool force_ordering=false) const
Definition: FederateState.cpp:2835
void setTag(std::string_view tag, std::string_view value)
Definition: FederateState.cpp:2863
std::atomic< GlobalFederateId > global_id
global id code, default to invalid
Definition: FederateState.hpp:69
int endpointCount() const
Definition: FederateState.cpp:2332
Definition: GlobalFederateId.hpp:75
Definition: LocalFederateId.hpp:65
Definition: InterfaceInfo.hpp:26
Definition: LocalFederateId.hpp:22
@ HELICS_LOG_LEVEL_NO_PRINT
Definition: helics_enums.h:204
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
IterationResult
Definition: CoreTypes.hpp:93
FederateStates
Definition: CoreTypes.hpp:21
MessageProcessingResult
Definition: CoreTypes.hpp:74
constexpr Time timeZero
Definition: helicsTime.hpp:31
TimeSynchronizationMethod
enumeration of possible time coordination methods
Definition: FederateState.hpp:47
IterationRequest
Definition: CoreTypes.hpp:102
@ NO_ITERATIONS
indicator that the iterations have completed
InterfaceType
Definition: CoreTypes.hpp:111
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
Definition: helicsTime.hpp:43