helics  2.8.1
FederateState.hpp
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/GuardedTypes.hpp"
10 #include "ActionMessage.hpp"
11 #include "BasicHandleInfo.hpp"
12 #include "InterfaceInfo.hpp"
13 #include "core-data.hpp"
14 #include "core-types.hpp"
15 #include "gmlc/containers/BlockingQueue.hpp"
16 #include "helics-time.hpp"
17 
18 #include <atomic>
19 #include <chrono>
#include <cstdint>
20 #include <deque>
#include <functional>
21 #include <map>
22 #include <memory>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27 
28 namespace helics {
29 class SubscriptionInfo;
30 class PublicationInfo;
31 class EndpointInfo;
32 class FilterInfo;
33 class CommonCore;
34 class CoreFederateInfo;
35 
36 class TimeCoordinator;
37 class MessageTimer;
38 
/** the minimum representable Time (Time::minVal()); used in this header as the initial value of
 * the granted time and the allowed send time of a FederateState */
39 constexpr Time startupTime = Time::minVal();
/** a fixed large negative time constant (-1000000.0)
 * NOTE(review): no use of this constant is visible in this header - confirm its role in the
 * implementation file */
40 constexpr Time initialTime{-1000000.0};
43  public:
45  FederateState(const std::string& fedName, const CoreFederateInfo& fedInfo);
46  // the destructor is declared here and defined in the source file so that classes held through
47  // unique_ptr members only need a forward declaration in this header
49  FederateState(const FederateState&) = delete;
50  FederateState& operator=(const FederateState&) = delete;
53 
54  private:
55  const std::string name;
56  std::unique_ptr<TimeCoordinator>
57  timeCoord;
58  public:
/** global id code; the generated reference describes it as defaulting to invalid */
60  std::atomic<global_federate_id> global_id;
61 
62  private:
63  std::atomic<federate_state> state{HELICS_CREATED};
64  bool only_transmit_on_change{false};
65  bool realtime{false};
67  bool observer{false};
68  bool source_only{false};
69  bool ignore_time_mismatch_warnings{
70  false};
71  bool strict_input_type_checking{
72  false};
73  bool ignore_unit_mismatch{false};
74  bool slow_responding{
75  false};
76  InterfaceInfo interfaceInformation;
77 
78  public:
/** flag indicating the initialization request has been transmitted; starts out false */
79  std::atomic<bool> init_transmitted{false};
80  private:
82  bool wait_for_current_time{false};
84  bool mProfilerActive{false};
87  bool mLocalProfileCapture{false};
88  int errorCode{0};
89  CommonCore* parent_{nullptr};
90  std::string errorString;
91  decltype(std::chrono::steady_clock::now())
92  start_clock_time;
93  Time rt_lag{timeZero};
94  Time rt_lead{timeZero};
95  int32_t realTimeTimerIndex{-1};
96  public:
/** flag indicating this federate has requested entry to initialization; starts out false */
97  std::atomic<bool> init_requested{
98  false};
99  // temporary
100  std::atomic<bool> requestingMode{false};
101 
102  private:
103  bool iterating{false};
104  bool timeGranted_mode{false};
105  bool terminate_on_error{false};
107  int logLevel{1};
109 
110  // std::vector<ActionMessage> messLog;
111  private:
112  std::shared_ptr<MessageTimer>
113  mTimer;
114  gmlc::containers::BlockingQueue<ActionMessage>
115  queue;
116  std::atomic<uint16_t> interfaceFlags{
117  0};
118  std::map<global_federate_id, std::deque<ActionMessage>>
119  delayQueues;
120  std::vector<interface_handle> events;
121  std::vector<interface_handle> eventMessages;
122  std::vector<global_federate_id> delayedFederates;
123  Time time_granted{startupTime};
124  Time allowed_send_time{startupTime};
125  mutable std::atomic_flag processing = ATOMIC_FLAG_INIT;
126  private:
128  std::function<void(int, const std::string&, const std::string&)>
129  loggerFunction;
130  std::function<std::string(const std::string&)>
131  queryCallback;
132 
133  Time nextValueTime() const;
135  Time nextMessageTime() const;
136 
138  void setState(federate_state newState);
139 
141  bool messageShouldBeDelayed(const ActionMessage& cmd) const;
143  void addFederateToDelay(global_federate_id id);
145  void generateConfig(Json::Value& base) const;
146 
147  public:
149  void reset();
151  void reInit();
153  const std::string& getIdentifier() const { return name; }
155  federate_state getState() const;
157  InterfaceInfo& interfaces() { return interfaceInformation; }
159  const InterfaceInfo& interfaces() const { return interfaceInformation; }
160 
162  uint64_t getQueueSize(interface_handle id) const;
165  uint64_t getQueueSize() const;
169  int32_t getCurrentIteration() const;
173  std::unique_ptr<Message> receive(interface_handle id);
176  std::unique_ptr<Message> receiveAny(interface_handle& id);
180  const std::shared_ptr<const data_block>& getValue(interface_handle handle,
181  uint32_t* inputIndex);
182 
187  const std::vector<std::shared_ptr<const data_block>>& getAllValues(interface_handle handle);
188 
190  void setParent(CommonCore* coreObject) { parent_ = coreObject; }
195  void setProperties(const ActionMessage& cmd);
197  void setInterfaceProperty(const ActionMessage& cmd);
199  void setProperty(int timeProperty, Time propertyVal);
201  void setProperty(int intProperty, int propertyVal);
203  void setOptionFlag(int optionFlag, bool value);
205  Time getTimeProperty(int timeProperty) const;
207  bool getOptionFlag(int optionFlag) const;
209  int32_t getHandleOption(interface_handle handle, char iType, int32_t option) const;
211  uint16_t getInterfaceFlags() const { return interfaceFlags.load(); }
213  int getIntegerProperty(int intProperty) const;
215  int publicationCount() const;
217  int endpointCount() const;
219  int inputCount() const;
221  void spinlock() const
222  {
223  while (processing.test_and_set()) {
224  ; // spin
225  }
226  }
228  void sleeplock() const
229  {
230  if (!processing.test_and_set()) {
231  return;
232  }
233  // spin for 10000 tries
234  for (int ii = 0; ii < 10000; ++ii) {
235  if (!processing.test_and_set()) {
236  return;
237  }
238  }
239  while (processing.test_and_set()) {
240  std::this_thread::yield();
241  }
242  }
244  void lock() { sleeplock(); }
245 
247  bool try_lock() const { return !processing.test_and_set(); }
249  void unlock() const { processing.clear(); }
251  int loggingLevel() const { return logLevel; }
252 
253  private:
263  message_processing_result processQueue() noexcept;
264 
274  message_processing_result processDelayQueue() noexcept;
278  message_processing_result processActionMessage(ActionMessage& cmd);
282  void fillEventVectorUpTo(Time currentTime);
286  void fillEventVectorInclusive(Time currentTime);
290  void fillEventVectorNextIteration(Time currentTime);
292  void addDependency(global_federate_id fedToDependOn);
294  void addDependent(global_federate_id fedThatDependsOnThis);
296  int checkInterfaces();
298  std::string processQueryActual(const std::string& query) const;
302  void generateProfilingMessage(bool enterHelicsCode);
304  void generateProfilingMarker();
305 
306  public:
308  Time grantedTime() const { return time_granted; }
310  Time nextAllowedSendTime() const { return allowed_send_time; }
313  const std::vector<interface_handle>& getEvents() const;
316  std::vector<global_federate_id> getDependencies() const;
319  std::vector<global_federate_id> getDependents() const;
321  const std::string& lastErrorString() const { return errorString; }
323  int lastErrorCode() const noexcept { return errorCode; }
325  void setCoreObject(CommonCore* parent);
326  // the next 5 functions are the processing functions that actually process the queue
337  iteration_result enterExecutingMode(iteration_request iterate, bool sendRequest = false);
345  iteration_time requestTime(Time nextTime, iteration_request iterate, bool sendRequest = false);
349  std::vector<global_handle> getSubscribers(interface_handle handle);
350 
356  void finalize();
357 
359  void addAction(const ActionMessage& action);
361  void addAction(ActionMessage&& action);
363  opt<ActionMessage> processPostTerminationAction(const ActionMessage& action);
369  void logMessage(int level,
370  const std::string& logMessageSource,
371  const std::string& message) const;
372 
377  void setLogger(std::function<void(int, const std::string&, const std::string&)> logFunction)
378  {
379  loggerFunction = std::move(logFunction);
380  }
384  void setQueryCallback(std::function<std::string(const std::string&)> queryCallbackFunction)
385  {
386  queryCallback = std::move(queryCallbackFunction);
387  }
393  std::string processQuery(const std::string& query, bool force_ordering = false) const;
401  bool checkAndSetValue(interface_handle pub_id, const char* data, uint64_t len);
402 
404  void routeMessage(const ActionMessage& msg);
406  void createInterface(handle_type htype,
407  interface_handle handle,
408  const std::string& key,
409  const std::string& type,
410  const std::string& units);
412  void closeInterface(interface_handle handle, handle_type type);
413 };
414 
415 } // namespace helics
helics::timeZero
constexpr Time timeZero
Definition: helics-time.hpp:31
helics-time.hpp
helics::EndpointInfo::id
const global_handle id
identifier for the handle
Definition: EndpointInfo.hpp:27
core-types.hpp
generateJsonString
std::string generateJsonString(const Json::Value &block)
Definition: JsonProcessingFunctions.cpp:97
helics::FederateState::getSubscribers
std::vector< global_handle > getSubscribers(interface_handle handle)
Definition: FederateState.cpp:536
data
@ data
print timing+data transmissions
Definition: loggingHelper.hpp:30
helics::FederateState::setLogger
void setLogger(std::function< void(int, const std::string &, const std::string &)> logFunction)
Definition: FederateState.hpp:377
helics::ActionMessage::source_id
global_federate_id source_id
12 – for federate_id or route_id
Definition: ActionMessage.hpp:36
helics::initializationTime
constexpr Time initializationTime
Definition: helics-time.hpp:38
helics::HELICS_INITIALIZING
@ HELICS_INITIALIZING
Definition: core-types.hpp:24
helics::FederateState::nextAllowedSendTime
Time nextAllowedSendTime() const
Definition: FederateState.hpp:310
helics::ActionMessage::setSource
void setSource(global_handle hand)
Definition: ActionMessage.hpp:94
helics::FederateState::interfaces
InterfaceInfo & interfaces()
Definition: FederateState.hpp:157
helics::FederateState::lastErrorString
const std::string & lastErrorString() const
Definition: FederateState.hpp:321
helics::InterfaceInfo
Definition: InterfaceInfo.hpp:25
helics::iteration_request
iteration_request
Definition: core-types.hpp:90
helics::FederateState::publicationCount
int publicationCount() const
Definition: FederateState.cpp:1540
helics::local_federate_id
Definition: federate_id.hpp:22
helics::EndpointInfo
Definition: EndpointInfo.hpp:19
core-data.hpp
helics::iteration_result
iteration_result
Definition: core-types.hpp:81
helics::FederateState::getCurrentIteration
int32_t getCurrentIteration() const
Definition: FederateState.cpp:170
helics::FederateState::receive
std::unique_ptr< Message > receive(interface_handle id)
Definition: FederateState.cpp:218
helics_definitions.hpp
base helics enumerations for C++ API's, a namespace wrapper for the definitions defined in helics_enu...
helics::global_federate_id
Definition: global_federate_id.hpp:68
helics::FederateState::getTimeProperty
Time getTimeProperty(int timeProperty) const
Definition: FederateState.cpp:1463
helics::CommonCore
Definition: CommonCore.hpp:73
helics::setIterationFlags
void setIterationFlags(ActionMessage &command, iteration_request iterate)
Definition: ActionMessage.cpp:950
helics::FederateState::getOptionFlag
bool getOptionFlag(int optionFlag) const
Definition: FederateState.cpp:1477
helics::FederateState::getState
federate_state getState() const
Definition: FederateState.cpp:165
helics::parent_broker_id
constexpr global_broker_id parent_broker_id
Definition: global_federate_id.hpp:60
helics::FederateState::getDependencies
std::vector< global_federate_id > getDependencies() const
Definition: FederateState.cpp:1555
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helics-time.hpp:27
helics::FederateState::getValue
const std::shared_ptr< const data_block > & getValue(interface_handle handle, uint32_t *inputIndex)
Definition: FederateState.cpp:254
helics::FederateState::reset
void reset()
Definition: FederateState.cpp:147
helics::federate_state
federate_state
Definition: core-types.hpp:22
helics::FederateState::lock
void lock()
Definition: FederateState.hpp:244
interfaces
@ interfaces
print connections +interface level connection information
Definition: loggingHelper.hpp:27
helics::ActionMessage
Definition: ActionMessage.hpp:29
helics::FederateState::getDependents
std::vector< global_federate_id > getDependents() const
Definition: FederateState.cpp:1560
helics::FederateState::init_transmitted
std::atomic< bool > init_transmitted
the initialization request has been transmitted
Definition: FederateState.hpp:79
helics::FederateState::enterExecutingMode
iteration_result enterExecutingMode(iteration_request iterate, bool sendRequest=false)
Definition: FederateState.cpp:460
helics::FederateState::reInit
void reInit()
Definition: FederateState.cpp:158
helics::FederateState::getIntegerProperty
int getIntegerProperty(int intProperty) const
Definition: FederateState.cpp:1528
helics::FederateState::lastErrorCode
int lastErrorCode() const noexcept
Definition: FederateState.hpp:323
helics::returnableResult
bool returnableResult(message_processing_result result)
Definition: core-types.hpp:75
helics_log_level_warning
@ helics_log_level_warning
Definition: helics_enums.h:169
iteration_requested_flag
@ iteration_requested_flag
indicator that an iteration has been requested
Definition: flagOperations.hpp:16
helics::FederateState::try_lock
bool try_lock() const
Definition: FederateState.hpp:247
helics::FederateState::inputCount
int inputCount() const
Definition: FederateState.cpp:1550
checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:75
helics::message_processing_result
message_processing_result
Definition: core-types.hpp:65
helics::FederateState::enterInitializingMode
iteration_result enterInitializingMode()
Definition: FederateState.cpp:428
helics::FederateState::sleeplock
void sleeplock() const
Definition: FederateState.hpp:228
helics::FederateState::setOptionFlag
void setOptionFlag(int optionFlag, bool value)
Definition: FederateState.cpp:1368
helics::FederateState::getHandleOption
int32_t getHandleOption(interface_handle handle, char iType, int32_t option) const
Definition: FederateState.cpp:1512
helics::FederateState::getIdentifier
const std::string & getIdentifier() const
Definition: FederateState.hpp:153
helics::FederateState::getAllValues
const std::vector< std::shared_ptr< const data_block > > & getAllValues(interface_handle handle)
Definition: FederateState.cpp:261
helics::global_handle::handle
interface_handle handle
the interface handle component
Definition: global_federate_id.hpp:131
helics::FederateState::grantedTime
Time grantedTime() const
Definition: FederateState.hpp:308
helics::commandErrorString
const char * commandErrorString(int errorCode)
Definition: ActionMessage.cpp:837
helics::FederateState::createInterface
void createInterface(handle_type htype, interface_handle handle, const std::string &key, const std::string &type, const std::string &units)
Definition: FederateState.cpp:300
helics::EndpointInfo::getMessage
std::unique_ptr< Message > getMessage(Time maxTime)
Definition: EndpointInfo.cpp:80
helics::FederateState::setProperty
void setProperty(int timeProperty, Time propertyVal)
Definition: FederateState.cpp:1324
helics::iteration_time::grantedTime
Time grantedTime
the time of the granted step
Definition: helics-time.hpp:48
helics::FederateState::getQueueSize
uint64_t getQueueSize() const
Definition: FederateState.cpp:209
indicator_flag
@ indicator_flag
flag used for setting values
Definition: flagOperations.hpp:21
helics::FederateState::unlock
void unlock() const
Definition: FederateState.hpp:249
helics::FederateState::global_id
std::atomic< global_federate_id > global_id
global id code, default to invalid
Definition: FederateState.hpp:60
helics::FederateState::checkAndSetValue
bool checkAndSetValue(interface_handle pub_id, const char *data, uint64_t len)
Definition: FederateState.cpp:175
helics::FederateState::setProperties
void setProperties(const ActionMessage &cmd)
Definition: FederateState.cpp:1217
helics::HELICS_ERROR
@ HELICS_ERROR
the federation has encountered an error
Definition: core-types.hpp:28
helics::FederateState::getInterfaceFlags
uint16_t getInterfaceFlags() const
Definition: FederateState.hpp:211
helics::FederateState::~FederateState
~FederateState()
helics::fedStateString
const std::string & fedStateString(federate_state state)
Definition: FederateState.cpp:1645
setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:67
helics::FederateState::spinlock
void spinlock() const
Definition: FederateState.hpp:221
helics::interface_handle
Definition: federate_id.hpp:65
helics::FederateState::receiveAny
std::unique_ptr< Message > receiveAny(interface_handle &id)
Definition: FederateState.cpp:227
helics::handle_type
handle_type
Definition: BasicHandleInfo.hpp:16
helics::FederateState::endpointCount
int endpointCount() const
Definition: FederateState.cpp:1545
helics::HELICS_TERMINATING
@ HELICS_TERMINATING
the federate is in the process of shutting down
Definition: core-types.hpp:27
helics::FederateState::getEvents
const std::vector< interface_handle > & getEvents() const
Definition: FederateState.cpp:750
helics::prettyPrintString
std::string prettyPrintString(const ActionMessage &command)
Definition: ActionMessage.cpp:861
helics::TimeCoordinator
Definition: TimeCoordinator.hpp:55
helics::FederateState::FederateState
FederateState(const std::string &fedName, const CoreFederateInfo &fedInfo)
Definition: FederateState.cpp:100
helics::FederateState::finalize
void finalize()
Definition: FederateState.cpp:734
helics::HELICS_EXECUTING
@ HELICS_EXECUTING
the federation has entered execution state and is now advancing in time
Definition: core-types.hpp:26
error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
helics::FederateState
Definition: FederateState.hpp:42
helics::FederateState::closeInterface
void closeInterface(interface_handle handle, handle_type type)
Definition: FederateState.cpp:353
helics::ActionMessage::actionTime
Time actionTime
40 the time an action took place or will take place //32
Definition: ActionMessage.hpp:43
helics::FederateState::init_requested
std::atomic< bool > init_requested
this federate has requested entry to initialization
Definition: FederateState.hpp:97
helics::FederateState::loggingLevel
int loggingLevel() const
Definition: FederateState.hpp:251
helics::FederateState::setQueryCallback
void setQueryCallback(std::function< std::string(const std::string &)> queryCallbackFunction)
Definition: FederateState.hpp:384
helics::FederateState::processQuery
std::string processQuery(const std::string &query, bool force_ordering=false) const
Definition: FederateState.cpp:1805
log_level
log_level
Definition: loggingHelper.hpp:20
helics::FederateState::routeMessage
void routeMessage(const ActionMessage &msg)
Definition: FederateState.cpp:266
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
error
@ error
only print errors
Definition: loggingHelper.hpp:22
required_flag
@ required_flag
flag indicating that an action or match is required
Definition: flagOperations.hpp:18
helics::CoreFederateInfo
Definition: CoreFederateInfo.hpp:16
helics::iteration_time
Definition: helics-time.hpp:47
optional_flag
@ optional_flag
flag indicating that a connection is optional and may not be matched
Definition: flagOperations.hpp:24
helics::FederateState::interfaces
const InterfaceInfo & interfaces() const
Definition: FederateState.hpp:159
helics::FederateState::logMessage
void logMessage(int level, const std::string &logMessageSource, const std::string &message) const
Definition: FederateState.cpp:1632
helics::createMessageFromCommand
std::unique_ptr< Message > createMessageFromCommand(const ActionMessage &cmd)
Definition: ActionMessage.cpp:628
helics::FederateState::addAction
void addAction(const ActionMessage &action)
Definition: FederateState.cpp:281
helics::getValue
X getValue(ValueFederate &fed, const std::string &key)
Definition: Subscriptions.hpp:67
helics::ActionMessage::setDestination
void setDestination(global_handle hand)
Definition: ActionMessage.hpp:100
helics::FederateState::genericUnspecifiedQueueProcess
iteration_result genericUnspecifiedQueueProcess()
Definition: FederateState.cpp:720
helics::FederateState::setParent
void setParent(CommonCore *coreObject)
Definition: FederateState.hpp:190
helics::PublicationInfo::CheckSetValue
bool CheckSetValue(const char *dataToCheck, uint64_t len)
Definition: PublicationInfo.cpp:11
helics::FederateState::processPostTerminationAction
opt< ActionMessage > processPostTerminationAction(const ActionMessage &action)
Definition: FederateState.cpp:395
helics::ActionMessage::dest_id
global_federate_id dest_id
20 fed_id for a targeted message
Definition: ActionMessage.hpp:38
helics::HELICS_FINISHED
@ HELICS_FINISHED
the federation has finished its execution
Definition: core-types.hpp:29
helics::ActionMessage::action
action_message_def::action_t action() const noexcept
Definition: ActionMessage.hpp:89
helics::FederateState::waitSetup
iteration_result waitSetup()
Definition: FederateState.cpp:400
helics::FederateState::setInterfaceProperty
void setInterfaceProperty(const ActionMessage &cmd)
Definition: FederateState.cpp:1258
helics::EndpointInfo::availableMessages
int32_t availableMessages() const
Definition: EndpointInfo.cpp:125
helics::HELICS_UNKNOWN
@ HELICS_UNKNOWN
unknown state
Definition: core-types.hpp:30
helics::FederateState::requestTime
iteration_time requestTime(Time nextTime, iteration_request iterate, bool sendRequest=false)
Definition: FederateState.cpp:547
helics::message_processing_result::iterating
@ iterating
indicator that the iterations need to continue
make_flags
constexpr uint16_t make_flags(unsigned int flag)
Definition: flagOperations.hpp:96
helics::FederateState::local_id
local_federate_id local_id
id code for the local federate descriptor
Definition: FederateState.hpp:59
helics::FederateState::setCoreObject
void setCoreObject(CommonCore *parent)
Definition: FederateState.cpp:1625
helics::InterfaceInfo::setGlobalId
void setGlobalId(global_federate_id newglobalId)
Definition: InterfaceInfo.hpp:63