Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa

AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
{
auto& ac = ctx.services().get<DanglingEdgesContext>();
auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
int compressionLevel = 505;
if (ctx.options().hasOption("aod-writer-compression")) {
compressionLevel = ctx.options().get<int>("aod-writer-compression");
}
return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
auto outputInputs = ic.services().get<DanglingEdgesContext>().outputsInputsAOD;
LOGP(debug, "======== getGlobalAODSink::Init ==========");

// find out if any table needs to be saved
Expand Down Expand Up @@ -241,14 +241,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
};
}

AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ctx*/)
{
using namespace monitoring;
auto& ac = ctx.services().get<DanglingEdgesContext>();
auto tskmap = ac.outTskMap;
auto objmap = ac.outObjHistMap;

return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
return AlgorithmSpec{[](InitContext& ic) -> std::function<void(ProcessingContext&)> {
using namespace monitoring;
auto& dec = ic.services().get<DanglingEdgesContext>();
auto tskmap = dec.outTskMap;
auto objmap = dec.outObjHistMap;
auto& callbacks = ic.services().get<CallbackService>();
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();

Expand Down Expand Up @@ -278,7 +277,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)

callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
if (!ref.header) {
Expand Down Expand Up @@ -474,7 +473,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
};
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
mergePart(pc.inputs().get("x", pi));
}
O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message.");
Expand Down
58 changes: 29 additions & 29 deletions Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,38 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::Outpu
}
} // namespace

AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
{
auto& ac = ctx.services().get<DanglingEdgesContext>();
std::vector<std::shared_ptr<arrow::Schema>> schemas;
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
auto& dec = ic.services().get<DanglingEdgesContext>();
std::vector<std::shared_ptr<arrow::Schema>> schemas;
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();

for (auto& input : ac.analysisCCDBInputs) {
std::vector<std::shared_ptr<arrow::Field>> fields;
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
schemaMetadata->Append("outputBinding", input.binding);
for (auto& input : dec.analysisCCDBInputs) {
std::vector<std::shared_ptr<arrow::Field>> fields;
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
schemaMetadata->Append("outputBinding", input.binding);

for (auto& m : input.metadata) {
// Save the list of input tables
if (m.name.starts_with("input:")) {
auto name = m.name.substr(6);
schemaMetadata->Append("sourceTable", name);
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
continue;
}
// Ignore the non ccdb: entries
if (!m.name.starts_with("ccdb:")) {
continue;
for (auto& m : input.metadata) {
// Save the list of input tables
if (m.name.starts_with("input:")) {
auto name = m.name.substr(6);
schemaMetadata->Append("sourceTable", name);
continue;
}
// Ignore the non ccdb: entries
if (!m.name.starts_with("ccdb:")) {
continue;
}
// Create the schema of the output
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
metadata->Append("url", m.defaultValue.asString());
auto columnName = m.name.substr(strlen("ccdb:"));
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
}
// Create the schema of the output
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
metadata->Append("url", m.defaultValue.asString());
auto columnName = m.name.substr(strlen("ccdb:"));
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
}
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
}
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {

std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
CCDBFetcherHelper::initialiseHelper(*helper, options);
std::unordered_map<std::string, int> bindings;
Expand Down Expand Up @@ -129,11 +129,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
int outputRouteIndex = bindings.at(outRouteDesc);
auto& spec = helper->routes[outputRouteIndex].matcher;
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
for (auto& _ : schema->fields()) {
for (auto const& _ : schema->fields()) {
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
}

for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);

Expand Down
2 changes: 1 addition & 1 deletion Framework/CCDBSupport/src/AnalysisCCDBHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace o2::framework
{

struct AnalysisCCDBHelpers {
static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx);
static AlgorithmSpec fetchFromCCDB(ConfigContext const&);
};

} // namespace o2::framework
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
std::vector<ExpressionInfo> expressionInfos;

/// make sure options and configurables are set before expression infos are created
homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
homogeneous_apply_refs([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
/// extract conditions and append them as inputs
homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());

Expand Down Expand Up @@ -620,7 +620,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
}
// reset pre-slice for the next dataframe
auto slices = pc.services().get<ArrowTableSlicingCache>();
homogeneous_apply_refs([&pc, &slices](auto& element) {
homogeneous_apply_refs([&slices](auto& element) {
return analysis_task_parsers::updateSliceInfo(element, slices);
},
*(task.get()));
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirecto
if (!keepString.empty()) {
dod->reset();
std::string d("dangling");
if (d.find(keepString) == 0) {
if (keepString.starts_with(d)) {
// use the dangling outputs
std::vector<InputSpec> danglingOutputs;
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
Expand Down Expand Up @@ -144,7 +144,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec>
sinks::append_to{publisher.outputs}; // append them to the publisher outputs

std::vector<InputSpec> additionalInputs;
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
for (auto const& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
input.metadata |
views::filter_string_params_with("input:") |
views::params_to_input_specs() |
Expand Down
27 changes: 1 addition & 26 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -691,33 +691,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}
}

// replace writer as some outputs may have become dangling and some are now consumed
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);

// create DataOutputDescriptor
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);

// select outputs of type AOD which need to be saved
// ATTENTION: if there are dangling outputs the getGlobalAODSink
// has to be created in any case!
dec.outputsInputsAOD.clear();

for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
if (!ds.empty() || isDangling[ii]) {
dec.outputsInputsAOD.emplace_back(outputsInputs[ii]);
}
}
}
WorkflowHelpers::injectAODWriter(workflow, ctx);

// file sink for any AOD output
if (!dec.outputsInputsAOD.empty()) {
// add TFNumber and TFFilename as input to the writer
dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
}
// Move the dummy sink at the end, if needed
for (size_t i = 0; i < workflow.size(); ++i) {
if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
Expand Down
79 changes: 39 additions & 40 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
// AODs are being injected on-the-fly, add error-handler reader
aodReader.algorithm = AlgorithmSpec{
adaptStateful(
[outputs = aodReader.outputs](DeviceSpec const&) {
[](DeviceSpec const& spec) {
LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
for (auto const& output : outputs) {
LOGP(warn, " {}", DataSpecUtils::describe(output));
for (auto const& output : spec.outputs) {
LOGP(warn, " {}", DataSpecUtils::describe(output.matcher));
}
LOGP(fatal, "Stopping.");
// to ensure the output type for adaptStateful
Expand Down Expand Up @@ -532,43 +532,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
extraSpecs.clear();

/// Analyze all ouputs
auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
dec.isDangling = isDanglingTmp;
dec.outputsInputs = outputsInputsTmp;

// create DataOutputDescriptor
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);

// select outputs of type AOD which need to be saved
// ATTENTION: if there are dangling outputs the getGlobalAODSink
// has to be created in any case!
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
if (ds.size() > 0 || dec.isDangling[ii]) {
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
}
}
}

// file sink for any AOD output
if (dec.outputsInputsAOD.size() > 0) {
// add TFNumber and TFFilename as input to the writer
dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
extraSpecs.push_back(fileSink);

auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec& spec) -> bool {
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
});
size_t ii = std::distance(dec.outputsInputs.begin(), it);
dec.isDangling[ii] = false;
}

workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
extraSpecs.clear();
injectAODWriter(workflow, ctx);

// Select dangling outputs which are not of type AOD
std::vector<InputSpec> redirectedOutputsInputs;
Expand Down Expand Up @@ -712,6 +676,41 @@ void WorkflowHelpers::adjustTopology(WorkflowSpec& workflow, ConfigContext const
}
}

void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx)
{
auto& dec = ctx.services().get<DanglingEdgesContext>();
/// Analyze all ouputs
std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);

// create DataOutputDescriptor
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);

// select outputs of type AOD which need to be saved
dec.outputsInputsAOD.clear();
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
if (ds.size() > 0 || dec.isDangling[ii]) {
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
}
}
}

// file sink for any AOD output
if (dec.outputsInputsAOD.size() > 0) {
// add TFNumber and TFFilename as input to the writer
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
workflow.push_back(fileSink);

auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
});
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
}
}

void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
std::vector<DeviceConnectionEdge>& logicalEdges,
std::vector<OutputSpec>& outputs,
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/src/WorkflowHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ struct WorkflowHelpers {
// @a ctx the context for the configuration phase
static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx);

// Function to correctly add AOD writer
static void injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx);

// Final adjustments to @a workflow after service devices have been injected.
static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx);

Expand Down