helics  3.3.0
FilterFederate.hpp
1 /*
2 Copyright (c) 2017-2022,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/JsonBuilder.hpp"
10 #include "Core.hpp"
11 #include "FederateIdExtra.hpp"
12 #include "FilterCoordinator.hpp"
13 #include "FilterInfo.hpp"
14 #include "TimeCoordinator.hpp"
15 #include "gmlc/containers/AirLock.hpp"
16 #include "gmlc/containers/MappedPointerVector.hpp"
17 
18 #include <any>
19 #include <deque>
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <set>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 namespace helics {
29 class HandleManager;
30 class ActionMessage;
31 class BasicHandleInfo;
32 
// Private state of FilterFederate. The class-opening line is not present in this
// listing; the index at the end of the page places it at FilterFederate.hpp:33.
34  private:
// Identity fields. Presumably filled from the constructor's fedID / coreID / name
// arguments -- the constructor body is not visible here; confirm in the .cpp file.
35  GlobalFederateId mFedID;
36  GlobalBrokerId mCoreID;
37  const std::string mName;
38  // Core* mCore{nullptr};
// Time coordinator held by value; useGlobalTimeCoordinator() writes its globalTime flag.
39  TimeCoordinator mCoord;
// Raw pointer installed by setHandleManager(); nullptr until that call.
// NOTE(review): ownership/lifetime of the HandleManager is not visible in this header.
40  HandleManager* mHandles{nullptr};
// Federate state; starts out as FederateStates::CREATED.
41  FederateStates current_state{FederateStates::CREATED};
// FilterCoordinator objects, owned through unique_ptr and keyed by interface handle.
43  std::map<InterfaceHandle, std::unique_ptr<FilterCoordinator>> filterCoord;
44  // The interface_handle used here usually references an endpoint
45 
// Message callbacks; all four are assigned together by setCallbacks().
46  std::function<void(const ActionMessage&)> mQueueMessage;
47  std::function<void(ActionMessage&&)> mQueueMessageMove;
48  std::function<void(const ActionMessage&)> mSendMessage;
49  std::function<void(ActionMessage&&)> mSendMessageMove;
50 
// Delivery callback; assigned by setDeliver().
51  std::function<void(ActionMessage&)> mDeliverMessage;
52 
// Logging callback; assigned by setLogger().
53  std::function<void(int, std::string_view, std::string_view)> mLogger;
// Callback assigned by setAirLockFunction(); called with an int, it returns a
// reference to an AirLock<std::any>.
54  std::function<gmlc::containers::AirLock<std::any>&(int)> mGetAirLock;
// Queue of (int32_t, Time) pairs. Presumably the entries managed by
// addTimeReturn()/clearTimeReturn(), whose parameters have the same types -- confirm.
55  std::deque<std::pair<int32_t, Time>> timeBlockProcesses;
// Initialised to Time::maxVal().
56  Time minReturnTime{Time::maxVal()};
// Two maps from an int32_t key to a set of int32_t values. Presumably federate id ->
// outstanding process ids, matching the (fid, pid) arguments of
// generateProcessMarker()/acceptProcessReturn() and their Dest variants -- confirm.
58  std::map<int32_t, std::set<int32_t>> ongoingFilterProcesses;
60  std::map<int32_t, std::set<int32_t>> ongoingDestFilterProcesses;
// Atomic counter with an initial value of 54; the reason for that value is not visible here.
// NOTE(review): <atomic> is not among the includes shown in this listing -- presumably
// it arrives transitively through another header; confirm.
63  std::atomic<int32_t> messageCounter{54};
// Set by useGlobalTimeCoordinator(), which also copies the value into mCoord.globalTime.
65  bool usingGlobalTime{false};
// FilterInfo container; presumably searchable by GlobalHandle
// (cf. getFilterInfo(GlobalHandle)) -- confirm against MappedPointerVector.
67  gmlc::containers::MappedPointerVector<FilterInfo, GlobalHandle> filters;
68  // bool hasTiming{false};
69 
70  public:
// Constructor declaration; the definition is not part of this header.
// Takes the global federate id, a name (by value), the global id of the owning core,
// and a Core pointer. How each argument is stored or used is not visible here;
// presumably fedID/name/coreID initialise mFedID/mName/mCoreID -- confirm in the .cpp file.
71  FilterFederate(GlobalFederateId fedID, std::string name, GlobalBrokerId coreID, Core* core);
81  InterfaceHandle handle,
82  std::string_view key,
83  std::string_view type_in,
84  std::string_view type_out,
85  bool cloning);
86 
// Installs the four message callbacks in one call. Each std::function is taken by
// value and moved into the matching member, replacing whatever was stored before:
//   queueMessage     -> mQueueMessage      (const ActionMessage&)
//   queueMessageMove -> mQueueMessageMove  (ActionMessage&&)
//   sendMessage      -> mSendMessage       (const ActionMessage&)
//   sendMessageMove  -> mSendMessageMove   (ActionMessage&&)
// NOTE(review): what distinguishes "queue" from "send" is decided by the caller that
// supplies these functions; it is not visible in this header.
87  void setCallbacks(std::function<void(const ActionMessage&)> queueMessage,
88  std::function<void(ActionMessage&&)> queueMessageMove,
89  std::function<void(const ActionMessage&)> sendMessage,
90  std::function<void(ActionMessage&&)> sendMessageMove)
91  {
92  mQueueMessage = std::move(queueMessage);
93  mQueueMessageMove = std::move(queueMessageMove);
94  mSendMessage = std::move(sendMessage);
95  mSendMessageMove = std::move(sendMessageMove);
96  }
97 
// Stores the logging callback in mLogger (moved from the by-value parameter).
// The callback takes an int and two string views; presumably log level, source
// name and message text -- confirm against the call sites in the .cpp file.
98  void setLogger(std::function<void(int, std::string_view, std::string_view)> logger)
99  {
100  mLogger = std::move(logger);
101  }
102 
// Stores the delivery callback in mDeliverMessage (moved from the by-value parameter).
// The callback receives the ActionMessage by non-const reference.
103  void setDeliver(std::function<void(ActionMessage&)> deliverMessage)
104  {
105  mDeliverMessage = std::move(deliverMessage);
106  }
107 
// Stores the airlock accessor in mGetAirLock (moved from the by-value parameter).
// The accessor maps an int to a reference to an AirLock<std::any>.
// NOTE(review): the meaning of the int argument is not visible here; presumably an
// airlock index -- confirm.
108  void setAirLockFunction(std::function<gmlc::containers::AirLock<std::any>&(int)> getAirLock)
109  {
110  mGetAirLock = std::move(getAirLock);
111  }
// Public message-processing entry points. Only the declarations appear in this
// header, so the notes below describe signatures rather than behaviour.
112  void organizeFilterOperations();
113 
// Takes the command by non-const reference, so the callee may modify it.
114  void handleMessage(ActionMessage& command);
115 
116  void processFilterInfo(ActionMessage& command);
117 
// Returns a reference to an ActionMessage. NOTE(review): whether that is always
// `command` itself is not visible here -- confirm before holding on to the result.
118  ActionMessage& processMessage(ActionMessage& command, const BasicHandleInfo* handle);
// Defined in FilterFederate.cpp according to the index at the end of this page.
// NOTE(review): the meaning of the bool result is not documented in this listing.
121  bool destinationProcessMessage(ActionMessage& command, const BasicHandleInfo* handle);
122 
// Const member that receives a Json::Value by non-const reference; presumably it
// adds filtered-endpoint information for `fed` to `block` -- confirm.
123  void addFilteredEndpoint(Json::Value& block, GlobalFederateId fed) const;
124 
// Stores the given HandleManager pointer in mHandles. No null check is made and the
// pointer is stored as-is, so passing nullptr resets mHandles to nullptr.
125  void setHandleManager(HandleManager* handles) { mHandles = handles; }
126 
// Const query taking the query text as a string view and returning a std::string.
// NOTE(review): the supported query strings and the result format are not visible here.
127  std::string query(std::string_view queryStr) const;
// Defined in FilterFederate.cpp according to the index at the end of this page;
// TimeCoordinator's base class exposes a member function of the same name.
129  bool hasActiveTimeDependencies() const;
130 
// Records whether a global time coordinator is in use: writes `value` to
// usingGlobalTime and to mCoord.globalTime so the two flags stay in agreement.
131  void useGlobalTimeCoordinator(bool value)
132  {
133  usingGlobalTime = value;
134  mCoord.globalTime = value;
135  }
136 
// Private helper declarations; their definitions are not part of this header.
137  private:
138  void routeMessage(const ActionMessage& msg);
// Returns a raw FilterCoordinator pointer for an interface handle. NOTE(review):
// whether a missing entry yields nullptr or is created on demand is not visible here.
140  FilterCoordinator* getFilterCoordinator(InterfaceHandle handle);
141 
// FilterInfo lookups: by GlobalHandle, or by federate id plus interface handle
// (the latter in const and non-const forms).
142  FilterInfo* getFilterInfo(GlobalHandle id);
143  FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle);
144  const FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle) const;
// Const member; all three arguments are taken as pointer-to-const / const reference.
146  void runCloningDestinationFilters(const FilterCoordinator* filt,
147  const BasicHandleInfo* handle,
148  const ActionMessage& command) const;
// Parameter types match the element type of timeBlockProcesses; presumably these
// add and remove its entries -- confirm.
149  void addTimeReturn(int32_t id, Time TimeVal);
150  void clearTimeReturn(int32_t id);
151 
// Returns an ActionMessage reference paired with a bool. NOTE(review): the meaning
// of the bool is not visible in this listing.
152  std::pair<ActionMessage&, bool> executeFilter(ActionMessage& command, FilterInfo* filt);
// Marker/return pairs taking a federate id and a uint32_t process id; presumably they
// maintain ongoingFilterProcesses and ongoingDestFilterProcesses respectively -- confirm.
153  void generateProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
154  void acceptProcessReturn(GlobalFederateId fid, uint32_t pid);
155 
156  void generateDestProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
157  void acceptDestProcessReturn(GlobalFederateId fid, uint32_t pid);
158 };
159 } // namespace helics
helics::destination_target
@ destination_target
indicator that the target is a destination target
Definition: flagOperations.hpp:43
helics::TimeCoordinator::printTimeStatus
virtual std::string printTimeStatus() const override
Definition: TimeCoordinator.cpp:879
helics::GlobalFederateId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:84
helics::TimeCoordinator::getGrantedTime
Time getGrantedTime() const
Definition: TimeCoordinator.hpp:135
helics::has_non_cloning_dest_filter_flag
@ has_non_cloning_dest_filter_flag
indicator that the endpoint or filter has a destination filter that alters the message
Definition: flagOperations.hpp:66
helics::FilterInfo
Definition: FilterInfo.hpp:29
helics::HandleManager::getFilter
const BasicHandleInfo * getFilter(std::string_view name) const
Definition: HandleManager.cpp:305
helics::TimeCoordinator::timeRequest
void timeRequest(Time nextTime, IterationRequest iterate, Time newValueTime, Time newMessageTime)
Definition: TimeCoordinator.cpp:103
helics::FilterInfo::core_id
const GlobalBrokerId core_id
id of the core that manages the filter
Definition: FilterInfo.hpp:43
helics::FilterFederate::~FilterFederate
~FilterFederate()
Definition: FilterFederate.cpp:36
helics_definitions.hpp
base helics enumerations for C++ API's, a namespace wrapper for the definitions defined in helics_enu...
HELICS_LOG_LEVEL_WARNING
@ HELICS_LOG_LEVEL_WARNING
Definition: helics_enums.h:192
helics::BaseTimeCoordinator::disconnect
void disconnect()
Definition: BaseTimeCoordinator.cpp:79
helics::InterfaceHandle
Definition: LocalFederateId.hpp:65
helics::FilterFederate::processFilterReturn
void processFilterReturn(ActionMessage &cmd)
Definition: FilterFederate.cpp:243
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27
helics::cBigTime
constexpr Time cBigTime
Definition: helicsTime.hpp:37
helics::GlobalBrokerId::baseValue
constexpr BaseType baseValue() const
Definition: GlobalFederateId.hpp:35
helics::GlobalFederateId
Definition: GlobalFederateId.hpp:72
helics::ActionMessage
Definition: ActionMessage.hpp:30
helics::parent_broker_id
constexpr GlobalBrokerId parent_broker_id
Definition: GlobalFederateId.hpp:64
helics::FilterFederate::destinationProcessMessage
bool destinationProcessMessage(ActionMessage &command, const BasicHandleInfo *handle)
Definition: FilterFederate.cpp:404
helics::FilterCoordinator
Definition: FilterCoordinator.hpp:17
helics::has_dest_filter_flag
@ has_dest_filter_flag
indicator that an endpoint or message has a destination filter
Definition: flagOperations.hpp:64
coreTypeOperations.hpp
helics::versionString
constexpr auto versionString
Definition: helicsVersion.hpp:18
helics::TimeCoordinator::globalTime
bool globalTime
true if using a global time manager parent
Definition: TimeCoordinator.hpp:101
helics::FilterInfo::key
const std::string key
the identifier of the filter
Definition: FilterInfo.hpp:46
helics::FilterFederate::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: FilterFederate.cpp:1110
helics::GlobalHandle::handle
InterfaceHandle handle
the interface handle component
Definition: GlobalFederateId.hpp:147
helics::BasicHandleInfo::handle
const GlobalHandle handle
the global federate id for the creator of the handle
Definition: BasicHandleInfo.hpp:38
helics::HandleManager
Definition: HandleManager.hpp:25
helics::BasicHandleInfo::getFederateId
GlobalFederateId getFederateId() const
Definition: BasicHandleInfo.hpp:53
helics::Core
Definition: core/Core.hpp:41
helics::BaseTimeCoordinator::empty
bool empty() const
Definition: BaseTimeCoordinator.hpp:74
helics::commandErrorString
const char * commandErrorString(int errorCode)
Definition: ActionMessage.cpp:828
helics::FilterFederate
Definition: FilterFederate.hpp:33
helics::disconnected_flag
@ disconnected_flag
flag indicating that a broker/federate is disconnected
Definition: flagOperations.hpp:85
helics::has_source_filter_flag
@ has_source_filter_flag
indicator that an endpoint or message has a source filter
Definition: flagOperations.hpp:60
helics::BaseTimeCoordinator::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: BaseTimeCoordinator.cpp:253
helics::FilterInfo::cloning
bool cloning
indicator that the filter is a cloning filter
Definition: FilterInfo.hpp:52
helics::TimeCoordinator::updateMessageTime
void updateMessageTime(Time messageUpdateTime, bool allowRequestSend)
Definition: TimeCoordinator.cpp:396
helics::setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:130
helics::FilterInfo::handle
const InterfaceHandle handle
id handle of the filter
Definition: FilterInfo.hpp:44
helics::BasicHandleInfo
Definition: BasicHandleInfo.hpp:20
helics::GlobalBrokerId
Definition: GlobalFederateId.hpp:27
helics::FilterInfo::filterOp
std::shared_ptr< FilterOperator > filterOp
the callback operation of the filter
Definition: FilterInfo.hpp:55
helics::FederateStates::INITIALIZING
@ INITIALIZING
helics::FilterFederate::processMessageFilter
void processMessageFilter(ActionMessage &cmd)
Definition: FilterFederate.cpp:70
helics::HandleManager::getHandleInfo
BasicHandleInfo * getHandleInfo(int32_t index)
Definition: HandleManager.cpp:93
helics::HandleManager::getEndpoint
BasicHandleInfo * getEndpoint(std::string_view name)
Definition: HandleManager.cpp:203
helics::TimeCoordinator
Definition: TimeCoordinator.hpp:55
helics::InterfaceHandle::baseValue
constexpr BaseType baseValue() const
Definition: LocalFederateId.hpp:73
helics::BasicHandleInfo::getInterfaceHandle
InterfaceHandle getInterfaceHandle() const
Definition: BasicHandleInfo.hpp:51
helics::MessageProcessingResult::CONTINUE_PROCESSING
@ CONTINUE_PROCESSING
the current loop should continue
helics::FilterCoordinator::cloningDestFilters
std::vector< FilterInfo * > cloningDestFilters
storage for cloning destination filters
Definition: FilterCoordinator.hpp:24
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::TimeCoordinator::getDependencies
std::vector< GlobalFederateId > getDependencies() const
Definition: TimeCoordinator.cpp:933
HELICS_LOG_LEVEL_ERROR
@ HELICS_LOG_LEVEL_ERROR
Definition: helics_enums.h:188
helics::checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:138
helics::FederateStates::EXECUTING
@ EXECUTING
the federation has entered execution state and is now advancing in time
helics::createMessageFromCommand
std::unique_ptr< Message > createMessageFromCommand(const ActionMessage &cmd)
Definition: ActionMessage.cpp:633
helics::FederateStates::FINISHED
@ FINISHED
the federation has finished its execution
helics::BasicHandleInfo::type
const std::string type
the type of data used by the handle
Definition: BasicHandleInfo.hpp:46
helics::clone_flag
@ clone_flag
flag indicating the filter is a clone filter or the data needs to be cloned
Definition: flagOperations.hpp:55
helics::FilterFederate::createFilter
FilterInfo * createFilter(GlobalBrokerId dest, InterfaceHandle handle, std::string_view key, std::string_view type_in, std::string_view type_out, bool cloning)
Definition: FilterFederate.cpp:689
helics::TimeCoordinator::getDependents
std::vector< GlobalFederateId > getDependents() const
Definition: TimeCoordinator.hpp:143
helics::FilterFederate::processDestFilterReturn
void processDestFilterReturn(ActionMessage &command)
Definition: FilterFederate.cpp:296
helics::GlobalHandle
Definition: GlobalFederateId.hpp:144
helics::BasicHandleInfo::used
bool used
Definition: BasicHandleInfo.hpp:41
helics::error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:17
helics::TimeCoordinator::generateDebuggingTimeInfo
virtual void generateDebuggingTimeInfo(Json::Value &base) const override
Definition: TimeCoordinator.cpp:269
helics::MessageProcessingResult::DELAY_MESSAGE
@ DELAY_MESSAGE
delay the current message and continue processing
flagOperations.hpp
helics::BasicHandleInfo::key
const std::string key
the name of the handle
Definition: BasicHandleInfo.hpp:45
helics::IterationRequest::NO_ITERATIONS
@ NO_ITERATIONS
indicator that the iterations have completed
helics::TimeCoordinator::enteringExecMode
virtual void enteringExecMode(IterationRequest mode) override
Definition: TimeCoordinator.cpp:24
helics::GlobalHandle::fed_id
GlobalFederateId fed_id
the federate id component
Definition: GlobalFederateId.hpp:146
helics::FederateStates::ERRORED
@ ERRORED
the federation has encountered an error
helics::TimeCoordinator::generateConfig
void generateConfig(Json::Value &base) const
Definition: TimeCoordinator.cpp:244
helics::FederateStates
FederateStates
Definition: CoreTypes.hpp:21
helics::fedStateString
const std::string & fedStateString(FederateStates state)
Definition: FederateState.cpp:2035
helics::TimeCoordinator::allowedSendTime
Time allowedSendTime() const
Definition: TimeCoordinator.hpp:139
helics::TimeCoordinator::triggered
bool triggered
indicator the federate was triggered recently
Definition: TimeCoordinator.hpp:99
helics::TimeCoordinator::addDependency
virtual bool addDependency(GlobalFederateId fedID) override
Definition: TimeCoordinator.cpp:893
helicsVersion.hpp