16#include <FairRootManager.h>
22#include <boost/core/span.hpp>
27#include <ext_data_client.h>
28#include <fairlogger/Logger.h>
29#include <fmt/chrono.h>
31#include <fmt/format.h>
42 void print_uint32_with_size(
const uint32_t* data, ssize_t size)
45 constexpr auto column_size = 8;
46 using SubDataType =
const std::array<uint32_t, column_size>;
48 auto data_span = boost::span<SubDataType>(
reinterpret_cast<SubDataType*
>(data), size / column_size);
49 LOGP(info,
"Raw data:");
50 auto index = uint32_t{};
51 for (
const auto& row_data : data_span)
53 fmt::print(
"RAW{0:04x}: {1:08x}\n", index, fmt::join(row_data,
" "));
63 std::string_view ntuple_options,
64 std::string_view ucesb_path,
66 size_t event_struct_size)
89 command_string = fmt::format(
"{} --max-events={}", command_string,
max_event_num_);
91 LOGP(info,
"Calling ucesb with command: {}", command_string);
99 if (
auto* frm = FairRootManager::Instance(); frm !=
nullptr)
101 LOGP(debug,
"Checking the register of R3BEventHeader");
107 LOGP(debug,
"EventHeader. was defined properly");
120 auto is_struct_map_success = uint32_t{};
121 LOGP(info,
"Setting up ucesb client...");
129 LOGP(error,
"ext_data_clnt::setup() failed");
142 catch (std::exception& ex)
156 else if (ret_val == 0)
158 LOGP(info,
"Reached the maximal event num on the ucesb server.");
168 LOGP(error,
"ext_data_clnt::fetch_event() failed");
178 const void* raw_data =
nullptr;
179 ssize_t raw_data_size = 0;
180 auto ret_val =
ucesb_client_.get_raw_data(&raw_data, &raw_data_size);
184 if (raw_data !=
nullptr)
186 const auto* data =
reinterpret_cast<const uint32_t*
>(raw_data);
187 print_uint32_with_size(data, raw_data_size);
192 LOGP(error,
"ext_data_clnt::get_raw_data()");
199 auto* run = FairRun::Instance();
207 LOGP(info,
"Setting the run ID of the FairRun to be {} from FairSource",
run_id_);
210 else if (
auto run_id = run->GetRunId(); run_id != 0)
212 LOGP(info,
"Setting the run ID of the FairSource to be {} from FairRun", run_id);
217 LOGP(warn,
"Run ID of neither FairRun nor FairSource is set!");
249 if (!reader->ReInit())
260 LOGP(info,
"Trying to restart ucesb server...");
267 constexpr auto minimum_duration = std::chrono::minutes{ 30 };
268 constexpr auto max_waiting_time = std::chrono::minutes{ 600 };
269 constexpr auto waiting_time_increment = std::chrono::minutes{ 30 };
270 auto time_now = std::chrono::system_clock::now();
271 auto duration = std::chrono::duration_cast<std::chrono::minutes>(time_now -
last_start_time_);
272 if (duration < minimum_duration)
274 LOGP(info,
"The program has been running shortly for {}", duration);
280 LOGP(info,
"The program has been running for {}", duration);
285 "Infinite run enabled! Relaunching ucesb server after {}. Time now: {}",
waiting_time_, time_now));
constexpr auto UCESB_NULL_STR_MSG
EXT_STR_h101_t EventStructType
std::string ntuple_options_
std::chrono::time_point< std::chrono::system_clock > last_start_time_
R3BEventHeader * event_header_
ext_data_clnt ucesb_client_
std::unique_ptr< UcesbServerLauncher > ucesb_server_launcher_
std::mutex event_reader_mutex_
void restart_ucesb_server()
bool InitUnpackers() override
void RestartUcesbServer()
bool ReInitUnpackers() override
UcesbStructInfo ucesb_client_struct_info_
void FillEventHeader(FairEventHeader *feh) override
std::string lmdfile_name_
size_t event_struct_size_
int ReadEvent(unsigned int eventID=0) override
void restart_ucesb_server_delayed()
int CheckMaxEventNo(int EvtEnd=0) override
std::chrono::minutes waiting_time_
void ForEachReader(UnaryOp opt)
EventStructType * event_struct_