helics  3.5.2
FilterFederate.hpp
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "../common/JsonBuilder.hpp"
10 #include "Core.hpp"
11 #include "FederateIdExtra.hpp"
12 #include "FilterCoordinator.hpp"
13 #include "FilterInfo.hpp"
14 #include "TimeCoordinator.hpp"
15 #include "gmlc/containers/AirLock.hpp"
16 #include "gmlc/containers/MappedPointerVector.hpp"
17 
18 #include <any>
19 #include <deque>
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <set>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 namespace helics {
29 class HandleManager;
30 class ActionMessage;
31 class BasicHandleInfo;
32 
34  private:
35  GlobalFederateId mFedID;
36  GlobalBrokerId mCoreID;
37  const std::string mName;
38  // Core* mCore{nullptr};
39  TimeCoordinator mCoord;
40  HandleManager* mHandles{nullptr};
41  FederateStates current_state{FederateStates::CREATED};
43  std::map<InterfaceHandle, std::unique_ptr<FilterCoordinator>> filterCoord;
44  // The interface_handle used is here is usually referencing an endpoint
45 
46  std::function<void(const ActionMessage&)> mQueueMessage;
47  std::function<void(ActionMessage&&)> mQueueMessageMove;
48  std::function<void(const ActionMessage&)> mSendMessage;
49  std::function<void(ActionMessage&&)> mSendMessageMove;
50 
51  std::function<void(ActionMessage&)> mDeliverMessage;
52 
53  std::function<void(int, std::string_view, std::string_view)> mLogger;
54  std::function<gmlc::containers::AirLock<std::any>&(int)> mGetAirLock;
55  std::deque<std::pair<int32_t, Time>> timeBlockProcesses;
56  Time minReturnTime{Time::maxVal()};
58  std::map<int32_t, std::set<int32_t>> ongoingFilterProcesses;
60  std::map<int32_t, std::set<int32_t>> ongoingDestFilterProcesses;
63  std::atomic<int32_t> messageCounter{54};
65  bool usingGlobalTime{false};
67  gmlc::containers::MappedPointerVector<FilterInfo, GlobalHandle> filters;
68  // bool hasTiming{false};
69 
70  public:
71  FilterFederate(GlobalFederateId fedID, std::string name, GlobalBrokerId coreID, Core* core);
81  InterfaceHandle handle,
82  std::string_view key,
83  std::string_view type_in,
84  std::string_view type_out,
85  bool cloning);
86 
87  void setCallbacks(std::function<void(const ActionMessage&)> queueMessage,
88  std::function<void(ActionMessage&&)> queueMessageMove,
89  std::function<void(const ActionMessage&)> sendMessage,
90  std::function<void(ActionMessage&&)> sendMessageMove)
91  {
92  mQueueMessage = std::move(queueMessage);
93  mQueueMessageMove = std::move(queueMessageMove);
94  mSendMessage = std::move(sendMessage);
95  mSendMessageMove = std::move(sendMessageMove);
96  }
97 
98  void setLogger(std::function<void(int, std::string_view, std::string_view)> logger)
99  {
100  mLogger = std::move(logger);
101  }
102 
103  void setDeliver(std::function<void(ActionMessage&)> deliverMessage)
104  {
105  mDeliverMessage = std::move(deliverMessage);
106  }
107 
108  void setAirLockFunction(std::function<gmlc::containers::AirLock<std::any>&(int)> getAirLock)
109  {
110  mGetAirLock = std::move(getAirLock);
111  }
112  void organizeFilterOperations();
113 
114  void handleMessage(ActionMessage& command);
115 
116  void processFilterInfo(ActionMessage& command);
117 
118  ActionMessage& processMessage(ActionMessage& command, const BasicHandleInfo* handle);
121  bool destinationProcessMessage(ActionMessage& command, const BasicHandleInfo* handle);
122 
123  void addFilteredEndpoint(Json::Value& block, GlobalFederateId fed) const;
124 
125  void setHandleManager(HandleManager* handles) { mHandles = handles; }
126 
127  std::string query(std::string_view queryStr) const;
129  bool hasActiveTimeDependencies() const;
130 
131  void useGlobalTimeCoordinator(bool value)
132  {
133  usingGlobalTime = value;
134  mCoord.globalTime = value;
135  }
136 
137  private:
138  void routeMessage(const ActionMessage& msg);
140  FilterCoordinator* getFilterCoordinator(InterfaceHandle handle);
141 
142  FilterInfo* getFilterInfo(GlobalHandle id);
143  FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle);
144  const FilterInfo* getFilterInfo(GlobalFederateId fed, InterfaceHandle handle) const;
146  void runCloningDestinationFilters(const FilterCoordinator* filt,
147  const BasicHandleInfo* handle,
148  const ActionMessage& command) const;
149  void addTimeReturn(int32_t id, Time TimeVal);
150  void clearTimeReturn(int32_t id);
151 
152  std::pair<ActionMessage&, bool> executeFilter(ActionMessage& command, FilterInfo* filt);
153  void generateProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
154  void acceptProcessReturn(GlobalFederateId fid, uint32_t pid);
155 
156  void generateDestProcessMarker(GlobalFederateId fid, uint32_t pid, Time returnTime);
157  void acceptDestProcessReturn(GlobalFederateId fid, uint32_t pid);
158 };
159 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: BasicHandleInfo.hpp:20
Definition: core/Core.hpp:41
Definition: FilterCoordinator.hpp:17
Definition: FilterFederate.hpp:33
bool destinationProcessMessage(ActionMessage &command, const BasicHandleInfo *handle)
Definition: FilterFederate.cpp:403
void processDestFilterReturn(ActionMessage &command)
Definition: FilterFederate.cpp:295
FilterInfo * createFilter(GlobalBrokerId dest, InterfaceHandle handle, std::string_view key, std::string_view type_in, std::string_view type_out, bool cloning)
Definition: FilterFederate.cpp:690
void processMessageFilter(ActionMessage &cmd)
Definition: FilterFederate.cpp:69
bool hasActiveTimeDependencies() const
Definition: FilterFederate.cpp:1113
void processFilterReturn(ActionMessage &cmd)
Definition: FilterFederate.cpp:242
~FilterFederate()
Definition: FilterFederate.cpp:35
Definition: FilterInfo.hpp:29
Definition: GlobalFederateId.hpp:30
Definition: GlobalFederateId.hpp:75
Definition: GlobalFederateId.hpp:147
Definition: HandleManager.hpp:25
Definition: LocalFederateId.hpp:65
Definition: TimeCoordinator.hpp:48
bool globalTime
true if using a global time manager parent
Definition: TimeCoordinator.hpp:94
the main namespace for the helics co-simulation library User functions will be in the helics namespac...
Definition: AsyncFedCallInfo.hpp:14
FederateStates
Definition: CoreTypes.hpp:21
TimeRepresentation< count_time< 9 > > Time
Definition: helicsTime.hpp:27