helics  3.5.2
IpcQueueHelper.h
1 /*
2 Copyright (c) 2017-2024,
3 Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
4 Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
5 SPDX-License-Identifier: BSD-3-Clause
6 */
7 #pragma once
8 
9 #include "helics/core/ActionMessage.hpp"
10 
11 #include <algorithm>
12 #include <boost/interprocess/ipc/message_queue.hpp>
13 #include <boost/interprocess/shared_memory_object.hpp>
14 #include <boost/interprocess/sync/interprocess_mutex.hpp>
15 #include <cctype>
16 #include <iostream>
17 #include <memory>
18 #include <optional>
19 #include <string>
20 #include <thread>
21 #include <vector>
22 
23 using ipc_queue = boost::interprocess::message_queue;
24 using ipc_state = boost::interprocess::shared_memory_object;
25 
26 namespace helics {
27 namespace ipc {
/** Translate an arbitrary string into one that only contains identifier-style characters.
 *
 * Every character that is neither alphanumeric (as reported by std::isalnum for the
 * current C locale) nor an underscore is replaced by '_'.  The length of the string is
 * never changed.
 *
 * @param in the string to translate; taken by value and modified in place
 * @return the translated string
 */
inline std::string stringTranslateToCppName(std::string in)
{
    // std::isalnum has undefined behavior when handed a negative value other than EOF,
    // which is what a plain (signed) char holds for any byte >= 0x80.  Converting to
    // unsigned char first keeps the call well defined for every possible input byte.
    std::replace_if(
        in.begin(),
        in.end(),
        [](char c) {
            return (c != '_') && (std::isalnum(static_cast<unsigned char>(c)) == 0);
        },
        '_');
    return in;
}
/** Enumeration of the states a queue can report.
 *
 * The underlying type and every numeric value are spelled out explicitly.
 * NOTE(review): presumably because the value is shared between processes and must stay
 * stable -- confirm before renumbering.
 */
enum class queue_state_t : int {
    unknown = -1,  //!< the state could not be determined (e.g. the state lock could not be taken)
    startup = 0,  //!< initial value of a newly created state object
    connected = 1,
    operating = 2,
    closing = 3
};
44 
49  private:
// Shorthand for the Boost.Interprocess mutex type used to guard `state`.
50  using ipcmutex = boost::interprocess::interprocess_mutex;
// Declared mutable so that the const accessor getState() can still lock it.
51  mutable ipcmutex data_lock;
// Current queue state; starts out as `startup`.  getState()/setState() access it while
// holding data_lock (setState() writes it without the lock only after its retries run out).
52  queue_state_t state = queue_state_t::startup;
53 
54  public:
55  queue_state_t getState() const
56  {
57  try {
58  boost::interprocess::scoped_lock<ipcmutex> lock(data_lock);
59  return state;
60  }
61  catch (const boost::interprocess::lock_exception&) {
62  return queue_state_t::unknown;
63  }
64  }
65  bool setState(queue_state_t newState)
66  {
67  bool success = false;
68  int tries = 0;
69  while (!success) {
70  try {
71  boost::interprocess::scoped_lock<ipcmutex> lock(data_lock);
72  state = newState;
73  success = true;
74  }
75  catch (const boost::interprocess::lock_exception&) {
76  std::this_thread::sleep_for(std::chrono::milliseconds(100));
77  ++tries;
78  if (tries > 20) {
79  std::cout << "error in connecting to process lock\n";
80  state = newState;
81  return false;
82  }
83  }
84  }
85  return success;
86  }
87  };
88 
90  class OwnedQueue {
91  private:
92  std::unique_ptr<ipc_queue> rqueue;
93  std::unique_ptr<ipc_state> queue_state;
94  std::string connectionNameOrig;
95  std::string connectionName;
96  std::string stateName;
97  std::string errorString;
98  std::vector<char> buffer;
99  int mxSize = 0;
100  bool connected = false;
101 
102  public:
103  OwnedQueue() = default;
104  ~OwnedQueue();
105  bool connect(const std::string& connection, int maxMessages, int maxSize);
106 
107  void changeState(queue_state_t newState);
108 
109  std::optional<ActionMessage> getMessage(int timeout);
110  ActionMessage getMessage();
111 
112  const std::string& getError() const { return errorString; }
113  };
114 
116  class SendToQueue {
117  private:
118  std::unique_ptr<ipc_queue> txqueue;
119  std::string connectionNameOrig;
120  std::string
121  connectionName;
122  std::string errorString;
123  std::vector<char> buffer;
124  bool connected = false;
125 
126  public:
127  SendToQueue() = default;
128 
129  bool connect(const std::string& connection, bool initOnly, int retries);
130 
131  void sendMessage(const ActionMessage& cmd, int priority);
132 
133  const std::string& getError() const { return errorString; }
134  };
135 } // namespace ipc
136 } // namespace helics
Definition: ActionMessage.hpp:30
Definition: IpcQueueHelper.h:90
Definition: IpcQueueHelper.h:116
Definition: IpcQueueHelper.h:48
The main namespace for the HELICS co-simulation library. User functions will be in the helics namespace...
Definition: AsyncFedCallInfo.hpp:14