gstreamer
Loading...
Searching...
No Matches
gstreamer_example.cc
Go to the documentation of this file.
1// gstreamer
2#include "gstreamer.h"
3
4// gemc
5#include "glogger.h"
7#include "gutilities.h"
8#include "gthreads.h"
9
10// c++
11#include <atomic> // std::atomic<T>: lock-free, thread-safe integers, flags…
12#include <ranges> // std::views::iota – range of integers 0,1,…,n-1
13#include <vector>
14#include <memory> // smart pointers
15#include <unordered_map>
16
30const std::string plugin_name = "test_gdynamic_plugin"; // key used to load and look up the digitization routine in this example
31
48 int nthreads,
49 const std::shared_ptr<GLogger>& log,
50 const std::shared_ptr<const gdynamicdigitization::dRoutinesMap>& dynamicRoutinesMap,
51 const std::shared_ptr<GOptions>& gopts) {
52 // Shared event counter; event numbers start at 1.
53 // fetch_add returns the old value *and* increments it in one atomic step,
54 // so every worker claims a distinct event number without needing a mutex.
55 std::atomic<int> next{1};
56
57 // Pool of jthreads. jthread joins in its destructor, so no explicit join
58 // loop is needed at the end.
59 // Each element is one worker thread running the event-processing lambda below.
60 // std::vector<std::jthread> pool; use this when C++20 is widely available.
61 std::vector<jthread_alias> pool; // was std::vector<std::jthread>
62
63 pool.reserve(nthreads);
64
65 for (int tid = 0; tid < nthreads; ++tid) {
66 // Capture [&, tid]: everything is captured by reference except tid, which each worker copies.
67 pool.emplace_back([&, tid] // capture tid *by value*
68 {
69 // Body of the worker thread.
70 log->info(0, "worker ", tid, " started");
71
72 int localCount = 0; // events built by *this* worker
73 thread_local std::vector<std::unique_ptr<GEventDataCollection>> localRunData; // NOTE(review): never used in this lambda
74
75 // Create one streamer map for this thread and open all output connections.
76 auto gstreamer_map = gstreamer::gstreamersMapPtr(gopts, tid);
77 for (auto& [name, gstreamer] : *gstreamer_map) {
78 if (!gstreamer->openConnection()) {
79 log->error(1, "Failed to open connection for GStreamer ", name, " in thread ", tid);
80 }
81 }
82
83 while (true) {
84 // Repeatedly ask the shared atomic counter for the next unclaimed event
85 // number, build that event, publish it, and go back for more.
86 // memory_order_relaxed: we only need *atomicity*, no ordering.
87 int evn = next.fetch_add(1, std::memory_order_relaxed);
88 // fetch_add atomically returns the current value and increments it by 1.
89 if (evn > nevents) break; // every event has been claimed: exit the while loop
90
91 // Create the event header (tagged with this worker's tid) and the container for this event.
92 auto gevent_header = GEventHeader::create(gopts, tid);
93 auto eventData = std::make_shared<GEventDataCollection>(gopts, std::move(gevent_header));
94
95 // Create a small synthetic detector dataset:
96 // each event has 10 hits, and each hit is converted into true and digitized data.
97 for (unsigned i = 1; i < 11; i++) {
98 auto hit = GHit::create(gopts); // NOTE(review): raw GHit*, never deleted in this block - confirm who owns it
99 auto true_data = dynamicRoutinesMap->at(plugin_name)->collectTrueInformation(hit, i);
100 auto digi_data = dynamicRoutinesMap->at(plugin_name)->digitizeHit(hit, i);
101
102 eventData->addDetectorDigitizedData("ctof", std::move(digi_data));
103 eventData->addDetectorTrueInfoData("ctof", std::move(true_data));
104 }
105
106 log->info(0, "worker ", tid, " event ", evn, " has ",
107 eventData->getDataCollectionMap().at("ctof")->getDigitizedData().size(), " digitized hits");
108
109 // Publish the event to each configured streamer.
110 // The streamer may buffer and flush based on its configured ebuffer value.
111 for (const auto& [name, gstreamer] : *gstreamer_map) {
112 gstreamer->publishEventData(eventData);
113 }
114
115 ++localCount; // tally for this worker
116 }
117
118 // Close streamer connections.
119 // Close implies a flush of any remaining buffered events.
120 for (const auto& [name, gstreamer] : *gstreamer_map) {
121 if (!gstreamer->closeConnection()) {
122 log->error(1, "Failed to close connection for GStreamer ", name, " in thread ", tid);
123 }
124 }
125
126 log->info(0, "worker ", tid, " processed ", localCount, " events");
127 }); // jthread constructor launches the thread immediately
128 } // end of the thread-spawning loop
129} // pool is destroyed here; per the note above, its destructor blocks until every worker has joined
130
131
132// Example entry point: emulates a run of events, building and publishing the data from separate worker threads.
133int main(int argc, char* argv[]) {
134 // Create GOptions using gstreamer::defineOptions, which aggregates options from gstreamer and gdynamicdigitization.
135 auto gopts = std::make_shared<GOptions>(argc, argv, gstreamer::defineOptions());
136
137 // Create a module logger for this example.
138 auto log = std::make_shared<GLogger>(gopts, SFUNCTION_NAME, GSTREAMER_LOGGER);
139
140 constexpr int nevents = 200; // total number of synthetic events to generate
141 constexpr int nthreads = 4; // number of worker threads sharing those events
142
143 // Load dynamic digitization routines and their constants (run 1, variation "default").
144 auto dynamicRoutinesMap = gdynamicdigitization::dynamicRoutinesMap({plugin_name}, gopts);
145 if (!dynamicRoutinesMap->at(plugin_name)->loadConstants(1, "default")) {
146 log->error(1, "Failed to load constants for dynamic routine ", plugin_name,
147 " for run number 1 with variation 'default'.");
148 }
149
150 run_simulation_in_threads(nevents, nthreads, log, dynamicRoutinesMap, gopts);
151
152 return EXIT_SUCCESS;
153}
static std::unique_ptr< GEventHeader > create(const std::shared_ptr< GOptions > &gopts, int tid=-1)
static GHit * create(const std::shared_ptr< GOptions > &gopts)
#define SFUNCTION_NAME
Core streaming interface for gstreamer output plugins.
const std::string plugin_name
void run_simulation_in_threads(int nevents, int nthreads, const std::shared_ptr< GLogger > &log, const std::shared_ptr< const gdynamicdigitization::dRoutinesMap > &dynamicRoutinesMap, const std::shared_ptr< GOptions > &gopts)
Run a synthetic event simulation in multiple worker threads and publish results via gstreamer.
constexpr const char * GSTREAMER_LOGGER
Logger category name used by gstreamer components.
std::shared_ptr< const dRoutinesMap > dynamicRoutinesMap(const std::vector< std::string > &plugin_names, const std::shared_ptr< GOptions > &gopts)
GOptions defineOptions()
Contribute gstreamer options to the global option set.
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id)
Create a per-thread map of streamer instances based on configured outputs.
Definition gstreamer.h:456