R3BROOT
R3B analysis software
Loading...
Searching...
No Matches
R3BUcesbSource2.cxx
Go to the documentation of this file.
1/******************************************************************************
2 * Copyright (C) 2019 GSI Helmholtzzentrum für Schwerionenforschung GmbH *
3 * Copyright (C) 2019-2025 Members of R3B Collaboration *
4 * *
5 * This software is distributed under the terms of the *
6 * GNU General Public Licence (GPL) version 3, *
7 * copied verbatim in the file "LICENSE". *
8 * *
9 * In applying this license GSI does not waive the privileges and immunities *
10 * granted to it by virtue of its status as an Intergovernmental Organization *
11 * or submit itself to any jurisdiction. *
12 ******************************************************************************/
13
14#include "R3BUcesbSource2.h"
15#include "R3BUcesbLauncher.h"
16#include <FairRootManager.h>
17#include <FairRun.h>
18#include <R3BEventHeader.h>
19#include <R3BException.h>
20#include <R3BUcesbDecl.h>
21#include <array>
22#include <boost/core/span.hpp>
23#include <chrono>
24#include <cstddef>
25#include <cstdint>
26#include <exception>
27#include <ext_data_client.h>
28#include <fairlogger/Logger.h>
29#include <fmt/base.h>
30#include <fmt/chrono.h> // NOLINT
31#include <fmt/core.h>
32#include <fmt/format.h>
33#include <fmt/ranges.h>
34#include <memory>
35#include <mutex>
36#include <string_view>
37#include <sys/types.h>
38#include <thread>
39
40namespace R3B
41{
42 namespace
43 {
44 void print_uint32_with_size(const uint32_t* data, ssize_t size)
45 {
46 // TODO: use ranges library instead of reinterpret_cast
47 constexpr auto column_size = 8;
48 using SubDataType = const std::array<uint32_t, column_size>;
49
50 auto data_span = boost::span<SubDataType>(reinterpret_cast<SubDataType*>(data), size / column_size);
51 LOGP(info, "Raw data:");
52 auto index = uint32_t{};
53 for (const auto& row_data : data_span)
54 {
55 fmt::println("RAW{0:04x}: {1:08x}", index, fmt::join(row_data, " "));
56 index += column_size;
57 }
58 }
59
60 } // namespace
61
62 UcesbSource::UcesbSource() = default;
63
64 UcesbSource::UcesbSource(std::string_view lmdfile_name,
65 std::string_view ntuple_options,
66 std::string_view ucesb_path,
67 EventStructType* event_struct,
68 size_t event_struct_size)
69 : event_struct_size_{ event_struct_size }
70 , event_struct_{ event_struct }
71 , lmdfile_name_{ lmdfile_name }
72 , ntuple_options_{ ntuple_options }
73 , ucesb_path_{ ucesb_path }
74 {
75 }
76
78 {
79 init_runID();
80 init_ucesb();
81 return true;
82 }
83
85
87 {
88 auto command_string = fmt::format("{0} {1} --ntuple={2},STRUCT,-", ucesb_path_, lmdfile_name_, ntuple_options_);
89 if (max_event_num_ > 0)
90 {
91 command_string = fmt::format("{} --max-events={}", command_string, max_event_num_);
92 }
93 LOGP(info, "Calling ucesb with command: {}", command_string);
94 ucesb_server_launcher_ = std::make_unique<UcesbServerLauncher>(&ucesb_client_);
95 ucesb_server_launcher_->SetLaunchCmd(command_string);
96 ucesb_server_launcher_->Launch();
97 }
98
100 {
101 if (auto* frm = FairRootManager::Instance(); frm != nullptr)
102 {
103 LOGP(debug, "Checking the register of R3BEventHeader");
104 if (event_header_ = dynamic_cast<R3BEventHeader*>(frm->GetObject("EventHeader.")); event_header_ == nullptr)
105 {
106 throw R3B::runtime_error("EventHeader. was not defined properly!");
107 }
108 }
109 LOGP(debug, "EventHeader. was defined properly");
110
111 init_readers();
112 setup_ucesb();
113
114 return true;
115 }
116
118 {
119 // TODO: convert to std::bitset
120 // could be initialzed in type UcesbMap. But C++ doesn't allow static cast of enum class pointer to its
121 // underlying type
122 auto is_struct_map_success = uint32_t{};
123 LOGP(info, "Setting up ucesb client...");
124 if (ucesb_client_.setup(
125 nullptr, 0, ucesb_client_struct_info_.Get(), &is_struct_map_success, event_struct_size_) == 0)
126 {
127 ucesb_client_struct_info_.CheckStructMapping(this);
128 }
129 else
130 {
131 LOGP(error, "ext_data_clnt::setup() failed");
132 const auto* msg = (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
133 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
134 }
135 }
136
138 {
139 auto lock = std::scoped_lock{ event_reader_mutex_ };
140 try
141 {
143 }
144 catch (std::exception& ex)
145 {
146 throw;
147 }
148 }
149
150 int UcesbSource::ReadEvent(unsigned int /*eventID*/)
151 {
152 auto lock = std::scoped_lock{ event_reader_mutex_ };
153 auto ret_val = ucesb_client_.fetch_event(event_struct_, event_struct_size_);
154 if (ret_val > 0)
155 {
156 ForEachReader([](auto& reader) { reader->R3BRead(); });
157 }
158 else if (ret_val == 0)
159 {
160 LOGP(info, "Reached the maximal event num on the ucesb server.");
162 {
164 return 0;
165 }
166 return 1;
167 }
168 else
169 {
170 LOGP(error, "ext_data_clnt::fetch_event() failed");
171 const auto* msg = (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
172 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
173 }
174 return 0;
175 }
176
178 {
179 // TODO: what's the best way to deal with this void** monstrosity
180 const void* raw_data = nullptr;
181 ssize_t raw_data_size = 0;
182 auto ret_val = ucesb_client_.get_raw_data(&raw_data, &raw_data_size);
183
184 if (ret_val == 0)
185 {
186 if (raw_data != nullptr)
187 {
188 const auto* data = reinterpret_cast<const uint32_t*>(raw_data);
189 print_uint32_with_size(data, raw_data_size);
190 }
191 }
192 else
193 {
194 LOGP(error, "ext_data_clnt::get_raw_data()");
195 throw R3B::runtime_error("Failed to get raw data.");
196 }
197 }
198
200 {
201 auto* run = FairRun::Instance();
202 if (run == nullptr)
203 {
204 throw R3B::runtime_error("FairRun is not available!");
205 }
206
207 if (run_id_ != 0)
208 {
209 LOGP(info, "Setting the run ID of the FairRun to be {} from FairSource", run_id_);
210 run->SetRunId(run_id_);
211 }
212 else if (auto run_id = run->GetRunId(); run_id != 0)
213 {
214 LOGP(info, "Setting the run ID of the FairSource to be {} from FairRun", run_id);
215 run_id_ = run_id;
216 }
217 else
218 {
219 LOGP(warn, "Run ID of neither FairRun nor FairSource is set!");
220 }
221 }
222
223 void UcesbSource::FillEventHeader(FairEventHeader* feh) { feh->SetRunId(run_id_); }
224
226 {
227 max_event_num_ = (EvtEnd == 0) ? max_event_num_ : EvtEnd;
228 return max_event_num_ >= 0 ? max_event_num_ : -1;
229 }
230
231 // readers looping methods:
233 {
235 [this](auto& reader)
236 {
237 if (not reader->Init(ucesb_client_struct_info_.Get()))
238 {
239 const auto* msg =
240 (ucesb_client_.last_error() == nullptr) ? UCESB_NULL_STR_MSG : ucesb_client_.last_error();
241 throw R3B::runtime_error(fmt::format("UCESB error: {}", msg));
242 }
243 });
244 }
245
247 {
249 [](auto& reader)
250 {
251 if (!reader->ReInit())
252 {
253 throw R3B::runtime_error("ReInit of a reader failed.");
254 }
255 });
256 return true;
257 }
258
260 {
261 ucesb_server_launcher_->Close();
262 LOGP(info, "Trying to restart ucesb server...");
263 ucesb_server_launcher_->Launch();
264 setup_ucesb();
265 }
266
268 {
269 constexpr auto minimum_duration = std::chrono::minutes{ 30 };
270 constexpr auto max_waiting_time = std::chrono::minutes{ 600 };
271 constexpr auto waiting_time_increment = std::chrono::minutes{ 30 };
272 auto time_now = std::chrono::system_clock::now();
273 auto duration = std::chrono::duration_cast<std::chrono::minutes>(time_now - last_start_time_);
274 if (duration < minimum_duration)
275 {
276 LOGP(info, "The program has been running shortly for {}", duration);
278 (waiting_time_ < max_waiting_time) ? waiting_time_ + waiting_time_increment : max_waiting_time;
279 }
280 else
281 {
282 LOGP(info, "The program has been running for {}", duration);
283 waiting_time_ = std::chrono::minutes{ 0 };
284 }
285 // LOGP(info, "Infinite run enabled! Relaunching ucesb server after {}. Time now: {}", waiting_time_, time_now);
286 std::this_thread::sleep_for(waiting_time_);
288 last_start_time_ = std::chrono::system_clock::now();
289 }
290} // namespace R3B
constexpr auto UCESB_NULL_STR_MSG
EXT_STR_h101_t EventStructType
std::string ntuple_options_
std::chrono::time_point< std::chrono::system_clock > last_start_time_
R3BEventHeader * event_header_
ext_data_clnt ucesb_client_
std::unique_ptr< UcesbServerLauncher > ucesb_server_launcher_
std::mutex event_reader_mutex_
bool InitUnpackers() override
bool Init() override
bool ReInitUnpackers() override
UcesbStructInfo ucesb_client_struct_info_
unsigned int run_id_
void FillEventHeader(FairEventHeader *feh) override
std::string lmdfile_name_
int ReadEvent(unsigned int eventID=0) override
void restart_ucesb_server_delayed()
int CheckMaxEventNo(int EvtEnd=0) override
std::chrono::minutes waiting_time_
void ForEachReader(UnaryOp opt)
std::string ucesb_path_
EventStructType * event_struct_