gstreamer
Loading...
Searching...
No Matches
gstreamer.h
Go to the documentation of this file.
1#pragma once
2
3// gstreamer
4#include "gstreamer_options.h"
6
7// gemc
11#include "gfactory.h"
12#include "gbase.h"
13
14// c++
15#include <string>
16#include <vector>
17#include <map>
18
77class GStreamer : public GBase<GStreamer>
78{
79public:
85 explicit GStreamer(const std::shared_ptr<GOptions>& g) : GBase(g, GSTREAMER_LOGGER) {
86 }
87
94 virtual ~GStreamer() = default;
95
104 [[nodiscard]] virtual bool openConnection() { return false; }
105
114 [[nodiscard]] bool closeConnection() {
116 return closeConnectionImpl();
117 }
118
127 [[nodiscard]] virtual bool closeConnectionImpl() { return false; }
128
138 void publishEventData(const std::shared_ptr<GEventDataCollection>& event_data);
139
148 void publishRunData(const std::shared_ptr<GRunDataCollection>& run_data);
149
158 [[nodiscard]] inline std::string getStreamType() const { return gstreamer_definitions.type; }
159
170 inline void define_gstreamer(const GStreamerDefinition& gstreamerDefinition, int tid = -1) {
171 gstreamer_definitions = GStreamerDefinition(gstreamerDefinition, tid);
172 }
173
182 static const std::vector<std::string>& supported_formats();
183
192 static bool is_valid_format(const std::string& format);
193
201 void set_loggers(const std::shared_ptr<GOptions>& g) {
202 bufferFlushLimit = g->getScalarInt("ebuffer");
203 }
204
205protected:
208
218 [[nodiscard]] bool startEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
219 if (!event_data) { log->error(ERR_PUBLISH_ERROR, "eventData is null in GStreamer::startEvent"); }
220 if (!event_data->getHeader()) {
221 log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::startEvent");
222 }
223
224 log->debug(NORMAL, "GStreamer::startEvent");
225 return startEventImpl(event_data);
226 }
227
234 virtual bool startEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
235 return false;
236 }
237
244 [[nodiscard]] bool endEvent([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
245 log->debug(NORMAL, "GStreamer::endEvent");
246 return endEventImpl(event_data);
247 }
248
255 virtual bool endEventImpl([[maybe_unused]] const std::shared_ptr<GEventDataCollection>& event_data) {
256 return false;
257 }
258
268 [[nodiscard]] bool publishEventHeader([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
269 if (!gevent_header) { log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::publishEventHeader"); }
270 log->debug(NORMAL, "GStreamer::publishEventHeader");
271 return publishEventHeaderImpl(gevent_header);
272 }
273
280 virtual bool publishEventHeaderImpl([[maybe_unused]] const std::unique_ptr<GEventHeader>& gevent_header) {
281 return false;
282 }
283
294 [[nodiscard]] bool publishEventTrueInfoData([[maybe_unused]] const std::string& detectorName,
295 [[maybe_unused]] const std::vector<const GTrueInfoData*>& trueInfoData) {
296 log->debug(NORMAL, "GStreamer::publishEventTrueInfoData for detector ", detectorName);
297 return publishEventTrueInfoDataImpl(detectorName, trueInfoData);
298 }
299
307 virtual bool publishEventTrueInfoDataImpl([[maybe_unused]] const std::string& detectorName,
308 [[maybe_unused]] const std::vector<const GTrueInfoData*>& trueInfoData) {
309 return false;
310 }
311
322 [[nodiscard]] bool publishEventDigitizedData([[maybe_unused]] const std::string& detectorName,
323 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
324 log->debug(NORMAL, "GStreamer::publishEventDigitizedData for detector ", detectorName);
325 return publishEventDigitizedDataImpl(detectorName, digitizedData);
326 }
327
328 [[nodiscard]] bool publishEventGeneratedParticles([[maybe_unused]] const std::string& bankName,
329 [[maybe_unused]] const GGeneratedParticleBank& particles) {
330 log->debug(NORMAL, "GStreamer::publishEventGeneratedParticles for bank ", bankName);
331 return publishEventGeneratedParticlesImpl(bankName, particles);
332 }
333
348 virtual bool publishEventGeneratedParticlesImpl([[maybe_unused]] const std::string& bankName,
349 [[maybe_unused]] const GGeneratedParticleBank& particles) {
350 return true;
351 }
352
360 virtual bool publishEventDigitizedDataImpl([[maybe_unused]] const std::string& detectorName,
361 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
362 return false;
363 }
364
371 [[nodiscard]] bool startRun([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
372 if (!run_data) { log->error(ERR_PUBLISH_ERROR, "run_data is null in GStreamer::startRun"); }
373 if (!run_data->getHeader()) {
374 log->error(ERR_PUBLISH_ERROR, "run header is null in GStreamer::startRun");
375 }
376
377 log->debug(NORMAL, "GStreamer::startRun");
378 return startRunImpl(run_data);
379 }
380
387 virtual bool startRunImpl([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
388 return false;
389 }
390
397 [[nodiscard]] bool endRun([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
398 log->debug(NORMAL, "GStreamer::endRun");
399 return endRunImpl(run_data);
400 }
401
408 virtual bool endRunImpl([[maybe_unused]] const std::shared_ptr<GRunDataCollection>& run_data) {
409 return false;
410 }
411
418 bool publishRunHeader([[maybe_unused]] const std::unique_ptr<GRunHeader>& run_header) {
419 log->debug(NORMAL, "GStreamer::publishRunHeader");
420 return publishRunHeaderImpl(run_header);
421 }
422
429 virtual bool publishRunHeaderImpl([[maybe_unused]] const std::unique_ptr<GRunHeader>& run_header) {
430 return false;
431 }
432
440 bool publishRunDigitizedData([[maybe_unused]] const std::string& detectorName,
441 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
442 log->debug(NORMAL, "GStreamer::publishRunDigitizedData for detector ", detectorName);
443 return publishRunDigitizedDataImpl(detectorName, digitizedData);
444 }
445
453 virtual bool publishRunDigitizedDataImpl([[maybe_unused]] const std::string& detectorName,
454 [[maybe_unused]] const std::vector<const GDigitizedData*>& digitizedData) {
455 return false;
456 }
457
467 [[nodiscard]] bool startStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
469 log->debug(NORMAL, "GStreamer::startStream");
470 return startStreamImpl(frameRunData);
471 }
472
479 virtual bool startStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
480
487 [[nodiscard]] bool publishFrameHeader([[maybe_unused]] const GFrameHeader* gframeHeader) {
488 log->debug(NORMAL, "GStreamer::publishFrameHeader");
489 return publishFrameHeaderImpl(gframeHeader);
490 }
491
498 virtual bool publishFrameHeaderImpl([[maybe_unused]] const GFrameHeader* gframeHeader) { return false; }
499
506 [[nodiscard]] bool publishPayload([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) {
507 log->debug(NORMAL, "GStreamer::publishPayload");
508 return publishPayloadImpl(payload);
509 }
510
517 virtual bool publishPayloadImpl([[maybe_unused]] const std::vector<GIntegralPayload*>* payload) { return false; }
518
525 [[nodiscard]] bool endStream([[maybe_unused]] const GFrameDataCollection* frameRunData) {
526 log->debug(NORMAL, "GStreamer::endStream");
527 return endStreamImpl(frameRunData);
528 }
529
536 virtual bool endStreamImpl([[maybe_unused]] const GFrameDataCollection* frameRunData) { return false; }
537
545 void flushEventBuffer();
546
547private:
556 [[nodiscard]] virtual std::string filename() const = 0;
557
559 std::vector<std::shared_ptr<GEventDataCollection>> eventBuffer;
560
562 size_t bufferFlushLimit = 10;
563
564public:
575 static GStreamer* instantiate(const dlhandle h, std::shared_ptr<GOptions> g) {
576 if (!h) return nullptr;
577 using fptr = GStreamer* (*)(std::shared_ptr<GOptions>);
578
579 auto sym = dlsym(h, "GStreamerFactory");
580 if (!sym) return nullptr;
581
582 auto func = reinterpret_cast<fptr>(sym);
583 return func(g);
584 }
585};
586
587
588namespace gstreamer {
589
590 using gstreamersMap = std::unordered_map<std::string, std::shared_ptr<GStreamer>>;
591
607 inline std::shared_ptr<const gstreamersMap> gstreamersMapPtr(const std::shared_ptr<GOptions>& gopts,
608 int thread_id = -1) {
609 auto log = std::make_shared<GLogger>(gopts, "gstreamersMap worker for thread id" + std::to_string(thread_id),
611
612 GManager manager(gopts);
613
614 auto gstreamers = std::make_shared<gstreamersMap>();
615
616 for (const auto& gstreamer_def : gstreamer::getGStreamerDefinition(gopts)) {
617 auto gstreamer_def_thread = GStreamerDefinition(gstreamer_def, thread_id);
618 std::string gstreamer_plugin = gstreamer_def_thread.gstreamerPluginName();
619
620 // Load the plugin object for this configured output.
621 auto streamer = manager.LoadAndRegisterObjectFromLibrary<GStreamer>(gstreamer_plugin, gopts);
622 gstreamers->emplace(gstreamer_plugin, streamer);
623
624 // Bind the thread-specialized definition to the plugin instance.
625 gstreamers->at(gstreamer_plugin)->define_gstreamer(gstreamer_def_thread);
626 }
627
628 return gstreamers;
629 }
630
642 inline std::shared_ptr<const gstreamersMap> preloadGStreamerPlugins(const std::shared_ptr<GOptions>& gopts) {
643 if (gstreamer::getGStreamerDefinition(gopts).empty()) { return std::make_shared<gstreamersMap>(); }
644 return gstreamersMapPtr(gopts);
645 }
646
647} // namespace gstreamer
std::shared_ptr< GLogger > log
void debug(debug_type type, Args &&... args) const
void error(int exit_code, Args &&... args) const
std::shared_ptr< T > LoadAndRegisterObjectFromLibrary(std::string_view name, const std::shared_ptr< GOptions > &gopts)
Abstract base class for all gstreamer output plugins.
Definition gstreamer.h:78
virtual bool publishPayloadImpl(const std::vector< GIntegralPayload * > *payload)
Plugin-specific implementation hook for serializing one frame payload.
Definition gstreamer.h:517
bool publishFrameHeader(const GFrameHeader *gframeHeader)
Publish one frame header.
Definition gstreamer.h:487
static const std::vector< std::string > & supported_formats()
Return the list of output format tokens supported by the module.
Definition gstreamer.cc:11
virtual bool startStreamImpl(const GFrameDataCollection *frameRunData)
Plugin-specific implementation hook called at the start of a frame stream record.
Definition gstreamer.h:479
bool publishEventDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish the digitized hit bank for one detector.
Definition gstreamer.h:322
std::string getStreamType() const
Return the semantic stream type associated with this streamer instance.
Definition gstreamer.h:158
bool startStream(const GFrameDataCollection *frameRunData)
Begin publishing one frame stream record.
Definition gstreamer.h:467
virtual ~GStreamer()=default
Virtual destructor.
void publishEventData(const std::shared_ptr< GEventDataCollection > &event_data)
Queue one event for publication.
Definition gstreamer.cc:25
virtual bool closeConnectionImpl()
Plugin-specific close implementation hook.
Definition gstreamer.h:127
bool publishRunDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish run-level digitized data for one detector.
Definition gstreamer.h:440
virtual bool publishEventTrueInfoDataImpl(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Plugin-specific implementation hook for one detector true-information collection.
Definition gstreamer.h:307
bool endRun(const std::shared_ptr< GRunDataCollection > &run_data)
End publishing one run-level collection.
Definition gstreamer.h:397
virtual bool endStreamImpl(const GFrameDataCollection *frameRunData)
Plugin-specific implementation hook called at the end of a frame stream record.
Definition gstreamer.h:536
void flushEventBuffer()
Flush all buffered events to the backend in publish order.
Definition gstreamer.cc:73
static bool is_valid_format(const std::string &format)
Validate whether a format token is supported.
Definition gstreamer.cc:17
virtual bool publishEventGeneratedParticlesImpl(const std::string &bankName, const GGeneratedParticleBank &particles)
Plugin-specific implementation hook for a generated-particle event bank.
Definition gstreamer.h:348
bool endEvent(const std::shared_ptr< GEventDataCollection > &event_data)
End publishing one buffered event.
Definition gstreamer.h:244
bool publishEventHeader(const std::unique_ptr< GEventHeader > &gevent_header)
Publish the event header for the current event sequence.
Definition gstreamer.h:268
void define_gstreamer(const GStreamerDefinition &gstreamerDefinition, int tid=-1)
Assign the output definition used by this streamer instance.
Definition gstreamer.h:170
GStreamer(const std::shared_ptr< GOptions > &g)
Construct the streamer base and initialize module logging.
Definition gstreamer.h:85
bool startRun(const std::shared_ptr< GRunDataCollection > &run_data)
Begin publishing one run-level collection.
Definition gstreamer.h:371
void publishRunData(const std::shared_ptr< GRunDataCollection > &run_data)
Publish one run-level data collection immediately.
Definition gstreamer.cc:45
virtual bool publishRunDigitizedDataImpl(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Plugin-specific implementation hook for one detector run-level digitized collection.
Definition gstreamer.h:453
virtual bool publishRunHeaderImpl(const std::unique_ptr< GRunHeader > &run_header)
Plugin-specific implementation hook for serializing one run header.
Definition gstreamer.h:429
GStreamerDefinition gstreamer_definitions
Output definition currently bound to this streamer instance.
Definition gstreamer.h:207
bool publishRunHeader(const std::unique_ptr< GRunHeader > &run_header)
Publish the run header for the current run sequence.
Definition gstreamer.h:418
virtual bool endEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Plugin-specific implementation hook called at the end of one event publish sequence.
Definition gstreamer.h:255
virtual bool publishFrameHeaderImpl(const GFrameHeader *gframeHeader)
Plugin-specific implementation hook for serializing one frame header.
Definition gstreamer.h:498
bool endStream(const GFrameDataCollection *frameRunData)
End publishing one frame stream record.
Definition gstreamer.h:525
virtual bool publishEventHeaderImpl(const std::unique_ptr< GEventHeader > &gevent_header)
Plugin-specific implementation hook for serializing one event header.
Definition gstreamer.h:280
bool startEvent(const std::shared_ptr< GEventDataCollection > &event_data)
Begin publishing one buffered event.
Definition gstreamer.h:218
virtual bool startEventImpl(const std::shared_ptr< GEventDataCollection > &event_data)
Plugin-specific implementation hook called at the start of one event publish sequence.
Definition gstreamer.h:234
static GStreamer * instantiate(const dlhandle h, std::shared_ptr< GOptions > g)
Instantiate a streamer plugin from a dynamic library handle.
Definition gstreamer.h:575
virtual bool openConnection()
Open the output medium used by this streamer.
Definition gstreamer.h:104
bool publishPayload(const std::vector< GIntegralPayload * > *payload)
Publish one frame payload.
Definition gstreamer.h:506
bool publishEventTrueInfoData(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Publish the true-information hit bank for one detector.
Definition gstreamer.h:294
virtual bool publishEventDigitizedDataImpl(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Plugin-specific implementation hook for one detector digitized collection.
Definition gstreamer.h:360
virtual bool startRunImpl(const std::shared_ptr< GRunDataCollection > &run_data)
Plugin-specific implementation hook called at the start of a run publish sequence.
Definition gstreamer.h:387
bool publishEventGeneratedParticles(const std::string &bankName, const GGeneratedParticleBank &particles)
Definition gstreamer.h:328
virtual bool endRunImpl(const std::shared_ptr< GRunDataCollection > &run_data)
Plugin-specific implementation hook called at the end of a run publish sequence.
Definition gstreamer.h:408
bool closeConnection()
Close the output medium after flushing buffered events.
Definition gstreamer.h:114
void set_loggers(const std::shared_ptr< GOptions > &g)
Load streamer runtime settings from the parsed options container.
Definition gstreamer.h:201
std::vector< GGeneratedParticleData > GGeneratedParticleBank
void * dlhandle
NORMAL
std::shared_ptr< const gstreamersMap > gstreamersMapPtr(const std::shared_ptr< GOptions > &gopts, int thread_id=-1)
Create a per-thread map of configured streamer instances.
Definition gstreamer.h:607
std::shared_ptr< const gstreamersMap > preloadGStreamerPlugins(const std::shared_ptr< GOptions > &gopts)
Preload configured streamer plugins before worker threads are started.
Definition gstreamer.h:642
vector< GStreamerDefinition > getGStreamerDefinition(const std::shared_ptr< GOptions > &gopts)
Parse all configured gstreamer output definitions from the options container.
Shared constants and error codes for the gstreamer module.
#define ERR_PUBLISH_ERROR
Publish sequence encountered invalid state or invalid input data.
Option and configuration helpers for the gstreamer module.
constexpr const char * GSTREAMER_LOGGER
Logger category name used by gstreamer components.
std::unordered_map< std::string, std::shared_ptr< GStreamer > > gstreamersMap
Definition gstreamer.h:590
Lightweight description of one configured gstreamer output.
std::string type
Semantic output type, typically "event" or "stream".