gstreamer
Loading...
Searching...
No Matches
gstreamer.h
Go to the documentation of this file.
1#pragma once
2
3// gstreamer
4#include "gstreamer_options.h"
6
7// gemc
10#include "gfactory.h"
11#include "gbase.h"
12
13// c++
14#include <string>
15#include <vector>
16#include <map>
17
/// Abstract base class for streaming GEMC event or frame data to output media.
///
/// Public entry points are mostly thin non-virtual wrappers that log and then delegate to a
/// virtual `...Impl` hook. Every hook in this base class returns false, so a derived streamer
/// overrides the hooks it supports.
///
/// NOTE(review): the leading integers on each line are line numbers from the rendered source
/// listing, and some numbered lines (100, 173, 320) are absent from it. The notes below flag the
/// places where a statement or declaration appears to be missing; confirm against the real header.
65class GStreamer : public GBase<GStreamer>
66{
67public:
    /// Construct a streamer and bind it to module logging.
    /// Forwards the options and the GSTREAMER_LOGGER category name to GBase.
72 explicit GStreamer(const std::shared_ptr<GOptions>& g) : GBase(g, GSTREAMER_LOGGER) {
73 }
74
    /// Virtual destructor.
81 virtual ~GStreamer() = default;
82
    /// Open the output medium (file, socket, etc.).
    /// @return false in this base implementation; derived classes override.
89 [[nodiscard]] virtual bool openConnection() { return false; }
90
    /// Close the output medium after flushing any buffered events.
    /// @return whatever closeConnectionImpl() returns.
    /// NOTE(review): no flush call is visible in this body even though the brief promises one;
    /// listing line 100 is missing, so the flush may simply have been dropped from this extract.
99 [[nodiscard]] bool closeConnection() {
101 return closeConnectionImpl();
102 }
103
    /// Implementation hook for closing the output medium.
    /// @return false in this base implementation.
110 [[nodiscard]] virtual bool closeConnectionImpl() { return false; }
111
    /// Buffer an event for later serialization. Defined out of line (gstreamer.cc).
120 void publishEventData(const std::shared_ptr<GEventDataCollection>& event_data);
121
122 // runs the protected virtual methods below to write frames from a run to file
123 // void publishFrameRunData(const std::shared_ptr<GFrameDataCollection>& frameRunData);
124
    /// Return the semantic stream type for this streamer.
    /// @return a copy of gstreamer_definitions.type (a token such as "event" or "stream").
131 [[nodiscard]] inline std::string getStreamType() const { return gstreamer_definitions.type; }
132
    /// Assign the output definition used by this streamer instance.
    /// @param gstreamerDefinition definition to copy from.
    /// @param tid thread id handed to the GStreamerDefinition(definition, tid) constructor;
    ///        defaults to -1.
142 inline void define_gstreamer(const GStreamerDefinition& gstreamerDefinition, int tid = -1) {
143 gstreamer_definitions = GStreamerDefinition(gstreamerDefinition, tid);
144 }
145
    /// Return the list of supported output formats. Defined out of line (gstreamer.cc).
152 static const std::vector<std::string>& supported_formats();
153
    /// Check whether a format token is supported. Defined out of line (gstreamer.cc).
159 static bool is_valid_format(const std::string& format);
160
    /// Configure streamer settings derived from options.
    /// Despite the name, the only visible effect is reading the scalar int option "ebuffer"
    /// into bufferFlushLimit.
    /// NOTE(review): bufferFlushLimit is a size_t, so a negative "ebuffer" value would convert
    /// to a very large limit; confirm the option is validated elsewhere.
167 void set_loggers(const std::shared_ptr<GOptions>& g) {
168 bufferFlushLimit = g->getScalarInt("ebuffer");
169 }
170
171protected:
    // NOTE(review): gstreamer_definitions (type GStreamerDefinition: format, base name, type,
    // thread id) is used by getStreamType() and define_gstreamer() and is reported as defined at
    // gstreamer.h:173, but its declaration line is absent from this listing.
174
175 // Event virtual methods called during buffer flushing, in order.
176 // Each hook returns a bool for "success/failure" to support uniform logging and diagnostics.
177
    /// Begin an event publish sequence.
    /// Reports ERR_PUBLISH_ERROR when the event or its header is null, logs a debug message,
    /// then delegates to startEventImpl().
    /// NOTE(review): the header check dereferences event_data right after the null report; this
    /// is only safe if log->error() never returns (it takes an exit code) -- confirm.
187 [[nodiscard]] bool startEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
188 if (!event_data) { log->error(ERR_PUBLISH_ERROR, "eventData is null in GStreamer::startEvent"); }
189 if (!event_data->getHeader()) {
190 log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::startEvent");
191 }
192
193 log->debug(NORMAL, "GStreamer::startEvent");
194 return startEventImpl(event_data);
195 }
196
    /// Implementation hook for beginning an event publish sequence.
    /// @return false in this base implementation.
202 virtual bool startEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
203 return false;
204 }
205
    /// Publish the event header.
    /// Reports ERR_PUBLISH_ERROR on a null header, logs a debug message, then delegates to
    /// publishEventHeaderImpl().
    /// NOTE(review): a null header is still forwarded to the hook if log->error() returns.
215 [[nodiscard]] bool publishEventHeader([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
216 if (!gevent_header) { log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::publishEventHeader"); }
217 log->debug(NORMAL, "GStreamer::publishEventHeader");
218 return publishEventHeaderImpl(gevent_header);
219 }
220
    /// Implementation hook for publishing the event header.
    /// @return false in this base implementation.
226 virtual bool publishEventHeaderImpl([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
227 return false;
228 }
229
    /// Publish true (MC) information hits for one detector.
    /// Logs a debug message naming the detector, then delegates to publishEventTrueInfoDataImpl().
240 [[nodiscard]] bool publishEventTrueInfoData([[maybe_unused]] const std::string& detectorName,
241 [[maybe_unused]] const std::vector<const GTrueInfoData*>&
242 trueInfoData) {
243 log->debug(NORMAL, "GStreamer::publishEventTrueInfoData for detector ", detectorName);
244 return publishEventTrueInfoDataImpl(detectorName, trueInfoData);
245 }
246
    /// Implementation hook for publishing true info hits for one detector.
    /// @return false in this base implementation.
253 virtual bool publishEventTrueInfoDataImpl([[maybe_unused]] const std::string& detectorName,
254 [[maybe_unused]] const std::vector<const GTrueInfoData*>& trueInfoData) {
255 return false;
256 }
257
258
    /// Publish digitized hits for one detector.
    /// Logs a debug message naming the detector, then delegates to publishEventDigitizedDataImpl().
269 [[nodiscard]] bool publishEventDigitizedData([[maybe_unused]] const std::string& detectorName,
270 [[maybe_unused]] const std::vector<const GDigitizedData*>&
271 digitizedData) {
272 log->debug(NORMAL, "GStreamer::publishEventDigitizedData for detector ", detectorName);
273 return publishEventDigitizedDataImpl(detectorName, digitizedData);
274 }
275
    /// Implementation hook for publishing digitized hits for one detector.
    /// @return false in this base implementation.
282 virtual bool publishEventDigitizedDataImpl([[maybe_unused]] const std::string& detectorName,
283 [[maybe_unused]] const std::vector<const GDigitizedData*>&
284 digitizedData) { return false; }
285
286
    /// End an event publish sequence. Logs a debug message, then delegates to endEventImpl().
292 [[nodiscard]] bool endEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
293 log->debug(NORMAL, "GStreamer::endEvent");
294 return endEventImpl(event_data);
295 }
296
    /// Implementation hook for ending an event publish sequence.
    /// @return false in this base implementation.
302 virtual bool endEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
303 return false;
304 }
305
306 // Frame stream virtual methods.
307 // These hooks are provided for plugins that serialize frame-based data.
308 // The base class flushes event buffers before starting a frame stream.
309
    /// Begin a frame stream publish sequence.
    /// Logs a debug message, then delegates to startStreamImpl().
    /// NOTE(review): the comment above says event buffers are flushed before a frame stream
    /// starts, but no flush call is visible here; listing line 320 is missing -- confirm.
319 [[nodiscard]] bool startStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
321 log->debug(NORMAL, "GStreamer::startStream");
322 return startStreamImpl(frameRunData);
323 }
324
    /// Implementation hook for beginning a frame stream publish sequence.
    /// @return false in this base implementation.
330 virtual bool startStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
331
    /// Publish a frame header. Logs a debug message, then delegates to publishFrameHeaderImpl().
337 [[nodiscard]] bool publishFrameHeader([[maybe_unused]] const GFrameHeader* gframeHeader) {
338 log->debug(NORMAL, "GStreamer::publishFrameHeader");
339 return publishFrameHeaderImpl(gframeHeader);
340 }
341
    /// Implementation hook for publishing a frame header.
    /// @return false in this base implementation.
347 virtual bool publishFrameHeaderImpl([[maybe_unused]] const GFrameHeader* gframeHeader) { return false; }
348
    /// Publish a frame payload. Logs a debug message, then delegates to publishPayloadImpl().
354 [[nodiscard]] bool publishPayload([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) {
355 log->debug(NORMAL, "GStreamer::publishPayload");
356 return publishPayloadImpl(payload);
357 }
358
    /// Implementation hook for publishing a frame payload.
    /// @return false in this base implementation.
364 virtual bool publishPayloadImpl([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) { return false; }
365
    /// End a frame stream publish sequence. Logs a debug message, then delegates to endStreamImpl().
371 [[nodiscard]] bool endStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
372 log->debug(NORMAL, "GStreamer::endStream");
373 return endStreamImpl(frameRunData);
374 }
375
    /// Implementation hook for ending a frame stream publish sequence.
    /// @return false in this base implementation.
381 virtual bool endStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
382
    /// Flush the internal event buffer, writing all buffered events to the output medium.
    /// Defined out of line (gstreamer.cc).
395 void flushEventBuffer();
396
397private:
    /// Pure virtual, which is what makes GStreamer abstract. Only the signature is visible
    /// here: it returns a std::string and takes no arguments.
406 [[nodiscard]] virtual std::string filename() const = 0; // must be implemented in derived classes
407
    /// Events held by this streamer; element type is shared_ptr<GEventDataCollection>.
409 std::vector<std::shared_ptr<GEventDataCollection>> eventBuffer;
410
    /// Buffer limit, replaced by the "ebuffer" option in set_loggers(). How it is compared
    /// against eventBuffer is not visible in this header.
412 size_t bufferFlushLimit = 10; // default; can be overridden
413
414public:
    /// Instantiate a streamer plugin by resolving the GStreamerFactory symbol from a dynamic library.
    /// @param h handle of the loaded library (dlhandle is a void*).
    /// @param g options forwarded to the factory function.
    /// @return the pointer produced by the factory, or nullptr when the handle is null or the
    ///         symbol cannot be resolved.
    /// NOTE(review): a raw pointer is returned; presumably the caller wraps it in a smart
    /// pointer -- confirm ownership at the call site.
422 static GStreamer* instantiate(const dlhandle h, std::shared_ptr<GOptions> g) {
423 if (!h) return nullptr;
424 using fptr = GStreamer* (*)(std::shared_ptr<GOptions>);
425
426 // Must match the extern "C" declaration in the derived factories.
427 auto sym = dlsym(h, "GStreamerFactory");
428 if (!sym) return nullptr;
429
    // dlsym returns void*; cast it to the factory signature before calling.
430 auto func = reinterpret_cast<fptr>(sym);
431 return func(g);
432 }
433};
434
435
// Helpers for building the set of streamer instances used by one worker thread.
436namespace gstreamer {
/// Map from streamer plugin name to the streamer instance loaded for it.
437using gstreamersMap = std::unordered_map<std::string, std::shared_ptr<GStreamer>>;
438
/// Create a per-thread map of streamer instances based on configured outputs.
///
/// For every definition returned by getGStreamerDefinition(gopts), builds a per-thread copy
/// of the definition, loads the matching plugin through a local GManager, stores the instance
/// under the plugin name, and binds the per-thread definition to it. No connection is opened
/// here.
///
/// @param gopts options used for logging, plugin loading and the output definitions.
/// @param thread_id id of the worker thread, used for the per-thread definition and logger name.
/// @return shared pointer to the (const) map of plugin name -> streamer.
456inline std::shared_ptr<const gstreamersMap> gstreamersMapPtr(const std::shared_ptr<GOptions>& gopts,
457 int thread_id) {
    // NOTE(review): the statement below is cut off -- listing line 459, which should close the
    // make_shared<GLogger>(...) call, is missing. `log` is also only referenced by the
    // commented-out example further down. Separately, the name string has no space between
    // "id" and the thread number.
458 auto log = std::make_shared<GLogger>(gopts, "gstreamersMap worker for thread id" + std::to_string(thread_id),
460
461 GManager manager(gopts);
462
463 auto gstreamers = std::make_shared<gstreamersMap>();
464
465 for (const auto& gstreamer_def : gstreamer::getGStreamerDefinition(gopts)) {
466 auto gstreamer_def_thread = GStreamerDefinition(gstreamer_def, thread_id);
467 std::string gstreamer_plugin = gstreamer_def_thread.gstreamerPluginName();
468
469 // Load and register the streamer plugin. The loader returns a shared_ptr<GStreamer>.
    // NOTE(review): emplace() does not insert when the key already exists, so if two definitions
    // resolve to the same plugin name the second instance is discarded and the at() call below
    // rebinds the first one. Also, `streamer` is not checked for null before use -- confirm the
    // loader never returns an empty pointer.
470 auto streamer = manager.LoadAndRegisterObjectFromLibrary<GStreamer>(gstreamer_plugin, gopts);
471 gstreamers->emplace(gstreamer_plugin, streamer);
472
473 // Bind the per-thread definition (in particular the per-thread filename) to the streamer instance.
    // NOTE(review): define_gstreamer() is called without `tid`, so it rebuilds the definition
    // with its default tid of -1; confirm GStreamerDefinition(def, -1) keeps the thread id
    // that was set on gstreamer_def_thread above.
474 gstreamers->at(gstreamer_plugin)->define_gstreamer(gstreamer_def_thread);
475
476 // Connection opening is intentionally not performed here. This is typically done by the caller
477 // to control error handling and output lifetime explicitly.
478 // Example:
479 // if (!gstreamers->at(gstreamer_plugin)->openConnection()) {
480 // log->error(1, "Failed to open connection for GStreamer ", gstreamer_plugin, " in thread ", gstreamer_def_thread.tid);
481 // }
482 }
483
484 return gstreamers;
485}
486} // namespace gstreamer
std::shared_ptr< GLogger > log
void debug(debug_type type, Args &&... args) const
void error(int exit_code, Args &&... args) const
std::shared_ptr< T > LoadAndRegisterObjectFromLibrary(std::string_view name, const std::shared_ptr< GOptions > &gopts)
Abstract base class for streaming GEMC event or frame data to output media.
Definition gstreamer.h:66
virtual bool publishPayloadImpl(const std::vector< GIntegralPayload * > *payload)
Implementation hook for publishing a frame payload.
Definition gstreamer.h:364
bool publishFrameHeader(const GFrameHeader *gframeHeader)
Publish a frame header.
Definition gstreamer.h:337
static const std::vector< std::string > & supported_formats()
Return the list of supported output formats.
Definition gstreamer.cc:8
virtual bool startStreamImpl(const GFrameDataCollection *frameRunData)
Implementation hook for beginning a frame stream publish sequence.
Definition gstreamer.h:330
bool publishEventDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish digitized hits for one detector.
Definition gstreamer.h:269
std::string getStreamType() const
Return the semantic stream type for this streamer.
Definition gstreamer.h:131
bool startStream(const GFrameDataCollection *frameRunData)
Begin a frame stream publish sequence.
Definition gstreamer.h:319
virtual ~GStreamer()=default
Virtual destructor.
void publishEventData(const std::shared_ptr< GEventDataCollection > &event_data)
Buffer an event for later serialization.
Definition gstreamer.cc:22
virtual bool closeConnectionImpl()
Implementation hook for closing the output medium.
Definition gstreamer.h:110
virtual bool publishEventTrueInfoDataImpl(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Implementation hook for publishing true info hits for one detector.
Definition gstreamer.h:253
virtual bool endStreamImpl(const GFrameDataCollection *frameRunData)
Implementation hook for ending a frame stream publish sequence.
Definition gstreamer.h:381
void flushEventBuffer()
Flush the internal event buffer, writing all buffered events to the output medium.
Definition gstreamer.cc:34
static bool is_valid_format(const std::string &format)
Check whether a format token is supported.
Definition gstreamer.cc:14
bool endEvent(const std::shared_ptr< GEventDataCollection > &event_data)
End an event publish sequence.
Definition gstreamer.h:292
bool publishEventHeader(const std::unique_ptr< GEventHeader > &gevent_header)
Publish the event header.
Definition gstreamer.h:215
void define_gstreamer(const GStreamerDefinition &gstreamerDefinition, int tid=-1)
Assign the output definition used by this streamer instance.
Definition gstreamer.h:142
GStreamer(const std::shared_ptr< GOptions > &g)
Construct a streamer and bind it to module logging.
Definition gstreamer.h:72
GStreamerDefinition gstreamer_definitions
Output definition used by this streamer (format, base name, type, thread id).
Definition gstreamer.h:173
virtual bool endEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Implementation hook for ending an event publish sequence.
Definition gstreamer.h:302
virtual bool publishFrameHeaderImpl(const GFrameHeader *gframeHeader)
Implementation hook for publishing a frame header.
Definition gstreamer.h:347
bool endStream(const GFrameDataCollection *frameRunData)
End a frame stream publish sequence.
Definition gstreamer.h:371
virtual bool publishEventHeaderImpl(const std::unique_ptr< GEventHeader > &gevent_header)
Implementation hook for publishing the event header.
Definition gstreamer.h:226
bool startEvent(const std::shared_ptr< GEventDataCollection > &event_data)
Begin an event publish sequence.
Definition gstreamer.h:187
virtual bool startEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Implementation hook for beginning an event publish sequence.
Definition gstreamer.h:202
static GStreamer * instantiate(const dlhandle h, std::shared_ptr< GOptions > g)
Instantiate a streamer plugin by resolving the GStreamerFactory symbol from a dynamic library.
Definition gstreamer.h:422
virtual bool openConnection()
Open the output medium (file, socket, etc.).
Definition gstreamer.h:89
bool publishPayload(const std::vector< GIntegralPayload * > *payload)
Publish a frame payload.
Definition gstreamer.h:354
bool publishEventTrueInfoData(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Publish true (MC) information hits for one detector.
Definition gstreamer.h:240
virtual bool publishEventDigitizedDataImpl(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Implementation hook for publishing digitized hits for one detector.
Definition gstreamer.h:282
bool closeConnection()
Close the output medium after flushing any buffered events.
Definition gstreamer.h:99
void set_loggers(const std::shared_ptr< GOptions > &g)
Configure streamer settings derived from options.
Definition gstreamer.h:167
void * dlhandle
NORMAL
Shared constants and error codes for the gstreamer module.
#define ERR_PUBLISH_ERROR
Generic publish-time error (null pointers, invalid state).
Option and configuration helpers for the gstreamer module.
constexpr const char * GSTREAMER_LOGGER
Logger category name used by gstreamer components.
vector< GStreamerDefinition > getGStreamerDefinition(const std::shared_ptr< GOptions > &gopts)
Parse gstreamer output definitions from options.
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id)
Create a per-thread map of streamer instances based on configured outputs.
Definition gstreamer.h:456
std::unordered_map< std::string, std::shared_ptr< GStreamer > > gstreamersMap
Definition gstreamer.h:437
Utility struct describing one configured output for the gstreamer module.
std::string type
Semantic output type token (e.g. "event" or "stream").