helics  2.8.1
FilterFederate.hpp
1 /*
2 Copyright (c) 2017-2021,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/JsonBuilder.hpp"
10 #include "Core.hpp"
11 #include "FilterCoordinator.hpp"
12 #include "FilterInfo.hpp"
13 #include "TimeCoordinator.hpp"
14 #include "global_federate_id.hpp"
15 #include "gmlc/containers/AirLock.hpp"
16 #include "gmlc/containers/MappedPointerVector.hpp"
17 #include "helics/external/any.hpp"
18 
19 #include <deque>
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <set>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 namespace helics {
29 class HandleManager;
30 class ActionMessage;
31 class BasicHandleInfo;
32 
34  private:
35  global_federate_id mFedID;
36  global_broker_id mCoreID;
37  const std::string mName;
38  // Core* mCore{nullptr};
39  TimeCoordinator mCoord;
40  HandleManager* mHandles{nullptr};
41  federate_state current_state{HELICS_CREATED};
43  std::map<interface_handle, std::unique_ptr<FilterCoordinator>> filterCoord;
44  // The interface_handle used is here is usually referencing an endpoint
45 
46  std::function<void(const ActionMessage&)> mQueueMessage;
47  std::function<void(ActionMessage&&)> mQueueMessageMove;
48  std::function<void(const ActionMessage&)> mSendMessage;
49  std::function<void(ActionMessage&&)> mSendMessageMove;
50 
51  std::function<void(ActionMessage&)> mDeliverMessage;
52 
53  std::function<void(int, const std::string&, const std::string&)> mLogger;
54  std::function<gmlc::containers::AirLock<stx::any>&(int)> mGetAirLock;
55  std::deque<std::pair<int32_t, Time>> timeBlockProcesses;
56  Time minReturnTime{Time::maxVal()};
58  std::map<int32_t, std::set<int32_t>> ongoingFilterProcesses;
60  std::map<int32_t, std::set<int32_t>> ongoingDestFilterProcesses;
63  std::atomic<int32_t> messageCounter{54};
65  gmlc::containers::MappedPointerVector<FilterInfo, global_handle> filters;
66  // bool hasTiming{false};
67 
68  public:
69  FilterFederate(global_federate_id fedID, std::string name, global_broker_id coreID, Core* core);
79  interface_handle handle,
80  const std::string& key,
81  const std::string& type_in,
82  const std::string& type_out,
83  bool cloning);
84 
85  void setCallbacks(std::function<void(const ActionMessage&)> queueMessage,
86  std::function<void(ActionMessage&&)> queueMessageMove,
87  std::function<void(const ActionMessage&)> sendMessage,
88  std::function<void(ActionMessage&&)> sendMessageMove)
89  {
90  mQueueMessage = std::move(queueMessage);
91  mQueueMessageMove = std::move(queueMessageMove);
92  mSendMessage = std::move(sendMessage);
93  mSendMessageMove = std::move(sendMessageMove);
94  }
95 
96  void setLogger(std::function<void(int, const std::string&, const std::string&)> logger)
97  {
98  mLogger = std::move(logger);
99  }
100 
101  void setDeliver(std::function<void(ActionMessage&)> deliverMessage)
102  {
103  mDeliverMessage = std::move(deliverMessage);
104  }
105 
106  void setAirLockFunction(std::function<gmlc::containers::AirLock<stx::any>&(int)> getAirLock)
107  {
108  mGetAirLock = std::move(getAirLock);
109  }
110  void organizeFilterOperations();
111 
112  void handleMessage(ActionMessage& command);
113 
114  void processFilterInfo(ActionMessage& command);
115 
116  ActionMessage& processMessage(ActionMessage& command, const BasicHandleInfo* handle);
119  bool destinationProcessMessage(ActionMessage& command, const BasicHandleInfo* handle);
120 
121  void addFilteredEndpoint(Json::Value& block, global_federate_id fed) const;
122 
123  void setHandleManager(HandleManager* handles) { mHandles = handles; }
124 
125  std::string query(const std::string& queryStr) const;
127  bool hasActiveTimeDependencies() const;
128 
129  private:
130  void routeMessage(const ActionMessage& msg);
132  FilterCoordinator* getFilterCoordinator(interface_handle handle);
133 
134  FilterInfo* getFilterInfo(global_handle id);
135 
136  FilterInfo* getFilterInfo(global_federate_id fed, interface_handle handle);
137  const FilterInfo* getFilterInfo(global_federate_id fed, interface_handle handle) const;
139  void runCloningDestinationFilters(const FilterCoordinator* fcoord,
140  const BasicHandleInfo* handle,
141  const ActionMessage& command) const;
142 
143  std::pair<ActionMessage&, bool> executeFilter(ActionMessage& command, FilterInfo* filt);
144  void generateProcessMarker(global_federate_id fid, uint32_t pid, Time returnTime);
145  void acceptProcessReturn(global_federate_id fid, uint32_t pid);
146 
147  void generateDestProcessMarker(global_federate_id fid, uint32_t pid, Time returnTime);
148  void acceptDestProcessReturn(global_federate_id fid, uint32_t pid);
149 
150  void addTimeReturn(int32_t id, Time TimeVal);
151  void clearTimeReturn(int32_t id);
152 };
153 } // namespace helics
generateJsonString
std::string generateJsonString(const Json::Value &block)
Definition: JsonProcessingFunctions.cpp:97
helics::ActionMessage::source_id
global_federate_id source_id
12 – for federate_id or route_id
Definition: ActionMessage.hpp:36
helics::HELICS_INITIALIZING
@ HELICS_INITIALIZING
Definition: core-types.hpp:24
helics::TimeCoordinator::getGrantedTime
Time getGrantedTime() const
Definition: TimeCoordinator.hpp:124
helics::ActionMessage::dest_handle
interface_handle dest_handle
24 local handle for a targeted message
Definition: ActionMessage.hpp:39
helics::FilterInfo
Definition: FilterInfo.hpp:18
helics::HandleManager::getFilter
const BasicHandleInfo * getFilter(const std::string &name) const
Definition: HandleManager.cpp:306
helics::BasicHandleInfo::getInterfaceHandle
interface_handle getInterfaceHandle() const
Definition: BasicHandleInfo.hpp:71
helics::FilterFederate::~FilterFederate
~FilterFederate()
Definition: FilterFederate.cpp:34
helics::interface_handle::baseValue
constexpr base_type baseValue() const
Definition: federate_id.hpp:73
helics::ActionMessage::source_handle
interface_handle source_handle
16 – for local handle or local code
Definition: ActionMessage.hpp:37
helics::global_broker_id
Definition: global_federate_id.hpp:26
helics_definitions.hpp
base helics enumerations for C++ API's, a namespace wrapper for the definitions defined in helics_enu...
helics::global_federate_id
Definition: global_federate_id.hpp:68
helics::ActionMessage::setAction
void setAction(action_message_def::action_t newAction)
Definition: ActionMessage.cpp:140
helics::TimeCoordinator::printTimeStatus
std::string printTimeStatus() const
Definition: TimeCoordinator.cpp:649
helics::parent_broker_id
constexpr global_broker_id parent_broker_id
Definition: global_federate_id.hpp:60
helics::FilterFederate::processFilterReturn
void processFilterReturn(ActionMessage &cmd)
Definition: FilterFederate.cpp:242
helics::Time
TimeRepresentation< count_time< 9 > > Time
Definition: helics-time.hpp:27
helics::federate_state
federate_state
Definition: core-types.hpp:22
helics::TimeCoordinator::enteringExecMode
void enteringExecMode(iteration_request mode)
Definition: TimeCoordinator.cpp:42
helics::ActionMessage
Definition: ActionMessage.hpp:29
helics::FilterFederate::destinationProcessMessage
bool destinationProcessMessage(ActionMessage &command, const BasicHandleInfo *handle)
Definition: FilterFederate.cpp:402
helics::FilterCoordinator
Definition: FilterCoordinator.hpp:17
coreTypeOperations.hpp
helics::global_broker_id::baseValue
constexpr base_type baseValue() const
Definition: global_federate_id.hpp:34
helics_log_level_warning
@ helics_log_level_warning
Definition: helics_enums.h:169
disconnected_flag
@ disconnected_flag
flag indicating that a broker/federate is disconnected
Definition: flagOperations.hpp:30
helics::versionString
constexpr auto versionString
Definition: helicsVersion.hpp:16
helics::FilterInfo::key
const std::string key
the identifier of the filter
Definition: FilterInfo.hpp:35
checkActionFlag
bool checkActionFlag(uint16_t flags, FlagIndex flag)
Definition: flagOperations.hpp:75
helics::FilterFederate::createFilter
FilterInfo * createFilter(global_broker_id dest, interface_handle handle, const std::string &key, const std::string &type_in, const std::string &type_out, bool cloning)
Definition: FilterFederate.cpp:676
helics::FilterFederate::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: FilterFederate.cpp:1090
helics::global_handle::fed_id
global_federate_id fed_id
the federate id component
Definition: global_federate_id.hpp:130
helics::HandleManager
Definition: HandleManager.hpp:20
helics::has_source_filter_flag
@ has_source_filter_flag
indicator that an endpoint or message has a source filter
Definition: BasicHandleInfo.hpp:27
helics::global_handle::handle
interface_handle handle
the interface handle component
Definition: global_federate_id.hpp:131
helics::Core
Definition: core/Core.hpp:42
helics::BasicHandleInfo::getFederateId
global_federate_id getFederateId() const
Definition: BasicHandleInfo.hpp:73
helics_log_level_error
@ helics_log_level_error
Definition: helics_enums.h:167
helics::message_processing_result::continue_processing
@ continue_processing
the current loop should continue
helics::commandErrorString
const char * commandErrorString(int errorCode)
Definition: ActionMessage.cpp:837
helics::TimeCoordinator::disconnect
void disconnect()
Definition: TimeCoordinator.cpp:60
helics::FilterFederate
Definition: FilterFederate.hpp:33
helics::ActionMessage::sequenceID
uint32_t sequenceID
a sequence number for ordering
Definition: ActionMessage.hpp:42
helics::iteration_request::no_iterations
@ no_iterations
indicator that the iterations have completed
helics::FilterInfo::cloning
bool cloning
indicator that the filter is a destination filter
Definition: FilterInfo.hpp:39
clone_flag
@ clone_flag
flag indicating the filter is a clone filter or the data needs to be cloned
Definition: flagOperations.hpp:25
helics::global_handle
Definition: global_federate_id.hpp:128
helics::TimeCoordinator::addDependency
bool addDependency(global_federate_id fedID)
Definition: TimeCoordinator.cpp:668
helics::global_federate_id::baseValue
constexpr base_type baseValue() const
Definition: global_federate_id.hpp:83
helics::TimeCoordinator::updateMessageTime
void updateMessageTime(Time messageUpdateTime, bool allowRequestSend)
Definition: TimeCoordinator.cpp:411
helics::TimeCoordinator::hasActiveTimeDependencies
bool hasActiveTimeDependencies() const
Definition: TimeCoordinator.cpp:348
helics::BasicHandleInfo
Definition: BasicHandleInfo.hpp:36
fed
@ fed
special logging command for message coming from a fed
Definition: loggingHelper.hpp:32
helics::FilterInfo::filterOp
std::shared_ptr< FilterOperator > filterOp
the callback operation of the filter
Definition: FilterInfo.hpp:42
helics::HELICS_ERROR
@ HELICS_ERROR
the federation has encountered an error
Definition: core-types.hpp:28
helics::fedStateString
const std::string & fedStateString(federate_state state)
Definition: FederateState.cpp:1645
setActionFlag
void setActionFlag(FlagContainer &M, FlagIndex flag)
Definition: flagOperations.hpp:67
helics::FilterFederate::processMessageFilter
void processMessageFilter(ActionMessage &cmd)
Definition: FilterFederate.cpp:68
helics::HandleManager::getHandleInfo
BasicHandleInfo * getHandleInfo(int32_t index)
Definition: HandleManager.cpp:94
helics::HandleManager::getEndpoint
BasicHandleInfo * getEndpoint(const std::string &name)
Definition: HandleManager.cpp:204
helics::interface_handle
Definition: federate_id.hpp:65
helics::TimeCoordinator
Definition: TimeCoordinator.hpp:55
helics::HELICS_EXECUTING
@ HELICS_EXECUTING
the federation has entered execution state and it now advancing in time
Definition: core-types.hpp:26
error_flag
@ error_flag
flag indicating an error condition associated with the command
Definition: flagOperations.hpp:20
helics::ActionMessage::actionTime
Time actionTime
40 the time an action took place or will take place //32
Definition: ActionMessage.hpp:43
helics::has_non_cloning_dest_filter_flag
@ has_non_cloning_dest_filter_flag
Definition: BasicHandleInfo.hpp:31
helics::FilterCoordinator::cloningDestFilters
std::vector< FilterInfo * > cloningDestFilters
storage for cloning destination filters
Definition: FilterCoordinator.hpp:24
destination_target
@ destination_target
indicator that the target is a destination target
Definition: flagOperations.hpp:17
helics
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
helics::BasicHandleInfo::handle
const global_handle handle
the global federate id for the creator of the handle
Definition: BasicHandleInfo.hpp:57
helics::createMessageFromCommand
std::unique_ptr< Message > createMessageFromCommand(const ActionMessage &cmd)
Definition: ActionMessage.cpp:628
helics::TimeCoordinator::generateDebuggingTimeInfo
void generateDebuggingTimeInfo(Json::Value &base) const
Definition: TimeCoordinator.cpp:302
helics::BasicHandleInfo::type
const std::string type
the type of data used by the handle
Definition: BasicHandleInfo.hpp:65
helics::FilterInfo::core_id
const global_broker_id core_id
id of the core that manages the filter
Definition: FilterInfo.hpp:32
helics::TimeCoordinator::getDependencies
std::vector< global_federate_id > getDependencies() const
Definition: TimeCoordinator.cpp:742
helics::FilterFederate::processDestFilterReturn
void processDestFilterReturn(ActionMessage &command)
Definition: FilterFederate.cpp:295
helics::ActionMessage::dest_id
global_federate_id dest_id
20 fed_id for a targeted message
Definition: ActionMessage.hpp:38
helics::TimeCoordinator::getDependents
std::vector< global_federate_id > getDependents() const
Definition: TimeCoordinator.hpp:130
helics::BasicHandleInfo::used
bool used
indicator that the handle is being used to link with another federate
Definition: BasicHandleInfo.hpp:60
helics::HELICS_FINISHED
@ HELICS_FINISHED
the federation has finished its execution
Definition: core-types.hpp:29
helics::ActionMessage::action
action_message_def::action_t action() const noexcept
Definition: ActionMessage.hpp:89
flagOperations.hpp
helics::BasicHandleInfo::key
const std::string key
the name of the handle
Definition: BasicHandleInfo.hpp:64
helics::FilterInfo::handle
const interface_handle handle
id handle of the filter
Definition: FilterInfo.hpp:33
helics::has_dest_filter_flag
@ has_dest_filter_flag
indicator that an endpoint or message has a destination filter
Definition: BasicHandleInfo.hpp:29
helics::TimeCoordinator::generateConfig
void generateConfig(Json::Value &base) const
Definition: TimeCoordinator.cpp:277
helics::TimeCoordinator::timeRequest
void timeRequest(Time nextTime, iteration_request iterate, Time newValueTime, Time newMessageTime)
Definition: TimeCoordinator.cpp:140
helics::message_processing_result::delay_message
@ delay_message
delay the current message and continue processing
helics::TimeCoordinator::allowedSendTime
Time allowedSendTime() const
Definition: TimeCoordinator.hpp:126
helicsVersion.hpp