helics  3.0.1
FilterFederate.hpp
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/JsonBuilder.hpp"
10 #include "Core.hpp"
11 #include "FilterCoordinator.hpp"
12 #include "FilterInfo.hpp"
13 #include "GlobalFederateId.hpp"
14 #include "TimeCoordinator.hpp"
15 #include "gmlc/containers/AirLock.hpp"
16 #include "gmlc/containers/MappedPointerVector.hpp"
17 
18 #include <any>
19 #include <deque>
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <set>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 namespace helics {
29 class HandleManager;
30 class ActionMessage;
31 class BasicHandleInfo;
32 
34  private:
35  GlobalFederateId mFedID;
36  GlobalBrokerId mCoreID;
37  const std::string mName;
38  // Core* mCore{nullptr};
39  TimeCoordinator mCoord;
40  HandleManager* mHandles{nullptr};
41  FederateStates current_state{HELICS_CREATED};
43  std::map<InterfaceHandle, std::unique_ptr<FilterCoordinator>> filterCoord;
44  // The interface handle used here usually references an endpoint
45 
46  std::function<void(const ActionMessage&)> mQueueMessage;
47  std::function<void(ActionMessage&&)> mQueueMessageMove;
48  std::function<void(const ActionMessage&)> mSendMessage;
49  std::function<void(ActionMessage&&)> mSendMessageMove;
50 
51  std::function<void(ActionMessage&)> mDeliverMessage;
52 
53  std::function<void(int, const std::string&, const std::string&)> mLogger;
54  std::function<gmlc::containers::AirLock<std::any>&(int)> mGetAirLock;
55  std::deque<std::pair<int32_t, Time>> timeBlockProcesses;
56  Time minReturnTime{Time::maxVal()};
58  std::map<int32_t, std::set<int32_t>> ongoingFilterProcesses;
60  std::map<int32_t, std::set<int32_t>> ongoingDestFilterProcesses;
63  std::atomic<int32_t> messageCounter{54};
65  gmlc::containers::MappedPointerVector<FilterInfo, GlobalHandle> filters;
66  // bool hasTiming{false};
67 
68  public:
69  FilterFederate(GlobalFederateId fedID, std::string name, GlobalBrokerId coreID, Core* core);
79  InterfaceHandle handle,
80  const std::string& key,
81  const std::string& type_in,
82  const std::string& type_out,
83  bool cloning);
84 
85  void setCallbacks(std::function<void(const ActionMessage&)> queueMessage,
86  std::function<void(ActionMessage&&)> queueMessageMove,
87  std::function<void(const ActionMessage&)> sendMessage,
88  std::function<void(ActionMessage&&)> sendMessageMove)
89  {
90  mQueueMessage = std::move(queueMessage);
91  mQueueMessageMove = std::move(queueMessageMove);
92  mSendMessage = std::move(sendMessage);
93  mSendMessageMove = std::move(sendMessageMove);
94  }
95 
96  void setLogger(std::function<void(int, const std::string&, const std::string&)> logger)
97  {
98  mLogger = std::move(logger);
99  }
100 
101  void setDeliver(std::function<void(ActionMessage&)> deliverMessage)
102  {
103  mDeliverMessage = std::move(deliverMessage);
104  }
105 
106  void setAirLockFunction(std::function<gmlc::containers::AirLock<std::any>&(int)> getAirLock)
107  {
108  mGetAirLock = std::move(getAirLock);
109  }
110  void organizeFilterOperations();
111 
112  void handleMessage(ActionMessage& command);
113 
114  void processFilterInfo(ActionMessage& command);
115 
116  ActionMessage& processMessage(ActionMessage& command, const BasicHandleInfo* handle);
119  bool destinationProcessMessage(ActionMessage& command, const BasicHandleInfo* handle);
120 
121  void addFilteredEndpoint(Json::Value& block, GlobalFederateId fed) const;
122 
123  void setHandleManager(HandleManager* handles) { mHandles = handles; }
124 
125  std::string query(const std::string& queryStr) const;
127  bool hasActiveTimeDependencies() const;
128 
129  private:
130  void routeMessage(const ActionMessage& msg);
132  FilterCoordinator* getFilterCoordinator(InterfaceHandle handle);
133 
134  FilterInfo* getFilterInfo(GlobalHandle id);
135  FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle);
136  const FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle) const;
138  void runCloningDestinationFilters(const FilterCoordinator* filt,
139  const BasicHandleInfo* handle,
140  const ActionMessage& command) const;
141  void addTimeReturn(int32_t id, Time TimeVal);
142  void clearTimeReturn(int32_t id);
143 
144  std::pair<ActionMessage&, bool> executeFilter(ActionMessage& command, FilterInfo* filt);
145  void generateProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
146  void acceptProcessReturn(GlobalFederateId fid, uint32_t pid);
147 
148  void generateDestProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
149  void acceptDestProcessReturn(GlobalFederateId fid, uint32_t pid);
150 };
151 } // namespace helics
helics::ActionMessage::source_handle
InterfaceHandle source_handle
16 – for local handle or local code
Definition: ActionMessage.hpp:38
helics::GlobalFederateId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:80
helics::TimeCoordinator::getGrantedTime
Time getGrantedTime() const
Definition: TimeCoordinator.hpp:136
helics::FilterInfo
Definition: FilterInfo.hpp:29
helics::HandleManager::getFilter
const BasicHandleInfo * getFilter(std::string_view name) const
Definition: HandleManager.cpp:306
helics::TimeCoordinator::timeRequest
void timeRequest(Time nextTime, IterationRequest iterate, Time newValueTime, Time newMessageTime)
Definition: TimeCoordinator.cpp:143
helics::FilterInfo::core_id
const GlobalBrokerId core_id
id of the core that manages the filter
Definition: FilterInfo.hpp:43
helics::FilterFederate::~FilterFederate
~FilterFederate()
Definition: FilterFederate.cpp:37
helics_definitions.hpp
base helics enumerations for C++ API's, a namespace wrapper for the definitions defined in helics_enu...
HELICS_LOG_LEVEL_WARNING
@ HELICS_LOG_LEVEL_WARNING
Definition: helics_enums.h:185
helics::ActionMessage::setAction
void setAction(action_message_def::action_t newAction)
Definition: ActionMessage.cpp:142
helics::TimeCoordinator::printTimeStatus
std::string printTimeStatus() const
Definition: TimeCoordinator.cpp:692
helics::InterfaceHandle
Definition: LocalFederateId.hpp:65
helics::FilterFederate::processFilterReturn
void processFilterReturn(ActionMessage &cmd)
Definition: FilterFederate.cpp:243
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
helics::GlobalBrokerId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:34
helics::GlobalFederateId
Definition: GlobalFederateId.hpp:68
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::parent_broker_id
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:60
helics::FilterFederate::destinationProcessMessage
bool destinationProcessMessage(ActionMessage &command, const BasicHandleInfo *handle)
Definition: FilterFederate.cpp:403
helics::FilterCoordinator
Definition: FilterCoordinator.hpp:17
coreTypeOperations.hpp
disconnected_flag
@ disconnected_flag
flag indicating that a broker/federate is disconnected
Definition: flagOperations.hpp:30
helics::versionString
constexpr auto versionString
Definition: helicsVersion.hpp:16
helics::FilterInfo::key
const std::string key
the identifier of the filter
Definition: FilterInfo.hpp:46
checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:85
helics::FilterFederate::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: FilterFederate.cpp:1090
helics::GlobalHandle::handle
InterfaceHandle handle
the interface handle component
Definition: GlobalFederateId.hpp:131
helics::BasicHandleInfo::handle
const GlobalHandle handle
the global federate id for the creator of the handle
Definition: BasicHandleInfo.hpp:51
helics::ActionMessage::dest_handle
InterfaceHandle dest_handle
24 local handle for a targeted message
Definition: ActionMessage.hpp:40
helics::HandleManager
Definition: HandleManager.hpp:22
helics::has_source_filter_flag
@ has_source_filter_flag
indicator that an endpoint or message has a source filter
Definition: BasicHandleInfo.hpp:21
helics::BasicHandleInfo::getFederateId
GlobalFederateId getFederateId() const
Definition: BasicHandleInfo.hpp:66
helics::Core
Definition: core/Core.hpp:42
helics::commandErrorString
const char * commandErrorString(int errorCode)
Definition: ActionMessage.cpp:823
helics::TimeCoordinator::disconnect
void disconnect()
Definition: TimeCoordinator.cpp:63
helics::FilterFederate
Definition: FilterFederate.hpp:33
helics::ActionMessage::sequenceID
uint32_t sequenceID
32 a sequence number for ordering
Definition: ActionMessage.hpp:43
helics::FilterInfo::cloning
bool cloning
indicator that the filter is a cloning filter
Definition: FilterInfo.hpp:52
clone_flag
@ clone_flag
flag indicating the filter is a clone filter or the data needs to be cloned
Definition: flagOperations.hpp:25
helics::TimeCoordinator::updateMessageTime
void updateMessageTime(Time messageUpdateTime, bool allowRequestSend)
Definition: TimeCoordinator.cpp:453
helics::MessageProcessingResult::DELAY_MESSAGE
@ DELAY_MESSAGE
delay the current message and continue processing
helics::TimeCoordinator::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: TimeCoordinator.cpp:351
helics::FilterInfo::handle
const InterfaceHandle handle
id handle of the filter
Definition: FilterInfo.hpp:44
helics::BasicHandleInfo
Definition: BasicHandleInfo.hpp:30
helics::GlobalBrokerId
Definition: GlobalFederateId.hpp:26
helics::FilterInfo::filterOp
std::shared_ptr< FilterOperator > filterOp
the callback operation of the filter
Definition: FilterInfo.hpp:55
helics::TimeCoordinator::addDependency
bool addDependency(GlobalFederateId fedID)
Definition: TimeCoordinator.cpp:711
setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:77
helics::FilterFederate::processMessageFilter
void processMessageFilter(ActionMessage &cmd)
Definition: FilterFederate.cpp:71
helics::HandleManager::getHandleInfo
BasicHandleInfo * getHandleInfo(int32_t index)
Definition: HandleManager.cpp:94
helics::ActionMessage::dest_id
GlobalFederateId dest_id
20 fed_id for a targeted message
Definition: ActionMessage.hpp:39
helics::HELICS_EXECUTING
@ HELICS_EXECUTING
the federation has entered execution state and is now advancing in time
Definition: CoreTypes.hpp:25
helics::HandleManager::getEndpoint
BasicHandleInfo * getEndpoint(std::string_view name)
Definition: HandleManager.cpp:204
helics::TimeCoordinator
Definition: TimeCoordinator.hpp:55
helics::InterfaceHandle::baseValue
constexpr BaseType baseValue() const
Definition: LocalFederateId.hpp:73
helics::BasicHandleInfo::getInterfaceHandle
InterfaceHandle getInterfaceHandle() const
Definition: BasicHandleInfo.hpp:64
error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
helics::TimeCoordinator::enteringExecMode
void enteringExecMode(IterationRequest mode)
Definition: TimeCoordinator.cpp:45
helics::ActionMessage::actionTime
Time actionTime
40 the time an action took place or will take place //32
Definition: ActionMessage.hpp:44
helics::has_non_cloning_dest_filter_flag
@ has_non_cloning_dest_filter_flag
Definition: BasicHandleInfo.hpp:25
helics::FilterCoordinator::cloningDestFilters
std::vector< FilterInfo * > cloningDestFilters
storage for cloning destination filters
Definition: FilterCoordinator.hpp:24
destination_target
@ destination_target
indicator that the target is a destination target
Definition: flagOperations.hpp:17
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::TimeCoordinator::getDependencies
std::vector< GlobalFederateId > getDependencies() const
Definition: TimeCoordinator.cpp:785
helics::FederateStates
FederateStates
Definition: CoreTypes.hpp:21
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:181
helics::createMessageFromCommand
std::unique_ptr< Message > createMessageFromCommand(const ActionMessage &cmd)
Definition: ActionMessage.cpp:632
helics::TimeCoordinator::generateDebuggingTimeInfo
void generateDebuggingTimeInfo(Json::Value &base) const
Definition: TimeCoordinator.cpp:305
helics::BasicHandleInfo::type
const std::string type
the type of data used by the handle
Definition: BasicHandleInfo.hpp:59
helics::TimeCoordinator::getDependents
std::vector< GlobalFederateId > getDependents() const
Definition: TimeCoordinator.hpp:142
helics::FilterFederate::processDestFilterReturn
void processDestFilterReturn(ActionMessage &command)
Definition: FilterFederate.cpp:296
helics::GlobalHandle
Definition: GlobalFederateId.hpp:128
helics::BasicHandleInfo::used
bool used
indicator that the handle is being used to link with another federate
Definition: BasicHandleInfo.hpp:54
helics::MessageProcessingResult::CONTINUE_PROCESSING
@ CONTINUE_PROCESSING
the current loop should continue
helics::ActionMessage::action
action_message_def::action_t action() const noexcept
Definition: ActionMessage.hpp:88
helics::FilterFederate::createFilter
FilterInfo * createFilter(GlobalBrokerId dest, InterfaceHandle handle, const std::string &key, const std::string &type_in, const std::string &type_out, bool cloning)
Definition: FilterFederate.cpp:677
flagOperations.hpp
helics::HELICS_ERROR
@ HELICS_ERROR
the federation has encountered an error
Definition: CoreTypes.hpp:27
helics::HELICS_INITIALIZING
@ HELICS_INITIALIZING
Definition: CoreTypes.hpp:23
helics::BasicHandleInfo::key
const std::string key
the name of the handle
Definition: BasicHandleInfo.hpp:58
helics::IterationRequest::NO_ITERATIONS
@ NO_ITERATIONS
indicator that the iterations have completed
helics::HELICS_FINISHED
@ HELICS_FINISHED
the federation has finished its execution
Definition: CoreTypes.hpp:28
helics::GlobalHandle::fed_id
GlobalFederateId fed_id
the federate id component
Definition: GlobalFederateId.hpp:130
helics::has_dest_filter_flag
@ has_dest_filter_flag
indicator that an endpoint or message has a destination filter
Definition: BasicHandleInfo.hpp:23
helics::TimeCoordinator::generateConfig
void generateConfig(Json::Value &base) const
Definition: TimeCoordinator.cpp:280
helics::fedStateString
const std::string & fedStateString(FederateStates state)
Definition: FederateState.cpp:1833
helics::TimeCoordinator::allowedSendTime
Time allowedSendTime() const
Definition: TimeCoordinator.hpp:138
helics::ActionMessage::source_id
GlobalFederateId source_id
12 – for federate_id or route_id
Definition: ActionMessage.hpp:37
helicsVersion.hpp