actions
Loading...
Searching...
No Matches
gRunAction.cc
Go to the documentation of this file.
1// gemc
2#include "gRunAction.h"
3#include "gRun.h"
5#include "gutsConventions.h"
6
7// geant4
8#include "G4Threading.hh"
9
// Definitions of the static members shared by all GRunAction instances.
// The mutex is locked by stash_worker_run_data() and
// take_completed_worker_run_data() before either touches the pool.
10std::mutex GRunAction::completed_run_data_mutex;
// Pool that stash_worker_run_data() appends to and that
// take_completed_worker_run_data() drains.
11GRunAction::CompletedRunData GRunAction::completed_worker_run_data;
12
13
14// Construct the run action and retain access to shared configuration and
15// digitization services for the current execution context.
//
// gopt:     shared options; passed to GBase together with GRUNACTION_LOGGER and
//           then moved into goptions.
// digi_map: shared digitization routines map; moved into
//           digitization_routines_map.
//
// The GBase initializer reads gopt before the goptions initializer moves from
// it, because base classes are initialized before data members.
16GRunAction::GRunAction(std::shared_ptr<GOptions> gopt,
17 std::shared_ptr<gdynamicdigitization::dRoutinesMap> digi_map) : GBase(gopt, GRUNACTION_LOGGER),
18 goptions(std::move(gopt)),
19 digitization_routines_map(std::move(digi_map)) {
 // String form of the current Geant4 thread id.
 // NOTE(review): desc is not used in the visible lines; the embedded numbering
 // skips line 21, which presumably consumes it - confirm against the original file.
20 const auto desc = std::to_string(G4Threading::G4GetThreadId());
22}
23
24
25// Create the thread-local run object used by Geant4 for this execution context.
26G4Run *GRunAction::GenerateRun() {
27 log->debug(NORMAL, FUNCTION_NAME);
28
29 return new GRun(goptions, digitization_routines_map);
30}
31
32// Initialize run-scoped bookkeeping, determine which streamer categories are
33// needed for this run, and open the appropriate connections.
//
// aRun: run being started; only its run ID and its number of events to be
//       processed are read here.
//
// NOTE(review): the line numbers embedded in this listing skip 91, 96, 121 and
// 126, so the statements that open the dangling argument lists below are not
// visible here - recover them from the original file before editing.
34void GRunAction::BeginOfRunAction(const G4Run *aRun) {
35 const auto thread_id = G4Threading::G4GetThreadId();
36 const auto run = aRun->GetRunID();
37
 // Replace run_data with a fresh collection whose header is built from this
 // run number and thread id.
38 auto run_header = std::make_unique<GRunHeader>(goptions, run, thread_id);
39 run_data = std::make_unique<GRunDataCollection>(goptions, std::move(run_header));
40
 // Only used in the "opening connection" log messages further down.
41 const auto neventsThisRun = aRun->GetNumberOfEventToBeProcessed();
42
43 // Reset the per-run mode flags before scanning the digitization routines.
44 need_a_thread_streamer = false;
45 need_a_run_streamer = false;
46
47 // Inspect the available digitization routines to determine whether this run
48 // requires event-mode publication, run-mode publication, or both.
49 if (digitization_routines_map != nullptr) {
50 for (const auto &[plugin, digiRoutine]: *digitization_routines_map) {
51 if (digiRoutine == nullptr) {
52 log->error(ERR_GDIGIMAP_NOT_EXISTING, FUNCTION_NAME,
53 " null digitization routine registered for plugin ", plugin);
54 }
 // NOTE(review): no visible statement leaves the loop body after the null
 // report above; unless log->error does not return, digiRoutine is
 // dereferenced below while null - confirm.
55
56 if (digiRoutine->collection_mode() == CollectionMode::event) {
57 need_a_thread_streamer = true;
58 } else if (digiRoutine->collection_mode() == CollectionMode::run) {
 // Record which variables of this plugin normalize_run_data() will divide
 // by the event count. Entries are written here but never erased in this
 // function.
59 to_normalize[plugin] = digiRoutine->variables_to_normalize();
60 need_a_run_streamer = true;
61 }
62 }
63 } else {
64 log->error(ERR_GDIGIMAP_NOT_EXISTING, FUNCTION_NAME,
65 " digitization_routines_map is null - streamer mode detection skipped.");
66 }
67
 // A configured streamer definition of type "event" also requests thread
 // streamers, independently of what the digitization routines asked for.
68 for (const auto& streamer_definition : gstreamer::getGStreamerDefinition(goptions)) {
69 if (streamer_definition.type == "event") {
70 need_a_thread_streamer = true;
71 break;
72 }
73 }
74
75
76 // Worker threads own event-mode publication, so they lazily build and open
77 // thread-local streamers only when at least one event-mode digitizer exists.
78 if (!IsMaster() && need_a_thread_streamer) {
 // The map is only created while the pointer is still null, so it is reused
 // by later calls on the same object.
79 if (gstreamer_threads_map == nullptr) {
80 log->info(1, "Defining thread gstreamers for run ", run, " in thread ", thread_id);
81 gstreamer_threads_map = gstreamer::gstreamersMapPtr(goptions, thread_id);
82 }
83
84 if (gstreamer_threads_map == nullptr) {
85 log->error(1, FUNCTION_NAME, " gstreamer_threads_map is null in thread ", thread_id,
86 " - cannot open connections.");
87 }
 // NOTE(review): the loop below dereferences gstreamer_threads_map even when
 // the check above fired, unless log->error does not return - confirm.
88
89 for (const auto &[name, gstreamer]: *gstreamer_threads_map) {
90 if (gstreamer == nullptr) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 91).
92 "Null GStreamer entry ", name, " in thread ", thread_id);
93 }
94
95 if (!gstreamer->openConnection()) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 96).
97 "Failed to open connection for GStreamer ", name,
98 " in thread ", thread_id);
99 }
100
 // Logged after the openConnection() attempt above, whatever its result.
101 log->info(2, FUNCTION_NAME, "Worker thread [", thread_id, "]: opening connection for ",
102 KGRN, name, RST,
103 " for run ", run, ". Number of events to be processed: ", neventsThisRun);
104 }
105 }
106 // The master thread owns run-mode publication, so it opens the run streamers
107 // only when at least one digitizer accumulates payload at run scope.
108 else if (IsMaster() && need_a_run_streamer) {
 // Same create-if-null pattern as the worker branch, without a thread id.
109 if (gstreamer_run_map == nullptr) {
110 log->info(1, "Defining run gstreamers for run ", run);
111 gstreamer_run_map = gstreamer::gstreamersMapPtr(goptions);
112 }
113
114 if (gstreamer_run_map == nullptr) {
115 log->error(1, FUNCTION_NAME, " gstreamer_run_map is null in master thread ",
116 " - cannot open connections.");
117 }
 // NOTE(review): gstreamer_run_map is dereferenced below even when the check
 // above fired, unless log->error does not return - confirm.
118
119 for (const auto &[name, gstreamer]: *gstreamer_run_map) {
120 if (gstreamer == nullptr) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 121).
122 "Null GStreamer entry ", name, " in master thread");
123 }
124
125 if (!gstreamer->openConnection()) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 126).
127 "Failed to open connection for GStreamer in master thread ", name);
128 }
129
 // Logged after the openConnection() attempt above, whatever its result.
130 log->info(2, FUNCTION_NAME, "Master Thread: opening connection for ",
131 KGRN, name, RST,
132 " for run ", run, ". Number of events to be processed: ", neventsThisRun);
133 }
134 }
135}
136
137// Close streamers at run end and, when running on the master thread, merge and
138// publish the run-level payload accumulated by workers.
//
// aRun: run that just ended; only its run ID is read here.
//
// NOTE(review): the line numbers embedded in this listing skip 155 and 215, so
// the statements that open the dangling argument lists below are not visible
// here - recover them from the original file before editing.
139void GRunAction::EndOfRunAction(const G4Run *aRun) {
140 const auto thread_id = G4Threading::G4GetThreadId();
141 const auto runNumber = aRun->GetRunID();
 // Label used only in the "closing connection" log messages.
142 const std::string what_am_i = IsMaster() ? "Master" : "Worker";
143
 // Worker side: close every thread streamer, using the flag computed in
 // BeginOfRunAction.
144 if (!IsMaster() && need_a_thread_streamer) {
145 if (gstreamer_threads_map == nullptr) {
146 log->error(ERR_STREAMERMAP_NOT_EXISTING, FUNCTION_NAME,
147 " gstreamer_map is null in thread ", thread_id,
148 " - cannot close connections.");
149 } else {
150 for (const auto &[name, gstreamer]: *gstreamer_threads_map) {
 // The closing message is logged before the entry is checked for null.
151 log->info(2, FUNCTION_NAME, " ", what_am_i, " [", thread_id, "], for run ", runNumber,
152 " closing connection for gstreamer ", name);
153
154 if (gstreamer == nullptr) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 155).
156 "Null GStreamer entry ", name, " in thread ", thread_id);
157 }
158
 // NOTE(review): reached even when gstreamer is null, unless the missing
 // call above does not return - confirm.
159 if (!gstreamer->closeConnection()) {
160 log->error(1, "Failed to close connection for GStreamer ", name, " in thread ", thread_id);
161 }
162 }
163 }
164 }
165
166 // Worker threads do not publish merged run data. Instead, they hand their
167 // completed run-level accumulation to the shared pool and return.
168 if (!IsMaster()) {
169 // Only contribute to the master merge pool when a run-mode streamer will drain it;
170 // otherwise the pool is never taken and would accumulate stale prior-run data.
171 if (need_a_run_streamer) { stash_worker_run_data(); }
172 return;
173 }
174
 // Workers have returned above, so only the master thread reaches this point.
175 if (IsMaster() && need_a_run_streamer) {
176 // Gather all worker-produced run data for this run and merge them into a
177 // single master-side run-data object before publication.
178 auto completed_run_data = take_completed_worker_run_data();
179 log->info(2, FUNCTION_NAME,
180 " master collected ", static_cast<int>(completed_run_data.size()),
181 " worker run_data object(s) for run ", runNumber);
182
183 std::shared_ptr<GRunDataCollection> merged_run_data;
184
185 for (auto &worker_run_data: completed_run_data) {
186 if (worker_run_data == nullptr) {
187 continue;
188 }
189
190 // Create the merged destination lazily only if there is at least one
191 // valid worker contribution to merge.
192 if (merged_run_data == nullptr) {
 // The merged header is built with the master's thread id and this run number.
193 auto merged_header = std::make_unique<GRunHeader>(goptions, runNumber, thread_id);
194 merged_run_data = std::make_shared<GRunDataCollection>(goptions, std::move(merged_header));
195 }
196
197 merged_run_data->merge(*worker_run_data);
198 }
199
200 // Publish the merged run-level payload once, after all workers have contributed.
201 if (merged_run_data != nullptr) {
202 publish_run_data(merged_run_data);
203 }
204
205 if (gstreamer_run_map == nullptr) {
206 log->error(ERR_STREAMERMAP_NOT_EXISTING, FUNCTION_NAME,
207 " gstreamer_map is null in master thread - cannot close connections.");
208 }
 // NOTE(review): unlike the worker branch there is no else here, so the loop
 // below dereferences gstreamer_run_map even when the check above fired,
 // unless log->error does not return - confirm.
209
210 for (const auto &[name, gstreamer]: *gstreamer_run_map) {
211 log->info(2, FUNCTION_NAME, " ", what_am_i, " for run ", runNumber,
212 " closing connection for gstreamer ", name);
213
214 if (gstreamer == nullptr) {
 // NOTE(review): the call these arguments belong to is on a line missing
 // from this listing (embedded line 215).
216 "Null GStreamer entry ", name, " in master thread");
217 }
218
219 if (!gstreamer->closeConnection()) {
220 log->error(1, "Failed to close connection for GStreamer ", name, " in master thread");
221 }
222 }
223 }
224
225}
226
227// Move this worker's completed run-data object into the protected static pool
228// so it can later be collected by the master thread.
229void GRunAction::stash_worker_run_data() {
230 if (run_data == nullptr) {
231 return;
232 }
233
234 std::scoped_lock lock(completed_run_data_mutex);
235 completed_worker_run_data.emplace_back(std::move(run_data));
236}
237
238// Extract and clear the protected pool of completed worker run-data objects.
239auto GRunAction::take_completed_worker_run_data() -> CompletedRunData {
240 std::scoped_lock lock(completed_run_data_mutex);
241
242 auto result = std::move(completed_worker_run_data);
243 completed_worker_run_data.clear();
244
245 return result;
246}
247
248
249// Publish the merged run-level payload to every configured master-side run streamer.
250void GRunAction::publish_run_data(const std::shared_ptr<GRunDataCollection> &run_data_collaction) const {
251 if (run_data_collaction == nullptr) {
252 log->error(ERR_GRUNACTION_NOT_EXISTING, FUNCTION_NAME,
253 " run_data is null - cannot publish merged run data.");
254 }
255
256 if (gstreamer_run_map == nullptr) {
257 log->error(ERR_STREAMERMAP_NOT_EXISTING, FUNCTION_NAME,
258 " no run streamer map available - run data will not be published.");
259 }
260
261 // Normalize once, before publishing: normalize_run_data() mutates run_data_collaction
262 // in place and is NOT idempotent, so running it per streamer would divide the run
263 // observables by events_processed once for every configured run streamer.
264 normalize_run_data(run_data_collaction);
265
266 for (const auto &[name, gstreamer]: *gstreamer_run_map) {
267 if (gstreamer == nullptr) {
268 log->error(ERR_STREAMERMAP_NOT_EXISTING, FUNCTION_NAME,
269 " null gstreamer instance for run streamer ", name);
270 }
271
272 gstreamer->publishRunData(run_data_collaction);
273 }
274}
275
276
277void GRunAction::normalize_run_data(const std::shared_ptr<GRunDataCollection> &run_data_collaction) const {
278 if (run_data_collaction == nullptr) {
279 log->error(ERR_GRUNACTION_NOT_EXISTING, FUNCTION_NAME,
280 " run_data_collaction is null - cannot normalize run data.");
281 }
282
283 const int events_processed = run_data_collaction->get_events_processed();
284 if (events_processed <= 0) {
285 log->warning(FUNCTION_NAME,
286 " events_processed is ", events_processed,
287 " - skipping normalization.");
288 return;
289 }
290
291 const double norm = static_cast<double>(events_processed);
292
293 for (auto &[sdName, dataCollection]: run_data_collaction->getMutableDataCollectionMap()) {
294 if (dataCollection == nullptr) {
295 log->warning(FUNCTION_NAME,
296 " detector ", sdName,
297 " has null data collection - skipping.");
298 continue;
299 }
300
301 auto &digitizedData = dataCollection->getMutableDigitizedData();
302 if (digitizedData.empty() || digitizedData.front() == nullptr) {
303 continue;
304 }
305
306 auto &digitized = digitizedData.front();
307 // we are in a const method so we can't loop directky over to_normalize[sdName]
308 // cause this call could modify the map!
309
310 auto it = to_normalize.find(sdName);
311 if (it != to_normalize.end()) {
312 for (const auto &varName: it->second) {
313 const auto intVars = digitized->getIntObservablesMap(0);
314 const auto intIt = intVars.find(varName);
315 if (intIt != intVars.end()) {
316 digitized->includeVariable(varName, static_cast<double>(intIt->second) / norm);
317 continue;
318 }
319
320 const auto dblVars = digitized->getDblObservablesMap(0);
321 const auto dblIt = dblVars.find(varName);
322 if (dblIt != dblVars.end()) {
323 digitized->includeVariable(varName, dblIt->second / norm);
324 }
325 }
326 }
327 }
328}
329
330
331// (Legacy/experimental streaming logic remains commented out below.)
332
333// TODO: padding by 2 extra frames is too much; compute the required number of frames instead.
334// int nFramesToCreate = neventsThisRun * eventDuration / frameDuration + 2;
335
336// if (stream) {
337// if (frameStreamVerbosity >= GVERBOSITY_SUMMARY) {
338// cout << SROLOGHEADER << " current nframes in the buffer: " << frameRunData.size() << ", new frames to create: " << nFramesToCreate;
339// cout << ", last frame id created: " << lastFrameCreated << endl;
340// }
341//
342// for (int f = lastFrameCreated; f < lastFrameCreated + nFramesToCreate; f++) {
343// GFrameDataCollectionHeader* gframeHeader = new GFrameDataCollectionHeader(f + 1, frameDuration, verbosity);
344// GFrameDataCollection* frameData = new GFrameDataCollection(gframeHeader, verbosity);
345// frameRunData.push_back(frameData);
346// }
347//
348// lastFrameCreated += nFramesToCreate;
349// if (frameStreamVerbosity >= GVERBOSITY_SUMMARY) {
350// cout << SROLOGHEADER << nFramesToCreate << " new frames, buffer size is now " << frameRunData.size();
351// cout << ", last frame id created: " << lastFrameCreated << endl;
352// }
353// }
354
355
356// looping over run data and filling frameRunData
357// need to remember last event number here
358// if (stream) {
359// for (auto eventDataCollection : theRun->getRunData()) {
360// int absoluteEventNumber = eventIndex + eventDataCollection->getEventNumber();
361//
362// // filling frameRunData with this eventDataCollection
363// for (auto [detectorName, gdataCollection] : *eventDataCollection->getDataCollectionMap()) {
364// for (auto hitDigitizedData : *gdataCollection->getDigitizedData()) {
365// int timeAtelectronic = hitDigitizedData->getTimeAtElectronics();
366// if (timeAtelectronic != TIMEATELECTRONICSNOTDEFINED) {
367// int frameIndex = eventFrameIndex(absoluteEventNumber, timeAtelectronic);
368// frameRunData[frameIndex]->addIntegralPayload(formPayload(hitDigitizedData), verbosity);
369// }
370// }
371// }
372// }
373// }
374
375
376// now flushing all frames past eventIndex
377
378// if (stream) {
379// // updating eventIndex
380// eventIndex += neventsThisRun;
381//
382// for (auto [factoryName, streamerFactory] : *gstreamerFactoryMap) {
383// if (streamerFactory->getStreamType() == "stream" && frameRunData.size() > 0) {
384// // need to look for additional frame to flush
385// int nFramesToFlush = nFramesToCreate - 2;
386//
387// if (frameStreamVerbosity >= GVERBOSITY_SUMMARY) { cout << SROLOGHEADER << "number of frames to flush: " << nFramesToFlush << endl; }
388// for (auto fid = 0; fid < nFramesToFlush; fid++) {
389// logSummary("Streaming frame id <" + to_string(frameRunData.front()->getFrameID()) + " using streamer factory >" + factoryName + "<");
390// streamerFactory->publishFrameRunData(goptions, frameRunData.front());
391// delete frameRunData.front();
392// frameRunData.erase(frameRunData.begin());
393// }
394// }
395// }
396// }
397
398
399// determine the frame ID based on event number, eventDuration, frameDuration and number of threads
400// add frameData to frameRunData if it's not present
401// int GRunAction::eventFrameIndex(int eventNumber, double timeAtElectronics) {
402// int absoluteHitTime = eventNumber * eventDuration + timeAtElectronics;
403// int frameID = absoluteHitTime / frameDuration + 1;
404// int frameIndex = -1;
405//
406// // cout << "eventNumber: " << eventNumber << ", absoluteHitTime: " << absoluteHitTime << ", frameID: " << frameID << endl;
407//
408// for (size_t f = 0; f < frameRunData.size(); f++) { if (frameRunData[f]->getFrameID() == frameID) { frameIndex = (int)f; } }
409// // cout << "eventNumber: " << eventNumber << ", absoluteHitTime: " << absoluteHitTime << ", frameIndex: " << frameIndex << endl;
410//
411// return frameIndex;
412// }
413
414// vector<int> GRunAction::formPayload(GDigitizedData* digitizedData) {
415// vector<int> payload;
416//
417// int crate = digitizedData->getIntObservable(CRATESTRINGID);
418// int slot = digitizedData->getIntObservable(SLOTSTRINGID);
419// int channel = digitizedData->getIntObservable(CHANNELSTRINGID);
420// int q = digitizedData->getIntObservable(CHARGEATELECTRONICS);
421// int time = digitizedData->getIntObservable(TIMEATELECTRONICS);
422//
423// payload.push_back(crate);
424// payload.push_back(slot);
425// payload.push_back(channel);
426// payload.push_back(q);
427// payload.push_back(time);
428//
429// return payload;
430// }
431//
432// bool GRunAction::findFrameID(int fid) {
433// for (auto frame : frameRunData) { if (frame->getFrameID() == fid) { return true; } }
434// return false;
435// }
std::shared_ptr< GLogger > log
void warning(Args &&... args) const
void debug(debug_type type, Args &&... args) const
void info(int level, Args &&... args) const
void error(int exit_code, Args &&... args) const
GRunAction(std::shared_ptr< GOptions > gopts, std::shared_ptr< gdynamicdigitization::dRoutinesMap > digi_map)
Constructs the run action.
Definition gRunAction.cc:16
Thread-local run object created by the GEMC run action.
Definition gRun.h:54
Declares GRunAction, the run-lifecycle action for the GEMC actions module.
constexpr const char * GRUNACTION_LOGGER
Definition gRunAction.h:25
Declares GRun, the thread-local run container used by the GEMC actions module.
Defines error codes used by the GEMC actions module.
#define ERR_GDIGIMAP_NOT_EXISTING
#define ERR_GRUNACTION_NOT_EXISTING
#define ERR_STREAMERMAP_NOT_EXISTING
run
#define FUNCTION_NAME
CONSTRUCTOR
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id=-1)
vector< GStreamerDefinition > getGStreamerDefinition(const std::shared_ptr< GOptions > &gopts)