gstreamer
Loading...
Searching...
No Matches
gstreamer.h
Go to the documentation of this file.
1#pragma once
2
3// gstreamer
4#include "gstreamer_options.h"
6
7// gemc
11#include "gfactory.h"
12#include "gbase.h"
13
14// c++
15#include <string>
16#include <vector>
17#include <map>
18
76class GStreamer : public GBase<GStreamer>
77{
78public:
84 explicit GStreamer(const std::shared_ptr<GOptions>& g) : GBase(g, GSTREAMER_LOGGER) {
85 }
86
93 virtual ~GStreamer() = default;
94
103 [[nodiscard]] virtual bool openConnection() { return false; }
104
113 [[nodiscard]] bool closeConnection() {
115 return closeConnectionImpl();
116 }
117
126 [[nodiscard]] virtual bool closeConnectionImpl() { return false; }
127
137 void publishEventData(const std::shared_ptr<GEventDataCollection>& event_data);
138
147 void publishRunData(const std::shared_ptr<GRunDataCollection>& run_data);
148
157 [[nodiscard]] inline std::string getStreamType() const { return gstreamer_definitions.type; }
158
169 inline void define_gstreamer(const GStreamerDefinition& gstreamerDefinition, int tid = -1) {
170 gstreamer_definitions = GStreamerDefinition(gstreamerDefinition, tid);
171 }
172
181 static const std::vector<std::string>& supported_formats();
182
191 static bool is_valid_format(const std::string& format);
192
200 void set_loggers(const std::shared_ptr<GOptions>& g) {
201 bufferFlushLimit = g->getScalarInt("ebuffer");
202 }
203
204protected:
207
217 [[nodiscard]] bool startEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
218 if (!event_data) { log->error(ERR_PUBLISH_ERROR, "eventData is null in GStreamer::startEvent"); }
219 if (!event_data->getHeader()) {
220 log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::startEvent");
221 }
222
223 log->debug(NORMAL, "GStreamer::startEvent");
224 return startEventImpl(event_data);
225 }
226
233 virtual bool startEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
234 return false;
235 }
236
243 [[nodiscard]] bool endEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
244 log->debug(NORMAL, "GStreamer::endEvent");
245 return endEventImpl(event_data);
246 }
247
254 virtual bool endEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
255 return false;
256 }
257
267 [[nodiscard]] bool publishEventHeader([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
268 if (!gevent_header) { log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::publishEventHeader"); }
269 log->debug(NORMAL, "GStreamer::publishEventHeader");
270 return publishEventHeaderImpl(gevent_header);
271 }
272
279 virtual bool publishEventHeaderImpl([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
280 return false;
281 }
282
293 [[nodiscard]] bool publishEventTrueInfoData([[maybe_unused]] const std::string& detectorName,
294 [[maybe_unused]] const std::vector<const GTrueInfoData*>& trueInfoData) {
295 log->debug(NORMAL, "GStreamer::publishEventTrueInfoData for detector ", detectorName);
296 return publishEventTrueInfoDataImpl(detectorName, trueInfoData);
297 }
298
306 virtual bool publishEventTrueInfoDataImpl([[maybe_unused]] const std::string& detectorName,
307 [[maybe_unused]] const std::vector<const GTrueInfoData*>& trueInfoData) {
308 return false;
309 }
310
321 [[nodiscard]] bool publishEventDigitizedData([[maybe_unused]] const std::string& detectorName,
322 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
323 log->debug(NORMAL, "GStreamer::publishEventDigitizedData for detector ", detectorName);
324 return publishEventDigitizedDataImpl(detectorName, digitizedData);
325 }
326
334 virtual bool publishEventDigitizedDataImpl([[maybe_unused]] const std::string& detectorName,
335 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
336 return false;
337 }
338
345 [[nodiscard]] bool startRun([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
346 if (!run_data) { log->error(ERR_PUBLISH_ERROR, "run_data is null in GStreamer::startRun"); }
347 if (!run_data->getHeader()) {
348 log->error(ERR_PUBLISH_ERROR, "run header is null in GStreamer::startRun");
349 }
350
351 log->debug(NORMAL, "GStreamer::startRun");
352 return startRunImpl(run_data);
353 }
354
361 virtual bool startRunImpl([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
362 return false;
363 }
364
371 [[nodiscard]] bool endRun([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
372 log->debug(NORMAL, "GStreamer::endRun");
373 return endRunImpl(run_data);
374 }
375
382 virtual bool endRunImpl([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
383 return false;
384 }
385
392 bool publishRunHeader([[maybe_unused]] const std::unique_ptr<GRunHeader>& run_header) {
393 log->debug(NORMAL, "GStreamer::publishRunHeader");
394 return publishRunHeaderImpl(run_header);
395 }
396
403 virtual bool publishRunHeaderImpl([[maybe_unused]] const std::unique_ptr<GRunHeader>& run_header) {
404 return false;
405 }
406
414 bool publishRunDigitizedData([[maybe_unused]] const std::string& detectorName,
415 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
416 log->debug(NORMAL, "GStreamer::publishRunDigitizedData for detector ", detectorName);
417 return publishRunDigitizedDataImpl(detectorName, digitizedData);
418 }
419
427 virtual bool publishRunDigitizedDataImpl([[maybe_unused]] const std::string& detectorName,
428 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
429 return false;
430 }
431
441 [[nodiscard]] bool startStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
443 log->debug(NORMAL, "GStreamer::startStream");
444 return startStreamImpl(frameRunData);
445 }
446
453 virtual bool startStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
454
461 [[nodiscard]] bool publishFrameHeader([[maybe_unused]] const GFrameHeader* gframeHeader) {
462 log->debug(NORMAL, "GStreamer::publishFrameHeader");
463 return publishFrameHeaderImpl(gframeHeader);
464 }
465
472 virtual bool publishFrameHeaderImpl([[maybe_unused]] const GFrameHeader* gframeHeader) { return false; }
473
480 [[nodiscard]] bool publishPayload([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) {
481 log->debug(NORMAL, "GStreamer::publishPayload");
482 return publishPayloadImpl(payload);
483 }
484
491 virtual bool publishPayloadImpl([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) { return false; }
492
499 [[nodiscard]] bool endStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
500 log->debug(NORMAL, "GStreamer::endStream");
501 return endStreamImpl(frameRunData);
502 }
503
510 virtual bool endStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
511
519 void flushEventBuffer();
520
521private:
530 [[nodiscard]] virtual std::string filename() const = 0;
531
533 std::vector<std::shared_ptr<GEventDataCollection>> eventBuffer;
534
536 size_t bufferFlushLimit = 10;
537
538public:
549 static GStreamer* instantiate(const dlhandle h, std::shared_ptr<GOptions> g) {
550 if (!h) return nullptr;
551 using fptr = GStreamer* (*)(std::shared_ptr<GOptions>);
552
553 auto sym = dlsym(h, "GStreamerFactory");
554 if (!sym) return nullptr;
555
556 auto func = reinterpret_cast<fptr>(sym);
557 return func(g);
558 }
559};
560
561
// Helpers that build the streamer plugin instances configured in the options.
562namespace gstreamer {
563
// Map of shared GStreamer instances keyed by std::string (gstreamersMapPtr uses the plugin name as key).
564 using gstreamersMap = std::unordered_map<std::string, std::shared_ptr<GStreamer>>;
565
// Create a per-thread map of configured streamer instances.
//
// For each definition returned by getGStreamerDefinition(gopts), a copy specialized with
// thread_id is built, the plugin named by gstreamerPluginName() is loaded through a local
// GManager, and the thread-specialized definition is bound with define_gstreamer().
//
// gopts:     parsed options container, passed to the logger, the manager and each plugin.
// thread_id: thread id given to the GStreamerDefinition(definition, thread_id) constructor; defaults to -1.
// returns:   shared pointer to the populated map, exposed to the caller as const.
//
// NOTE(review): the std::make_shared<GLogger>(...) statement below is cut off in this listing
// (the line numbered 584 is absent), so its remaining arguments cannot be confirmed from here.
// NOTE(review): emplace() does not overwrite an existing key, so two definitions that resolve to
// the same plugin name would keep the first instance and rebind its definition - confirm intent.
// NOTE(review): the loaded streamer is dereferenced without a null check - confirm that
// LoadAndRegisterObjectFromLibrary never returns an empty pointer.
581 inline std::shared_ptr<const gstreamersMap> gstreamersMapPtr(const std::shared_ptr<GOptions>& gopts,
582 int thread_id = -1) {
583 auto log = std::make_shared<GLogger>(gopts, "gstreamersMap worker for thread id" + std::to_string(thread_id),
585
586 GManager manager(gopts);
587
588 auto gstreamers = std::make_shared<gstreamersMap>();
589
590 for (const auto& gstreamer_def : gstreamer::getGStreamerDefinition(gopts)) {
591 auto gstreamer_def_thread = GStreamerDefinition(gstreamer_def, thread_id);
592 std::string gstreamer_plugin = gstreamer_def_thread.gstreamerPluginName();
593
594 // Load the plugin object for this configured output.
595 auto streamer = manager.LoadAndRegisterObjectFromLibrary<GStreamer>(gstreamer_plugin, gopts);
596 gstreamers->emplace(gstreamer_plugin, streamer);
597
598 // Bind the thread-specialized definition to the plugin instance.
599 gstreamers->at(gstreamer_plugin)->define_gstreamer(gstreamer_def_thread);
600 }
601
602 return gstreamers;
603 }
604
605} // namespace gstreamer
std::shared_ptr< GLogger > log
void debug(debug_type type, Args &&... args) const
void error(int exit_code, Args &&... args) const
std::shared_ptr< T > LoadAndRegisterObjectFromLibrary(std::string_view name, const std::shared_ptr< GOptions > &gopts)
Abstract base class for all gstreamer output plugins.
Definition gstreamer.h:77
virtual bool publishPayloadImpl(const std::vector< GIntegralPayload * > *payload)
Plugin-specific implementation hook for serializing one frame payload.
Definition gstreamer.h:491
bool publishFrameHeader(const GFrameHeader *gframeHeader)
Publish one frame header.
Definition gstreamer.h:461
static const std::vector< std::string > & supported_formats()
Return the list of output format tokens supported by the module.
Definition gstreamer.cc:11
virtual bool startStreamImpl(const GFrameDataCollection *frameRunData)
Plugin-specific implementation hook called at the start of a frame stream record.
Definition gstreamer.h:453
bool publishEventDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish the digitized hit bank for one detector.
Definition gstreamer.h:321
std::string getStreamType() const
Return the semantic stream type associated with this streamer instance.
Definition gstreamer.h:157
bool startStream(const GFrameDataCollection *frameRunData)
Begin publishing one frame stream record.
Definition gstreamer.h:441
virtual ~GStreamer()=default
Virtual destructor.
void publishEventData(const std::shared_ptr< GEventDataCollection > &event_data)
Queue one event for publication.
Definition gstreamer.cc:25
virtual bool closeConnectionImpl()
Plugin-specific close implementation hook.
Definition gstreamer.h:126
bool publishRunDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish run-level digitized data for one detector.
Definition gstreamer.h:414
virtual bool publishEventTrueInfoDataImpl(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Plugin-specific implementation hook for one detector true-information collection.
Definition gstreamer.h:306
bool endRun(const std::shared_ptr< GRunDataCollection > &run_data)
End publishing one run-level collection.
Definition gstreamer.h:371
virtual bool endStreamImpl(const GFrameDataCollection *frameRunData)
Plugin-specific implementation hook called at the end of a frame stream record.
Definition gstreamer.h:510
void flushEventBuffer()
Flush all buffered events to the backend in publish order.
Definition gstreamer.cc:73
static bool is_valid_format(const std::string &format)
Validate whether a format token is supported.
Definition gstreamer.cc:17
bool endEvent(const std::shared_ptr< GEventDataCollection > &event_data)
End publishing one buffered event.
Definition gstreamer.h:243
bool publishEventHeader(const std::unique_ptr< GEventHeader > &gevent_header)
Publish the event header for the current event sequence.
Definition gstreamer.h:267
void define_gstreamer(const GStreamerDefinition &gstreamerDefinition, int tid=-1)
Assign the output definition used by this streamer instance.
Definition gstreamer.h:169
GStreamer(const std::shared_ptr< GOptions > &g)
Construct the streamer base and initialize module logging.
Definition gstreamer.h:84
bool startRun(const std::shared_ptr< GRunDataCollection > &run_data)
Begin publishing one run-level collection.
Definition gstreamer.h:345
void publishRunData(const std::shared_ptr< GRunDataCollection > &run_data)
Publish one run-level data collection immediately.
Definition gstreamer.cc:45
virtual bool publishRunDigitizedDataImpl(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Plugin-specific implementation hook for one detector run-level digitized collection.
Definition gstreamer.h:427
virtual bool publishRunHeaderImpl(const std::unique_ptr< GRunHeader > &run_header)
Plugin-specific implementation hook for serializing one run header.
Definition gstreamer.h:403
GStreamerDefinition gstreamer_definitions
Output definition currently bound to this streamer instance.
Definition gstreamer.h:206
bool publishRunHeader(const std::unique_ptr< GRunHeader > &run_header)
Publish the run header for the current run sequence.
Definition gstreamer.h:392
virtual bool endEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Plugin-specific implementation hook called at the end of one event publish sequence.
Definition gstreamer.h:254
virtual bool publishFrameHeaderImpl(const GFrameHeader *gframeHeader)
Plugin-specific implementation hook for serializing one frame header.
Definition gstreamer.h:472
bool endStream(const GFrameDataCollection *frameRunData)
End publishing one frame stream record.
Definition gstreamer.h:499
virtual bool publishEventHeaderImpl(const std::unique_ptr< GEventHeader > &gevent_header)
Plugin-specific implementation hook for serializing one event header.
Definition gstreamer.h:279
bool startEvent(const std::shared_ptr< GEventDataCollection > &event_data)
Begin publishing one buffered event.
Definition gstreamer.h:217
virtual bool startEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Plugin-specific implementation hook called at the start of one event publish sequence.
Definition gstreamer.h:233
static GStreamer * instantiate(const dlhandle h, std::shared_ptr< GOptions > g)
Instantiate a streamer plugin from a dynamic library handle.
Definition gstreamer.h:549
virtual bool openConnection()
Open the output medium used by this streamer.
Definition gstreamer.h:103
bool publishPayload(const std::vector< GIntegralPayload * > *payload)
Publish one frame payload.
Definition gstreamer.h:480
bool publishEventTrueInfoData(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Publish the true-information hit bank for one detector.
Definition gstreamer.h:293
virtual bool publishEventDigitizedDataImpl(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Plugin-specific implementation hook for one detector digitized collection.
Definition gstreamer.h:334
virtual bool startRunImpl(const std::shared_ptr< GRunDataCollection > &run_data)
Plugin-specific implementation hook called at the start of a run publish sequence.
Definition gstreamer.h:361
virtual bool endRunImpl(const std::shared_ptr< GRunDataCollection > &run_data)
Plugin-specific implementation hook called at the end of a run publish sequence.
Definition gstreamer.h:382
bool closeConnection()
Close the output medium after flushing buffered events.
Definition gstreamer.h:113
void set_loggers(const std::shared_ptr< GOptions > &g)
Load streamer runtime settings from the parsed options container.
Definition gstreamer.h:200
void * dlhandle
NORMAL
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id=-1)
Create a per-thread map of configured streamer instances.
Definition gstreamer.h:581
vector< GStreamerDefinition > getGStreamerDefinition(const std::shared_ptr< GOptions > &gopts)
Parse all configured gstreamer output definitions from the options container.
Shared constants and error codes for the gstreamer module.
#define ERR_PUBLISH_ERROR
Publish sequence encountered invalid state or invalid input data.
Option and configuration helpers for the gstreamer module.
constexpr const char * GSTREAMER_LOGGER
Logger category name used by gstreamer components.
std::unordered_map< std::string, std::shared_ptr< GStreamer > > gstreamersMap
Definition gstreamer.h:564
Lightweight description of one configured gstreamer output.
std::string type
Semantic output type, typically "event" or "stream".