gstreamer
Loading...
Searching...
No Matches
gstreamer_example.cc
Go to the documentation of this file.
1// gstreamer
2#include "gstreamer.h"
3
4// gemc
5#include "glogger.h"
7#include "gutilities.h"
8#include "gthreads.h"
9
10// c++
11#include <atomic>
12#include <ranges>
13#include <vector>
14#include <memory>
15#include <unordered_map>
16
/// Key of the dynamic digitization plugin: used to load it in main() and to
/// look up its routines when synthesizing hits in the worker threads.
const std::string plugin_name{"test_gdynamic_plugin"};
40
68 int nthreads,
69 const std::shared_ptr<GLogger>& log,
70 const std::shared_ptr<const gdynamicdigitization::dRoutinesMap>& dynamicRoutinesMap,
71 const std::shared_ptr<GOptions>& gopts) {
72 // Shared atomic event counter used to distribute work without external locking.
73 std::atomic<int> next{1};
74
75 // Thread pool. Each worker owns its local streamer map and processes an arbitrary number of events.
76 std::vector<jthread_alias> pool;
77 pool.reserve(nthreads);
78
79 for (int tid = 0; tid < nthreads; ++tid) {
80 pool.emplace_back([&, tid]
81 {
82 log->info(0, "worker ", tid, " started");
83
84 int localCount = 0;
85
86 // Create all configured streamers for this worker thread.
87 auto gstreamer_map = gstreamer::gstreamersMapPtr(gopts, tid);
88
89 // Open every backend connection before entering the event loop.
90 for (auto& [name, gstreamer] : *gstreamer_map) {
91 if (!gstreamer->openConnection()) {
92 log->error(1, "Failed to open connection for GStreamer ", name, " in thread ", tid);
93 }
94 }
95
96 while (true) {
97 // Claim the next event number atomically.
98 int evn = next.fetch_add(1, std::memory_order_relaxed);
99 if (evn > nevents) break;
100
101 // Build one event collection with a thread-specific header.
102 auto gevent_header = GEventHeader::create(gopts, tid);
103 auto eventData = std::make_shared<GEventDataCollection>(gopts, std::move(gevent_header));
104
105 // Populate a small synthetic detector dataset.
106 for (unsigned i = 1; i < 11; i++) {
107 auto hit = GHit::create(gopts);
108 auto true_data = dynamicRoutinesMap->at(plugin_name)->collectTrueInformation(hit, i);
109 auto digi_data = dynamicRoutinesMap->at(plugin_name)->digitizeHit(hit, i);
110
111 eventData->addDetectorDigitizedData("ctof", std::move(digi_data));
112 eventData->addDetectorTrueInfoData("ctof", std::move(true_data));
113 }
114
115 log->info(0, "worker ", tid, " event ", evn, " has ",
116 eventData->getDataCollectionMap().at("ctof")->getDigitizedData().size(),
117 " digitized hits");
118
119 // Publish the event to each configured streamer instance.
120 for (const auto& [name, gstreamer] : *gstreamer_map) {
121 gstreamer->publishEventData(eventData);
122 }
123
124 ++localCount;
125 }
126
127 // Close all output backends. This also flushes any buffered events.
128 for (const auto& [name, gstreamer] : *gstreamer_map) {
129 if (!gstreamer->closeConnection()) {
130 log->error(1, "Failed to close connection for GStreamer ", name, " in thread ", tid);
131 }
132 }
133
134 log->info(0, "worker ", tid, " processed ", localCount, " events");
135 });
136 }
137}
138
152int main(int argc, char* argv[]) {
153 // Build the module option set and parse the command line.
154 auto gopts = std::make_shared<GOptions>(argc, argv, gstreamer::defineOptions());
155
156 // Create a logger scoped to this example.
157 auto log = std::make_shared<GLogger>(gopts, SFUNCTION_NAME, GSTREAMER_LOGGER);
158
159 constexpr int nevents = 200;
160 constexpr int nthreads = 4;
161
162 // Load the dynamic digitization plugin used to generate synthetic detector content.
163 auto dynamicRoutinesMap = gdynamicdigitization::dynamicRoutinesMap({plugin_name}, gopts);
164 if (dynamicRoutinesMap->at(plugin_name)->loadConstants(1, "default") == false) {
165 log->error(1, "Failed to load constants for dynamic routine", plugin_name,
166 "for run number 1 with variation 'default'.");
167 }
168
169 run_simulation_in_threads(nevents, nthreads, log, dynamicRoutinesMap, gopts);
170
171 return EXIT_SUCCESS;
172}
static std::unique_ptr< GEventHeader > create(const std::shared_ptr< GOptions > &gopts, int tid=-1)
static GHit * create(const std::shared_ptr< GOptions > &gopts)
#define SFUNCTION_NAME
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id=-1)
Create a per-thread map of configured streamer instances.
Definition gstreamer.h:581
GOptions defineOptions()
Define the options contributed by the gstreamer module.
Core streaming interface and helper utilities for the gstreamer module.
const std::string plugin_name
void run_simulation_in_threads(int nevents, int nthreads, const std::shared_ptr< GLogger > &log, const std::shared_ptr< const gdynamicdigitization::dRoutinesMap > &dynamicRoutinesMap, const std::shared_ptr< GOptions > &gopts)
Run a synthetic multithreaded event loop and publish the results through configured streamers.
constexpr const char * GSTREAMER_LOGGER
Logger category name used by gstreamer components.
std::shared_ptr< const dRoutinesMap > dynamicRoutinesMap(const std::vector< std::string > &plugin_names, const std::shared_ptr< GOptions > &gopts)