69 const std::shared_ptr<GLogger>& log,
70 const std::shared_ptr<const gdynamicdigitization::dRoutinesMap>& dynamicRoutinesMap,
71 const std::shared_ptr<GOptions>& gopts) {
73 std::atomic<int> next{1};
76 std::vector<jthread_alias> pool;
77 pool.reserve(nthreads);
79 for (
int tid = 0; tid < nthreads; ++tid) {
80 pool.emplace_back([&, tid]
82 log->info(0,
"worker ", tid,
" started");
90 for (
auto& [name,
gstreamer] : *gstreamer_map) {
92 log->error(1,
"Failed to open connection for GStreamer ", name,
" in thread ", tid);
98 int evn = next.fetch_add(1, std::memory_order_relaxed);
99 if (evn > nevents)
break;
103 auto eventData = std::make_shared<GEventDataCollection>(gopts, std::move(gevent_header));
106 for (
unsigned i = 1; i < 11; i++) {
108 auto true_data = dynamicRoutinesMap->at(
plugin_name)->collectTrueInformation(hit, i);
109 auto digi_data = dynamicRoutinesMap->at(
plugin_name)->digitizeHit(hit, i);
111 eventData->addDetectorDigitizedData(
"ctof", std::move(digi_data));
112 eventData->addDetectorTrueInfoData(
"ctof", std::move(true_data));
115 log->info(0,
"worker ", tid,
" event ", evn,
" has ",
116 eventData->getDataCollectionMap().at(
"ctof")->getDigitizedData().size(),
120 for (
const auto& [name,
gstreamer] : *gstreamer_map) {
128 for (
const auto& [name,
gstreamer] : *gstreamer_map) {
130 log->error(1,
"Failed to close connection for GStreamer ", name,
" in thread ", tid);
134 log->info(0,
"worker ", tid,
" processed ", localCount,
" events");
void run_simulation_in_threads(int nevents, int nthreads, const std::shared_ptr< GLogger > &log, const std::shared_ptr< const gdynamicdigitization::dRoutinesMap > &dynamicRoutinesMap, const std::shared_ptr< GOptions > &gopts)
Run a synthetic multithreaded event loop and publish the results through configured streamers.