15#include <FairRootManager.h>
21#include <boost/core/span.hpp>
22#include <ext_data_client.h>
23#include <fmt/chrono.h>
24#include <fmt/format.h>
30 std::string_view ntuple_options,
31 std::string_view ucesb_path,
33 size_t event_struct_size)
34 : event_struct_size_{ event_struct_size }
35 , event_struct_{ event_struct }
36 , lmdfile_name_{ lmdfile_name }
37 , ntuple_options_{ ntuple_options }
38 , ucesb_path_{ ucesb_path }
// Builds the ucesb command line from ucesb_path_, lmdfile_name_ and
// ntuple_options_, logs it, and calls Launch() on the server launcher.
// NOTE(review): how command_string is handed to the launcher is not visible
// in this listing - confirm against the full file.
51 void UcesbSource::init_ucesb()
// Command layout: <ucesb path> <lmd file> --ntuple=<options>,STRUCT,-
53 auto command_string = fmt::format(
"{0} {1} --ntuple={2},STRUCT,-", ucesb_path_, lmdfile_name_, ntuple_options_);
// The event limit is appended only when max_event_num_ is positive.
54 if (max_event_num_ > 0)
56 command_string = fmt::format(
"{} --max-events={}", command_string, max_event_num_);
58 R3BLOG(info, fmt::format(
"Calling ucesb with command: {}", command_string));
60 ucesb_server_launcher_.
Launch();
65 if (
auto* frm = FairRootManager::Instance(); frm !=
nullptr)
67 R3BLOG(debug,
"Checking the register of R3BEventHeader");
68 if (event_header_ =
dynamic_cast<R3BEventHeader*
>(frm->GetObject(
"EventHeader.")); event_header_ ==
nullptr)
73 R3BLOG(debug,
"EventHeader. was defined properly");
// Calls setup() on the ucesb client with the struct info pointer and the event
// struct size, and throws R3B::runtime_error carrying the client error text on
// the failure path.
81 void UcesbSource::setup_ucesb()
// Zero-initialised value whose address is passed to setup() below.
86 auto is_struct_map_success = uint32_t{};
87 R3BLOG(info,
"Setting up ucesb client...");
// NOTE(review): the lines between this test and the error report are not
// visible here; presumably the error path is the non-zero branch - confirm.
88 if (ucesb_client_.setup(
89 nullptr, 0, ucesb_client_struct_info_.
Get(), &is_struct_map_success, event_struct_size_) == 0)
95 R3BLOG(error,
"ext_data_clnt::setup() failed");
// Substitute UCESB_NULL_STR_MSG when the client has no error string, so the
// formatter below is never given a null pointer.
96 const auto* msg = (ucesb_client_.last_error() ==
nullptr) ?
UCESB_NULL_STR_MSG : ucesb_client_.last_error();
97 throw R3B::runtime_error(fmt::format(
"UCESB error: {}", msg));
103 auto lock = std::scoped_lock{ event_reader_mutex_ };
106 restart_ucesb_server();
108 catch (std::exception& ex)
116 auto lock = std::scoped_lock{ event_reader_mutex_ };
117 auto ret_val = ucesb_client_.fetch_event(event_struct_, event_struct_size_);
122 else if (ret_val == 0)
124 R3BLOG(info,
"Reached the maximal event num on the ucesb server.");
125 if (is_infinite_run_)
127 restart_ucesb_server_delayed();
135 R3BLOG(error,
"ext_data_clnt::fetch_event() failed");
136 const auto* msg = (ucesb_client_.last_error() ==
nullptr) ?
UCESB_NULL_STR_MSG : ucesb_client_.last_error();
// Prints a buffer of 32-bit values as hexadecimal rows of column_size entries.
// NOTE(review): the function header is not visible in this listing; data and
// size are presumably its parameters - confirm.
145 constexpr auto column_size = 8;
// One row of the dump: column_size consecutive 32-bit values.
146 using SubDataType =
const std::array<uint32_t, column_size>;
// View the buffer as size / column_size rows. Assuming size is an integer
// count of 32-bit values, a trailing partial row is left out of the view.
148 auto data_span = std::span<SubDataType>(
reinterpret_cast<SubDataType*
>(data), size / column_size);
149 R3BLOG(info,
"Raw data:");
// index is the element offset of the first value in the current row.
150 auto index = uint32_t{};
151 for (
const auto& row_data : data_span)
153 fmt::print(
"RAW{0:04x}: {1:08x}\n", index, fmt::join(row_data,
" "));
154 index += column_size;
// Requests the raw data pointer and size from the ucesb client and throws
// R3B::runtime_error on the failure path.
158 void UcesbSource::print_raw_data()
// Both values are passed by address to get_raw_data().
161 const void* raw_data =
nullptr;
162 ssize_t raw_data_size = 0;
163 auto ret_val = ucesb_client_.get_raw_data(&raw_data, &raw_data_size);
// The buffer is only reinterpreted when the client supplied one.
167 if (raw_data !=
nullptr)
// Reinterpret the untyped buffer as 32-bit unsigned values.
169 const auto* data =
reinterpret_cast<const uint32_t*
>(raw_data);
// NOTE(review): the condition guarding this error path is not visible here;
// presumably it depends on ret_val - confirm.
175 R3BLOG(error,
"ext_data_clnt::get_raw_data()");
176 throw R3B::runtime_error(
"Failed to get raw data.");
// Reconciles the run ID held by this source (run_id_) with the one held by
// the FairRun instance, logging which side was used.
180 void UcesbSource::init_runID()
182 auto* run = FairRun::Instance();
// NOTE(review): presumably thrown when run is null; the guard itself is not
// visible in this listing.
185 throw R3B::runtime_error(
"FairRun is not available!");
// The source-side ID is written into FairRun.
190 R3BLOG(info, fmt::format(
"Setting the run ID of the FairRun to be {} from FairSource", run_id_));
191 run->SetRunId(run_id_);
// Otherwise a non-zero ID already present in FairRun is used. The log says the
// source ID is set from it; the assignment itself is not visible here.
193 else if (
auto run_id = run->GetRunId(); run_id != 0)
195 R3BLOG(info, fmt::format(
"Setting the run ID of the FairSource to be {} from FairRun", run_id));
// Fallback: only a warning is logged.
200 R3BLOG(warn,
"Run ID of neither FairRun nor FairSource is set!");
// Updates and reports the event limit.
// NOTE(review): the function header is not visible in this listing; EvtEnd
// matches the CheckMaxEventNo parameter listed for this file - confirm.
// An EvtEnd of 0 leaves max_event_num_ unchanged; any other value replaces it.
208 max_event_num_ = (EvtEnd == 0) ? max_event_num_ : EvtEnd;
// A negative limit is reported as -1.
209 return max_event_num_ >= 0 ? max_event_num_ : -1;
// Calls Init() on a reader with the ucesb struct info pointer and, when Init()
// returns false, looks up the client error text.
// NOTE(review): the loop over readers and the action taken on failure are not
// visible in this listing - confirm against the full file.
213 void UcesbSource::init_readers()
218 if (not reader->Init(ucesb_client_struct_info_.
Get()))
// Substitute UCESB_NULL_STR_MSG when the client has no error string.
221 (ucesb_client_.last_error() ==
nullptr) ?
UCESB_NULL_STR_MSG : ucesb_client_.last_error();
232 if (!reader->ReInit())
// Restarts the ucesb server: Close() on the launcher, a log message, then
// Launch() again.
240 void UcesbSource::restart_ucesb_server()
242 ucesb_server_launcher_.
Close();
243 R3BLOG(info,
"Trying to restart ucesb server...");
244 ucesb_server_launcher_.
Launch();
// Sleeps for waiting_time_ and then restarts the ucesb server, choosing the
// waiting time from how long the previous run lasted.
248 void UcesbSource::restart_ucesb_server_delayed()
// A run shorter than minimum_duration counts as a short run.
250 constexpr auto minimum_duration = std::chrono::minutes{ 30 };
// Upper bound for the waiting time and the step added after each short run.
251 constexpr auto max_waiting_time = std::chrono::minutes{ 600 };
252 constexpr auto waiting_time_increment = std::chrono::minutes{ 30 };
253 auto time_now = std::chrono::system_clock::now();
// Whole minutes elapsed since last_start_time_.
254 auto duration = std::chrono::duration_cast<std::chrono::minutes>(time_now - last_start_time_);
255 if (duration < minimum_duration)
257 R3BLOG(info, fmt::format(
"The program has been running shortly for {}", duration));
// Waiting time plus one increment while below the cap, otherwise the cap.
// NOTE(review): the assignment target is not visible here; presumably
// waiting_time_ - confirm.
259 (waiting_time_ < max_waiting_time) ? waiting_time_ + waiting_time_increment : max_waiting_time;
// NOTE(review): presumably the else branch (run lasted at least
// minimum_duration); the waiting time is reset to zero.
263 R3BLOG(info, fmt::format(
"The program has been running for {}", duration));
264 waiting_time_ = std::chrono::minutes{ 0 };
268 "Infinite run enabled! Relaunching ucesb server after {}. Time now: {}", waiting_time_, time_now));
// Block the calling thread for the chosen waiting time, then restart.
269 std::this_thread::sleep_for(waiting_time_);
270 restart_ucesb_server();
// Record the new start time for the next duration measurement.
271 last_start_time_ = std::chrono::system_clock::now();
#define R3BLOG(severity, x)
constexpr auto UCESB_NULL_STR_MSG
EXT_STR_h101_t EventStructType
void SetLaunchCmd(const std::string &command_string)
bool InitUnpackers() override
void RestartUcesbServer()
bool ReInitUnpackers() override
void FillEventHeader(FairEventHeader *feh) override
void ForEachReader(UnaryOp &&opt)
int ReadEvent(unsigned int eventID=0) override
int CheckMaxEventNo(int EvtEnd=0) override
void CheckStructMapping(UcesbSource *source)
auto Get() -> ext_data_struct_info *
void print_uint32_with_size(const uint32_t *data, ssize_t size)