R3BROOT
R3B analysis software
Loading...
Searching...
No Matches
R3BUcesbSource2.cxx
Go to the documentation of this file.
1/******************************************************************************
2 * Copyright (C) 2019 GSI Helmholtzzentrum für Schwerionenforschung GmbH *
3 * Copyright (C) 2019-2025 Members of R3B Collaboration *
4 * *
5 * This software is distributed under the terms of the *
6 * GNU General Public Licence (GPL) version 3, *
7 * copied verbatim in the file "LICENSE". *
8 * *
9 * In applying this license GSI does not waive the privileges and immunities *
10 * granted to it by virtue of its status as an Intergovernmental Organization *
11 * or submit itself to any jurisdiction. *
12 ******************************************************************************/
13
14#include "R3BUcesbSource2.h"
15#include <FairRootManager.h>
16#include <FairRun.h>
17#include <R3BEventHeader.h>
18#include <R3BException.h>
19#include <R3BLogger.h>
20#include <R3BUcesbDecl.h>
21#include <boost/core/span.hpp>
22#include <ext_data_client.h>
23#include <fmt/chrono.h>
24#include <fmt/format.h>
25#include <thread>
26
27namespace R3B
28{
29 UcesbSource::UcesbSource(std::string_view lmdfile_name,
30 std::string_view ntuple_options,
31 std::string_view ucesb_path,
32 EventStructType* event_struct,
33 size_t event_struct_size)
34 : event_struct_size_{ event_struct_size }
35 , event_struct_{ event_struct }
36 , lmdfile_name_{ lmdfile_name }
37 , ntuple_options_{ ntuple_options }
38 , ucesb_path_{ ucesb_path }
39 {
40 }
41
43 {
44 init_runID();
45 init_ucesb();
46 return true;
47 }
48
49 UcesbSource::~UcesbSource() { ucesb_server_launcher_.Close(); }
50
51 void UcesbSource::init_ucesb()
52 {
53 auto command_string = fmt::format("{0} {1} --ntuple={2},STRUCT,-", ucesb_path_, lmdfile_name_, ntuple_options_);
54 if (max_event_num_ > 0)
55 {
56 command_string = fmt::format("{} --max-events={}", command_string, max_event_num_);
57 }
58 R3BLOG(info, fmt::format("Calling ucesb with command: {}", command_string));
59 ucesb_server_launcher_.SetLaunchCmd(command_string);
60 ucesb_server_launcher_.Launch();
61 }
62
64 {
65 if (auto* frm = FairRootManager::Instance(); frm != nullptr)
66 {
67 R3BLOG(debug, "Checking the register of R3BEventHeader");
68 if (event_header_ = dynamic_cast<R3BEventHeader*>(frm->GetObject("EventHeader.")); event_header_ == nullptr)
69 {
70 throw R3B::runtime_error("EventHeader. was not defined properly!");
71 }
72 }
73 R3BLOG(debug, "EventHeader. was defined properly");
74
75 init_readers();
76 setup_ucesb();
77
78 return true;
79 }
80
81 void UcesbSource::setup_ucesb()
82 {
83 // TODO: convert to std::bitset
84 // could be initialzed in type UcesbMap. But C++ doesn't allow static cast of enum class pointer to its
85 // underlying type
86 auto is_struct_map_success = uint32_t{};
87 R3BLOG(info, "Setting up ucesb client...");
88 if (ucesb_client_.setup(
89 nullptr, 0, ucesb_client_struct_info_.Get(), &is_struct_map_success, event_struct_size_) == 0)
90 {
91 ucesb_client_struct_info_.CheckStructMapping(this);
92 }
93 else
94 {
95 R3BLOG(error, "ext_data_clnt::setup() failed");
96 const auto* msg = (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
97 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
98 }
99 }
100
102 {
103 auto lock = std::scoped_lock{ event_reader_mutex_ };
104 try
105 {
106 restart_ucesb_server();
107 }
108 catch (std::exception& ex)
109 {
110 throw;
111 }
112 }
113
114 int UcesbSource::ReadEvent(unsigned int /*eventID*/)
115 {
116 auto lock = std::scoped_lock{ event_reader_mutex_ };
117 auto ret_val = ucesb_client_.fetch_event(event_struct_, event_struct_size_);
118 if (ret_val > 0)
119 {
120 ForEachReader([](auto& reader) { reader->R3BRead(); });
121 }
122 else if (ret_val == 0)
123 {
124 R3BLOG(info, "Reached the maximal event num on the ucesb server.");
125 if (is_infinite_run_)
126 {
127 restart_ucesb_server_delayed();
128 return 0;
129 }
130
131 return 1;
132 }
133 else
134 {
135 R3BLOG(error, "ext_data_clnt::fetch_event() failed");
136 const auto* msg = (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
137 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
138 }
139 return 0;
140 }
141
142 void print_uint32_with_size(const uint32_t* data, ssize_t size)
143 {
144 // TODO: use ranges library instead of reinterpret_cast
145 constexpr auto column_size = 8;
146 using SubDataType = const std::array<uint32_t, column_size>;
147
148 auto data_span = std::span<SubDataType>(reinterpret_cast<SubDataType*>(data), size / column_size);
149 R3BLOG(info, "Raw data:");
150 auto index = uint32_t{};
151 for (const auto& row_data : data_span)
152 {
153 fmt::print("RAW{0:04x}: {1:08x}\n", index, fmt::join(row_data, " "));
154 index += column_size;
155 }
156 }
157
158 void UcesbSource::print_raw_data()
159 {
160 // TODO: what's the best way to deal with this void** monstrosity
161 const void* raw_data = nullptr;
162 ssize_t raw_data_size = 0;
163 auto ret_val = ucesb_client_.get_raw_data(&raw_data, &raw_data_size);
164
165 if (ret_val == 0)
166 {
167 if (raw_data != nullptr)
168 {
169 const auto* data = reinterpret_cast<const uint32_t*>(raw_data);
170 print_uint32_with_size(data, raw_data_size);
171 }
172 }
173 else
174 {
175 R3BLOG(error, "ext_data_clnt::get_raw_data()");
176 throw R3B::runtime_error("Failed to get raw data.");
177 }
178 }
179
180 void UcesbSource::init_runID()
181 {
182 auto* run = FairRun::Instance();
183 if (run == nullptr)
184 {
185 throw R3B::runtime_error("FairRun is not available!");
186 }
187
188 if (run_id_ != 0)
189 {
190 R3BLOG(info, fmt::format("Setting the run ID of the FairRun to be {} from FairSource", run_id_));
191 run->SetRunId(run_id_);
192 }
193 else if (auto run_id = run->GetRunId(); run_id != 0)
194 {
195 R3BLOG(info, fmt::format("Setting the run ID of the FairSource to be {} from FairRun", run_id));
196 run_id_ = run_id;
197 }
198 else
199 {
200 R3BLOG(warn, "Run ID of neither FairRun nor FairSource is set!");
201 }
202 }
203
204 void UcesbSource::FillEventHeader(FairEventHeader* feh) { feh->SetRunId(run_id_); }
205
207 {
208 max_event_num_ = (EvtEnd == 0) ? max_event_num_ : EvtEnd;
209 return max_event_num_ >= 0 ? max_event_num_ : -1;
210 }
211
212 // readers looping methods:
213 void UcesbSource::init_readers()
214 {
216 [this](auto& reader)
217 {
218 if (not reader->Init(ucesb_client_struct_info_.Get()))
219 {
220 const auto* msg =
221 (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
222 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
223 }
224 });
225 }
226
228 {
230 [](auto& reader)
231 {
232 if (!reader->ReInit())
233 {
234 throw R3B::runtime_error("ReInit of a reader failed.");
235 }
236 });
237 return true;
238 }
239
240 void UcesbSource::restart_ucesb_server()
241 {
242 ucesb_server_launcher_.Close();
243 R3BLOG(info, "Trying to restart ucesb server...");
244 ucesb_server_launcher_.Launch();
245 setup_ucesb();
246 }
247
248 void UcesbSource::restart_ucesb_server_delayed()
249 {
250 constexpr auto minimum_duration = std::chrono::minutes{ 30 };
251 constexpr auto max_waiting_time = std::chrono::minutes{ 600 };
252 constexpr auto waiting_time_increment = std::chrono::minutes{ 30 };
253 auto time_now = std::chrono::system_clock::now();
254 auto duration = std::chrono::duration_cast<std::chrono::minutes>(time_now - last_start_time_);
255 if (duration < minimum_duration)
256 {
257 R3BLOG(info, fmt::format("The program has been running shortly for {}", duration));
258 waiting_time_ =
259 (waiting_time_ < max_waiting_time) ? waiting_time_ + waiting_time_increment : max_waiting_time;
260 }
261 else
262 {
263 R3BLOG(info, fmt::format("The program has been running for {}", duration));
264 waiting_time_ = std::chrono::minutes{ 0 };
265 }
266 R3BLOG(info,
267 fmt::format(
268 "Infinite run enabled! Relaunching ucesb server after {}. Time now: {}", waiting_time_, time_now));
269 std::this_thread::sleep_for(waiting_time_);
270 restart_ucesb_server();
271 last_start_time_ = std::chrono::system_clock::now();
272 }
273} // namespace R3B
#define R3BLOG(severity, x)
Definition R3BLogger.h:35
constexpr auto UCESB_NULL_STR_MSG
EXT_STR_h101_t EventStructType
void SetLaunchCmd(const std::string &command_string)
bool InitUnpackers() override
bool Init() override
bool ReInitUnpackers() override
void FillEventHeader(FairEventHeader *feh) override
void ForEachReader(UnaryOp &&opt)
UcesbSource()=default
int ReadEvent(unsigned int eventID=0) override
int CheckMaxEventNo(int EvtEnd=0) override
void CheckStructMapping(UcesbSource *source)
auto Get() -> ext_data_struct_info *
void print_uint32_with_size(const uint32_t *data, ssize_t size)