8#include "G4Threading.hh"
10std::mutex GRunAction::completed_run_data_mutex;
11GRunAction::CompletedRunData GRunAction::completed_worker_run_data;
19 digitization_routines_map(std::move(digi_map)) {
20 const auto desc = std::to_string(G4Threading::G4GetThreadId());
26G4Run *GRunAction::GenerateRun() {
34void GRunAction::BeginOfRunAction(
const G4Run *aRun) {
35 const auto thread_id = G4Threading::G4GetThreadId();
36 const auto run = aRun->GetRunID();
38 auto run_header = std::make_unique<GRunHeader>(
goptions, run, thread_id);
39 run_data = std::make_unique<GRunDataCollection>(
goptions, std::move(run_header));
41 const auto neventsThisRun = aRun->GetNumberOfEventToBeProcessed();
44 need_a_thread_streamer =
false;
45 need_a_run_streamer =
false;
49 if (digitization_routines_map !=
nullptr) {
50 for (
const auto &[plugin, digiRoutine]: *digitization_routines_map) {
51 if (digiRoutine ==
nullptr) {
53 " null digitization routine registered for plugin ", plugin);
56 if (digiRoutine->collection_mode() == CollectionMode::event) {
57 need_a_thread_streamer =
true;
58 }
else if (digiRoutine->collection_mode() == CollectionMode::run) {
59 to_normalize[plugin] = digiRoutine->variables_to_normalize();
60 need_a_run_streamer =
true;
65 " digitization_routines_map is null - streamer mode detection skipped.");
69 if (streamer_definition.type ==
"event") {
70 need_a_thread_streamer =
true;
78 if (!IsMaster() && need_a_thread_streamer) {
79 if (gstreamer_threads_map ==
nullptr) {
80 log->
info(1,
"Defining thread gstreamers for run ", run,
" in thread ", thread_id);
84 if (gstreamer_threads_map ==
nullptr) {
85 log->
error(1, FUNCTION_NAME,
" gstreamer_threads_map is null in thread ", thread_id,
86 " - cannot open connections.");
89 for (
const auto &[name,
gstreamer]: *gstreamer_threads_map) {
92 "Null GStreamer entry ", name,
" in thread ", thread_id);
97 "Failed to open connection for GStreamer ", name,
98 " in thread ", thread_id);
101 log->
info(2, FUNCTION_NAME,
"Worker thread [", thread_id,
"]: opening connection for ",
103 " for run ", run,
". Number of events to be processed: ", neventsThisRun);
108 else if (IsMaster() && need_a_run_streamer) {
109 if (gstreamer_run_map ==
nullptr) {
110 log->
info(1,
"Defining run gstreamers for run ", run);
114 if (gstreamer_run_map ==
nullptr) {
115 log->
error(1, FUNCTION_NAME,
" gstreamer_run_map is null in master thread ",
116 " - cannot open connections.");
119 for (
const auto &[name,
gstreamer]: *gstreamer_run_map) {
122 "Null GStreamer entry ", name,
" in master thread");
127 "Failed to open connection for GStreamer in master thread ", name);
130 log->
info(2, FUNCTION_NAME,
"Master Thread: opening connection for ",
132 " for run ", run,
". Number of events to be processed: ", neventsThisRun);
139void GRunAction::EndOfRunAction(
const G4Run *aRun) {
140 const auto thread_id = G4Threading::G4GetThreadId();
141 const auto runNumber = aRun->GetRunID();
142 const std::string what_am_i = IsMaster() ?
"Master" :
"Worker";
144 if (!IsMaster() && need_a_thread_streamer) {
145 if (gstreamer_threads_map ==
nullptr) {
147 " gstreamer_map is null in thread ", thread_id,
148 " - cannot close connections.");
150 for (
const auto &[name,
gstreamer]: *gstreamer_threads_map) {
151 log->
info(2, FUNCTION_NAME,
" ", what_am_i,
" [", thread_id,
"], for run ", runNumber,
152 " closing connection for gstreamer ", name);
156 "Null GStreamer entry ", name,
" in thread ", thread_id);
160 log->
error(1,
"Failed to close connection for GStreamer ", name,
" in thread ", thread_id);
171 if (need_a_run_streamer) { stash_worker_run_data(); }
175 if (IsMaster() && need_a_run_streamer) {
178 auto completed_run_data = take_completed_worker_run_data();
180 " master collected ",
static_cast<int>(completed_run_data.size()),
181 " worker run_data object(s) for run ", runNumber);
183 std::shared_ptr<GRunDataCollection> merged_run_data;
185 for (
auto &worker_run_data: completed_run_data) {
186 if (worker_run_data ==
nullptr) {
192 if (merged_run_data ==
nullptr) {
193 auto merged_header = std::make_unique<GRunHeader>(
goptions, runNumber, thread_id);
194 merged_run_data = std::make_shared<GRunDataCollection>(
goptions, std::move(merged_header));
197 merged_run_data->merge(*worker_run_data);
201 if (merged_run_data !=
nullptr) {
202 publish_run_data(merged_run_data);
205 if (gstreamer_run_map ==
nullptr) {
207 " gstreamer_map is null in master thread - cannot close connections.");
210 for (
const auto &[name,
gstreamer]: *gstreamer_run_map) {
211 log->
info(2, FUNCTION_NAME,
" ", what_am_i,
" for run ", runNumber,
212 " closing connection for gstreamer ", name);
216 "Null GStreamer entry ", name,
" in master thread");
220 log->
error(1,
"Failed to close connection for GStreamer ", name,
" in master thread");
229void GRunAction::stash_worker_run_data() {
230 if (run_data ==
nullptr) {
234 std::scoped_lock lock(completed_run_data_mutex);
235 completed_worker_run_data.emplace_back(std::move(run_data));
239auto GRunAction::take_completed_worker_run_data() -> CompletedRunData {
240 std::scoped_lock lock(completed_run_data_mutex);
242 auto result = std::move(completed_worker_run_data);
243 completed_worker_run_data.clear();
250void GRunAction::publish_run_data(
const std::shared_ptr<GRunDataCollection> &run_data_collaction)
const {
251 if (run_data_collaction ==
nullptr) {
253 " run_data is null - cannot publish merged run data.");
256 if (gstreamer_run_map ==
nullptr) {
258 " no run streamer map available - run data will not be published.");
264 normalize_run_data(run_data_collaction);
266 for (
const auto &[name,
gstreamer]: *gstreamer_run_map) {
269 " null gstreamer instance for run streamer ", name);
272 gstreamer->publishRunData(run_data_collaction);
277void GRunAction::normalize_run_data(
const std::shared_ptr<GRunDataCollection> &run_data_collaction)
const {
278 if (run_data_collaction ==
nullptr) {
280 " run_data_collaction is null - cannot normalize run data.");
283 const int events_processed = run_data_collaction->get_events_processed();
284 if (events_processed <= 0) {
286 " events_processed is ", events_processed,
287 " - skipping normalization.");
291 const double norm =
static_cast<double>(events_processed);
293 for (
auto &[sdName, dataCollection]: run_data_collaction->getMutableDataCollectionMap()) {
294 if (dataCollection ==
nullptr) {
296 " detector ", sdName,
297 " has null data collection - skipping.");
301 auto &digitizedData = dataCollection->getMutableDigitizedData();
302 if (digitizedData.empty() || digitizedData.front() ==
nullptr) {
306 auto &digitized = digitizedData.front();
310 auto it = to_normalize.find(sdName);
311 if (it != to_normalize.end()) {
312 for (
const auto &varName: it->second) {
313 const auto intVars = digitized->getIntObservablesMap(0);
314 const auto intIt = intVars.find(varName);
315 if (intIt != intVars.end()) {
316 digitized->includeVariable(varName,
static_cast<double>(intIt->second) / norm);
320 const auto dblVars = digitized->getDblObservablesMap(0);
321 const auto dblIt = dblVars.find(varName);
322 if (dblIt != dblVars.end()) {
323 digitized->includeVariable(varName, dblIt->second / norm);
std::shared_ptr< GLogger > log
void warning(Args &&... args) const
void debug(debug_type type, Args &&... args) const
void info(int level, Args &&... args) const
void error(int exit_code, Args &&... args) const
GRunAction(std::shared_ptr< GOptions > gopts, std::shared_ptr< gdynamicdigitization::dRoutinesMap > digi_map)
Constructs the run action.
Thread-local run object created by the GEMC run action.
Declares GRunAction, the run-lifecycle action for the GEMC actions module.
constexpr const char * GRUNACTION_LOGGER
Declares GRun, the thread-local run container used by the GEMC actions module.
Defines error codes used by the GEMC actions module.
#define ERR_GDIGIMAP_NOT_EXISTING
#define ERR_GRUNACTION_NOT_EXISTING
#define ERR_STREAMERMAP_NOT_EXISTING
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id=-1)
vector< GStreamerDefinition > getGStreamerDefinition(const std::shared_ptr< GOptions > &gopts)