gstreamer
Loading...
Searching...
No Matches
gstreamer.cc
Go to the documentation of this file.
1// gstreamer
2#include "gstreamer.h"
3
4// gemc
5#include "gutilities.h"
6
7// Implementation summary:
8// Common base-class logic for format validation, buffered event publication,
9// and immediate run publication. Concrete serialization remains in plugin hooks.
10
11const std::vector<std::string>& GStreamer::supported_formats() {
12 // Keep this list aligned with the available gstreamer_<format>_plugin factories.
13 static const std::vector<std::string> formats = {"jlabsro", "root", "ascii", "csv", "json"};
14 return formats;
15}
16
17bool GStreamer::is_valid_format(const std::string& format) {
18 const auto& supported = GStreamer::supported_formats();
19 const auto f = gutilities::convertToLowercase(format);
20 return std::find(supported.begin(), supported.end(), f) != supported.end();
21}
22
23
24// pragma todo: pass someting like map<string, bitset> to each detector to decide which data to publish
25void GStreamer::publishEventData(const std::shared_ptr<GEventDataCollection>& event_data) {
26 // The event collection and its header are required for any plugin to publish
27 // a meaningful event record.
28 if (!event_data) { log->error(ERR_PUBLISH_ERROR, "event data is null in GStreamer::publishEventData"); }
29 if (!event_data->getHeader()) {
30 log->error(ERR_PUBLISH_ERROR, "event header is null in GStreamer::publishEventData");
31 }
32
33 // Retain ownership of the event until the buffer is flushed. This guarantees
34 // that raw pointers extracted later from hit collections remain valid.
35 eventBuffer.emplace_back(event_data);
36
37 // Once the configured threshold is reached, publish all buffered events in one pass.
38 if (eventBuffer.size() >= bufferFlushLimit) { flushEventBuffer(); }
39}
40
41
42// Implementation summary:
43// Run data are published immediately instead of being buffered. The publish order
44// mirrors the base-class run sequence: start, header, detector banks, end.
45void GStreamer::publishRunData(const std::shared_ptr<GRunDataCollection>& run_data) {
46 log->info(2, "GStreamer::publishRunData->startRun: ",
48
49 log->info(2, SFUNCTION_NAME, "->publishRunHeader -> ",
50 gutilities::success_or_fail(publishRunHeader(run_data->getHeader())));
51
52 // Iterate over each detector collection and expose it to the plugin as a
53 // vector of raw pointers. The owning run collection remains alive throughout
54 // this method call.
55 for (const auto& [sdname, gDataCollection] : run_data->getDataCollectionMap()) {
56 const GDataCollection* tdptr = gDataCollection.get();
57
58 // Extract digitized hits into a flat raw-pointer view expected by the hooks.
59 std::vector<const GDigitizedData*> digitizedPtrs;
60 digitizedPtrs.reserve(tdptr->getDigitizedData().size());
61
62 for (const auto& hit : tdptr->getDigitizedData()) { digitizedPtrs.push_back(hit.get()); }
63
64 log->info(2, SFUNCTION_NAME, "->publishEventDigitizedData for detector -> ", sdname,
66 }
67
68 log->info(2, "GStreamer::endEvent -> ",
70}
71
72
74 log->info(2, "GStreamer::flushEventBuffer -> flushing ", eventBuffer.size(), " events to file");
75
76 // Each buffered event is treated as read-only while the plugin hooks serialize it.
77 // The buffer's shared_ptr ownership keeps all event-owned hit objects alive during the flush.
78 for (const auto& eventData : eventBuffer) {
79 log->info(2, SFUNCTION_NAME, "->startEvent: ",
81
82 log->info(2, SFUNCTION_NAME, "->publishEventHeader -> ",
83 gutilities::success_or_fail(publishEventHeader(eventData->getHeader())));
84
85 // Publish one detector collection at a time.
86 for (const auto& [sdname, gDataCollection] : eventData->getDataCollectionMap()) {
87 const GDataCollection* tdptr = gDataCollection.get();
88
89 // Convert ownership-bearing containers into temporary raw-pointer views.
90 // Plugins consume these views immediately and do not own the pointed data.
91 std::vector<const GTrueInfoData*> trueInfoPtrs;
92 std::vector<const GDigitizedData*> digitizedPtrs;
93 trueInfoPtrs.reserve(tdptr->getTrueInfoData().size());
94 digitizedPtrs.reserve(tdptr->getDigitizedData().size());
95
96 for (const auto& hit : tdptr->getTrueInfoData()) { trueInfoPtrs.push_back(hit.get()); }
97 for (const auto& hit : tdptr->getDigitizedData()) { digitizedPtrs.push_back(hit.get()); }
98
99 log->info(2, SFUNCTION_NAME, "->publishEventTrueInfoData for detector -> ", sdname,
101
102 log->info(2, SFUNCTION_NAME, "->publishEventDigitizedData for detector -> ", sdname,
104 }
105
106 log->info(2, "GStreamer::endEvent -> ", gutilities::success_or_fail(endEvent(eventData)));
107 }
108
109 // All buffered events have now been handed to the plugin hooks.
110 eventBuffer.clear();
111}
112
113// stream an individual frame
114// void GStreamer::publishFrameRunData(const std::shared_ptr<GFrameDataCollection>& frameRunData) {
115// TODO: add more infor like frame number or number of entries in paylod
116
117// log->info(2, "GStreamer::publishFrameRunData: ",
118// gutilities::success_or_fail(startStream(frameRunData)));
119// log->info(2, "GStreamer::publishFrameHeader: ",
120// gutilities::success_or_fail(publishFrameHeader(frameRunData->getHeader())));
121// log->info(2, "GStreamer::publishPayload: ",
122// gutilities::success_or_fail(publishPayload(frameRunData->getIntegralPayload())));
123// log->info(2, "GStreamer::endStream: ",
124// gutilities::success_or_fail(endStream(frameRunData)));
125// }
std::shared_ptr< GLogger > log
auto getDigitizedData() const -> const std::vector< std::unique_ptr< GDigitizedData > > &
auto getTrueInfoData() const -> const std::vector< std::unique_ptr< GTrueInfoData > > &
void info(int level, Args &&... args) const
void error(int exit_code, Args &&... args) const
static const std::vector< std::string > & supported_formats()
Return the list of output format tokens supported by the module.
Definition gstreamer.cc:11
bool publishEventDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish the digitized hit bank for one detector.
Definition gstreamer.h:321
void publishEventData(const std::shared_ptr< GEventDataCollection > &event_data)
Queue one event for publication.
Definition gstreamer.cc:25
bool publishRunDigitizedData(const std::string &detectorName, const std::vector< const GDigitizedData * > &digitizedData)
Publish run-level digitized data for one detector.
Definition gstreamer.h:414
bool endRun(const std::shared_ptr< GRunDataCollection > &run_data)
End publishing one run-level collection.
Definition gstreamer.h:371
void flushEventBuffer()
Flush all buffered events to the backend in publish order.
Definition gstreamer.cc:73
static bool is_valid_format(const std::string &format)
Validate whether a format token is supported.
Definition gstreamer.cc:17
bool endEvent(const std::shared_ptr< GEventDataCollection > &event_data)
End publishing one buffered event.
Definition gstreamer.h:243
bool publishEventHeader(const std::unique_ptr< GEventHeader > &gevent_header)
Publish the event header for the current event sequence.
Definition gstreamer.h:267
bool startRun(const std::shared_ptr< GRunDataCollection > &run_data)
Begin publishing one run-level collection.
Definition gstreamer.h:345
void publishRunData(const std::shared_ptr< GRunDataCollection > &run_data)
Publish one run-level data collection immediately.
Definition gstreamer.cc:45
bool publishRunHeader(const std::unique_ptr< GRunHeader > &run_header)
Publish the run header for the current run sequence.
Definition gstreamer.h:392
bool startEvent(const std::shared_ptr< GEventDataCollection > &event_data)
Begin publishing one buffered event.
Definition gstreamer.h:217
bool publishEventTrueInfoData(const std::string &detectorName, const std::vector< const GTrueInfoData * > &trueInfoData)
Publish the true-information hit bank for one detector.
Definition gstreamer.h:293
#define SFUNCTION_NAME
#define ERR_PUBLISH_ERROR
Publish sequence encountered invalid state or invalid input data.
Core streaming interface and helper utilities for the gstreamer module.
std::string success_or_fail(bool condition)
string convertToLowercase(const string &str)