From 5ef7c68a229c251c28d5d1a5c43a4d9fbdd9dee7 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 13 Jan 2026 10:19:10 +0100 Subject: [PATCH 1/7] use DEC in analysis object writer --- .../AnalysisSupport/src/AODWriterHelpers.cxx | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index d868b7498fb76..dba7113877ae2 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -241,14 +241,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) }; } -AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) +AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ctx*/) { - using namespace monitoring; - auto& ac = ctx.services().get(); - auto tskmap = ac.outTskMap; - auto objmap = ac.outObjHistMap; - - return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function { + return AlgorithmSpec{[](InitContext& ic) -> std::function { + using namespace monitoring; + auto& dec = ic.services().get(); + auto tskmap = dec.outTskMap; + auto objmap = dec.outObjHistMap; auto& callbacks = ic.services().get(); auto inputObjects = std::make_shared>>(); @@ -278,7 +277,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) callbacks.set(endofdatacb); return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { - auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) { + auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) { O2_SIGNPOST_ID_GENERATE(hid, histogram_registry); O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram"); if (!ref.header) { @@ -474,7 +473,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) }; O2_SIGNPOST_ID_GENERATE(rid, histogram_registry); O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0)); - for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) { + for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) { mergePart(pc.inputs().get("x", pi)); } O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message."); From 38721b73ffe8257d41eef1ab287c89b316030a31 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 13 Jan 2026 10:38:47 +0100 Subject: [PATCH 2/7] use DEC in aod writer --- Framework/AnalysisSupport/src/AODWriterHelpers.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx index dba7113877ae2..b76ffca13977e 100644 --- a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -62,13 +62,13 @@ const static std::unordered_map ROOTfileNa AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) { - auto& ac = ctx.services().get(); auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); int compressionLevel = 505; if (ctx.options().hasOption("aod-writer-compression")) { compressionLevel = ctx.options().get("aod-writer-compression"); } - return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function { + return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function { + auto outputInputs = ic.services().get().outputsInputsAOD; LOGP(debug, "======== getGlobalAODSink::Init =========="); // find out if any table needs to be saved From a94ed77ca52b40c87021836f0e4641418e58cebc Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 13 Jan 2026 10:44:40 +0100 Subject: [PATCH 3/7] use DEC in analysis CCDB fetcher --- .../CCDBSupport/src/AnalysisCCDBHelpers.cxx | 54 +++++++++---------- .../CCDBSupport/src/AnalysisCCDBHelpers.h | 2 +- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 9ec911518f754..b516547b06bb1 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -67,38 +67,38 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector(); - std::vector> schemas; - auto schemaMetadata = std::make_shared(); + return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) { + auto& dec = ic.services().get(); + std::vector> schemas; + auto schemaMetadata = std::make_shared(); - for (auto& input : ac.analysisCCDBInputs) { - std::vector> fields; - schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input)); - schemaMetadata->Append("outputBinding", input.binding); + for (auto& input : dec.analysisCCDBInputs) { + std::vector> fields; + schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input)); + schemaMetadata->Append("outputBinding", input.binding); - for (auto& m : input.metadata) { - // Save the list of input tables - if (m.name.starts_with("input:")) { - auto name = m.name.substr(6); - schemaMetadata->Append("sourceTable", name); - schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get(DataSpecUtils::fromMetadataString(m.defaultValue.get()).matcher))); - continue; - } - // Ignore the non ccdb: entries - if (!m.name.starts_with("ccdb:")) { - continue; + for (auto& m : input.metadata) { + // Save the list of input tables + if (m.name.starts_with("input:")) { + auto name = m.name.substr(6); + schemaMetadata->Append("sourceTable", name); + continue; + } + // Ignore the non ccdb: entries + if (!m.name.starts_with("ccdb:")) { + continue; + } + // Create the schema of the output + auto metadata = std::make_shared(); + metadata->Append("url", m.defaultValue.asString()); + auto columnName = m.name.substr(strlen("ccdb:")); + fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, metadata)); } - // Create the schema of the output - auto metadata = std::make_shared(); - metadata->Append("url", m.defaultValue.asString()); - auto columnName = m.name.substr(strlen("ccdb:")); - fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, metadata)); + schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } - schemas.emplace_back(std::make_shared(fields, schemaMetadata)); - } - return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) { + std::shared_ptr helper = std::make_shared(); CCDBFetcherHelper::initialiseHelper(*helper, options); std::unordered_map bindings; diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h index f8175034da0ba..3be2138bd2b5c 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.h @@ -17,7 +17,7 @@ namespace o2::framework { struct AnalysisCCDBHelpers { - static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx); + static AlgorithmSpec fetchFromCCDB(ConfigContext const&); }; } // namespace o2::framework From ba0b518aaca4cb5032932351751180a49c3383ec Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 13 Jan 2026 10:19:23 +0100 Subject: [PATCH 4/7] minor cleanups --- Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx | 4 ++-- Framework/Core/include/Framework/AnalysisTask.h | 4 ++-- Framework/Core/src/AnalysisSupportHelpers.cxx | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index b516547b06bb1..ea13d412cd0b8 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -129,11 +129,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) int outputRouteIndex = bindings.at(outRouteDesc); auto& spec = helper->routes[outputRouteIndex].matcher; std::vector> builders; - for (auto& _ : schema->fields()) { + for (auto const& _ : schema->fields()) { builders.emplace_back(std::make_shared()); } - for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) { + for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) { std::shared_ptr chunk = timestampColumn->chunk(ci); auto const* timestamps = chunk->data()->GetValuesSafe(1); diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index c50b5358990de..4f8a9e719e4b9 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -521,7 +521,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) std::vector expressionInfos; /// make sure options and configurables are set before expression infos are created - homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get()); + homogeneous_apply_refs([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get()); /// extract conditions and append them as inputs homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get()); @@ -620,7 +620,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) } // reset pre-slice for the next dataframe auto slices = pc.services().get(); - homogeneous_apply_refs([&pc, &slices](auto& element) { + homogeneous_apply_refs([&slices](auto& element) { return analysis_task_parsers::updateSliceInfo(element, slices); }, *(task.get())); diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 15b56f9afbff5..7edf9a2d8d27f 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -98,7 +98,7 @@ std::shared_ptr AnalysisSupportHelpers::getDataOutputDirecto if (!keepString.empty()) { dod->reset(); std::string d("dangling"); - if (d.find(keepString) == 0) { + if (d.starts_with(keepString) == 0) { // use the dangling outputs std::vector danglingOutputs; for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { @@ -144,7 +144,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector sinks::append_to{publisher.outputs}; // append them to the publisher outputs std::vector additionalInputs; - for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { + for (auto const& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { input.metadata | views::filter_string_params_with("input:") | views::params_to_input_specs() | From 7d9df5c8700d277c4ac991c1a2f78543e3587187 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 23 Jan 2026 10:53:35 +0100 Subject: [PATCH 5/7] consolidate writer configuration between injection and adjusting the topology --- Framework/Core/src/ArrowSupport.cxx | 27 +--------- Framework/Core/src/WorkflowHelpers.cxx | 73 +++++++++++++------------- Framework/Core/src/WorkflowHelpers.h | 3 ++ 3 files changed, 40 insertions(+), 63 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 95e763343671a..dd1040e3cd52a 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -691,33 +691,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } - // replace writer as some outputs may have become dangling and some are now consumed - auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); - - // create DataOutputDescriptor - std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); - - // select outputs of type AOD which need to be saved - // ATTENTION: if there are dangling outputs the getGlobalAODSink - // has to be created in any case! - dec.outputsInputsAOD.clear(); - - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); - if (!ds.empty() || isDangling[ii]) { - dec.outputsInputsAOD.emplace_back(outputsInputs[ii]); - } - } - } + WorkflowHelpers::injectAODWriter(workflow, ctx); - // file sink for any AOD output - if (!dec.outputsInputsAOD.empty()) { - // add TFNumber and TFFilename as input to the writer - dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); - dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); - workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); - } // Move the dummy sink at the end, if needed for (size_t i = 0; i < workflow.size(); ++i) { if (workflow[i].name == "internal-dpl-injected-dummy-sink") { diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index fd9099e1aa24e..a18268b845fcc 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -532,43 +532,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); extraSpecs.clear(); - /// Analyze all ouputs - auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow); - dec.isDangling = isDanglingTmp; - dec.outputsInputs = outputsInputsTmp; - - // create DataOutputDescriptor - std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); - - // select outputs of type AOD which need to be saved - // ATTENTION: if there are dangling outputs the getGlobalAODSink - // has to be created in any case! - for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]); - if (ds.size() > 0 || dec.isDangling[ii]) { - dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]); - } - } - } - - // file sink for any AOD output - if (dec.outputsInputsAOD.size() > 0) { - // add TFNumber and TFFilename as input to the writer - dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); - dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); - auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); - extraSpecs.push_back(fileSink); - - auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec& spec) -> bool { - return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); - }); - size_t ii = std::distance(dec.outputsInputs.begin(), it); - dec.isDangling[ii] = false; - } - - workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); - extraSpecs.clear(); + injectAODWriter(workflow, ctx); // Select dangling outputs which are not of type AOD std::vector redirectedOutputsInputs; @@ -712,6 +676,41 @@ void WorkflowHelpers::adjustTopology(WorkflowSpec& workflow, ConfigContext const } } +void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx) +{ + auto& dec = ctx.services().get(); + /// Analyze all ouputs + std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow); + + // create DataOutputDescriptor + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); + + // select outputs of type AOD which need to be saved + dec.outputsInputsAOD.clear(); + for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) { + auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]); + if (ds.size() > 0 || dec.isDangling[ii]) { + dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]); + } + } + } + + // file sink for any AOD output + if (dec.outputsInputsAOD.size() > 0) { + // add TFNumber and TFFilename as input to the writer + DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"}); + DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"}); + auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); + workflow.push_back(fileSink); + + auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool { + return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); + }); + dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false; + } +} + void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow, std::vector& logicalEdges, std::vector& outputs, diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index b2a4d4cab55df..5c0aa363c6d67 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -182,6 +182,9 @@ struct WorkflowHelpers { // @a ctx the context for the configuration phase static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx); + // Function to correctly add AOD writer + static void injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx); + // Final adjustments to @a workflow after service devices have been injected. static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx); From 598f4c06248c24b9e3a520609b0d4cb9da0b0983 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 23 Jan 2026 12:44:48 +0100 Subject: [PATCH 6/7] fix recognition of dangling keyword --- Framework/Core/src/AnalysisSupportHelpers.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 7edf9a2d8d27f..7453751315626 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -98,7 +98,7 @@ std::shared_ptr AnalysisSupportHelpers::getDataOutputDirecto if (!keepString.empty()) { dod->reset(); std::string d("dangling"); - if (d.starts_with(keepString) == 0) { + if (keepString.starts_with(d)) { // use the dangling outputs std::vector danglingOutputs; for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { From 544daef01e63424765bad2c11f443cc90608b249 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 23 Jan 2026 12:45:37 +0100 Subject: [PATCH 7/7] use own output list for the mc-injected reader so that correct error is printed --- Framework/Core/src/WorkflowHelpers.cxx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index a18268b845fcc..02e44fa36cb41 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -421,10 +421,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // AODs are being injected on-the-fly, add error-handler reader aodReader.algorithm = AlgorithmSpec{ adaptStateful( - [outputs = aodReader.outputs](DeviceSpec const&) { + [](DeviceSpec const& spec) { LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:"); - for (auto const& output : outputs) { - LOGP(warn, " {}", DataSpecUtils::describe(output)); + for (auto const& output : spec.outputs) { + LOGP(warn, " {}", DataSpecUtils::describe(output.matcher)); } LOGP(fatal, "Stopping."); // to ensure the output type for adaptStateful