Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 140 additions & 107 deletions Framework/Core/include/Framework/DataAllocator.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
Expand Down Expand Up @@ -29,9 +29,9 @@

#include "Headers/DataHeader.h"
#include <TClass.h>
#include <gsl/span>

#include <memory>
#include <ranges>
#include <vector>
#include <map>
#include <string>
Expand Down Expand Up @@ -127,6 +127,10 @@ template <typename T>
concept VectorOfMessageableTypes = is_specialization_v<T, std::vector> &&
is_messageable<typename T::value_type>::value;

template <typename T>
concept ContiguousMessageablesRange = std::ranges::contiguous_range<T> &&
is_messageable<typename T::value_type>::value;

/// This allocator is responsible to make sure that the messages created match
/// the provided spec and that depending on how many pipelined reader we
/// have, messages get created on the channel for the reader of the current
Expand Down Expand Up @@ -296,8 +300,9 @@ class DataAllocator
///
/// Supported types:
/// - messageable types (trivially copyable, non-polymorphic
/// - std::vector of messageable types
/// - std::vector of pointers of messageable type
/// - contiguous_range of messageable types
/// - random_access_ranges of pointers of messageable type
/// - sized range of messageable type
/// - types with ROOT dictionary and implementing the ROOT ClassDef interface
///
/// Note: for many use cases, especially for the messageable types, the `make` interface
Expand All @@ -308,116 +313,144 @@ class DataAllocator
/// Use @a ROOTSerialized type wrapper to force ROOT serialization. Same applies to
/// types which do not implement the ClassDef interface but have a dictionary.
template <typename T>
requires(!std::ranges::contiguous_range<T> && is_messageable<T>::value == true)
void snapshot(const Output& spec, T const& object)
{
return snapshot(spec, std::span<T const>(&object, &object + 1));
}

void snapshot(const Output& spec, std::string_view const& object)
{
return snapshot(spec, std::span<char const>(object.data(), object.size()));
}

// This is for snapshotting a range of contiguous messageable types
template <typename T>
requires(ContiguousMessageablesRange<T> && std::is_pointer_v<typename T::value_type> == false)
void snapshot(const Output& spec, T const& object)
{
auto& proxy = mRegistry.get<MessageContext>().proxy();
fair::mq::MessagePtr payloadMessage;
auto serializationType = o2::header::gSerializationMethodNone;
RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
if constexpr (is_messageable<T>::value == true) {
// Serialize a snapshot of a trivially copyable, non-polymorphic object,
payloadMessage = proxy.createOutputMessage(routeIndex, sizeof(T));
memcpy(payloadMessage->GetData(), &object, sizeof(T));

serializationType = o2::header::gSerializationMethodNone;
} else if constexpr (is_specialization_v<T, std::vector> == true ||
(gsl::details::is_span<T>::value && has_messageable_value_type<T>::value)) {
using ElementType = typename std::remove_pointer<typename T::value_type>::type;
if constexpr (is_messageable<ElementType>::value) {
// Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
// Note: in most cases it is better to use the `make` function und work with the provided
// reference object
constexpr auto elementSizeInBytes = sizeof(ElementType);
auto sizeInBytes = elementSizeInBytes * object.size();
payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);

if constexpr (std::is_pointer<typename T::value_type>::value == false) {
// vector of elements
if (object.data() && sizeInBytes) {
memcpy(payloadMessage->GetData(), object.data(), sizeInBytes);
}
} else {
// serialize vector of pointers to elements
auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
for (auto const& pointer : object) {
memcpy(target, pointer, elementSizeInBytes);
target += elementSizeInBytes;
}
}

serializationType = o2::header::gSerializationMethodNone;
} else if constexpr (has_root_dictionary<ElementType>::value) {
return snapshot(spec, ROOTSerialized<T const>(object));
} else {
static_assert(always_static_assert_v<T>,
"value type of std::vector not supported by API, supported types:"
"\n - messageable tyeps (trivially copyable, non-polymorphic structures)"
"\n - pointers to those"
"\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
}
} else if constexpr (is_container<T>::value == true && has_messageable_value_type<T>::value == true) {
// Serialize a snapshot of a std::container of trivially copyable, non-polymorphic elements
// Note: in most cases it is better to use the `make` function und work with the provided
// reference object
constexpr auto elementSizeInBytes = sizeof(typename T::value_type);
auto sizeInBytes = elementSizeInBytes * object.size();
payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);

// serialize vector of pointers to elements
auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
for (auto const& entry : object) {
memcpy(target, (void*)&entry, elementSizeInBytes);
target += elementSizeInBytes;
}
serializationType = o2::header::gSerializationMethodNone;
} else if constexpr (has_root_dictionary<T>::value == true || is_specialization_v<T, ROOTSerialized> == true) {
// Serialize a snapshot of an object with root dictionary
payloadMessage = proxy.createOutputMessage(routeIndex);
payloadMessage->Rebuild(4096, {64});
if constexpr (is_specialization_v<T, ROOTSerialized> == true) {
// Explicitely ROOT serialize a snapshot of object.
// An object wrapped into type `ROOTSerialized` is explicitely marked to be ROOT serialized
// and is expected to have a ROOT dictionary. Availability can not be checked at compile time
// for all cases.
using WrappedType = typename T::wrapped_type;
static_assert(std::is_same<typename T::hint_type, const char>::value ||
std::is_same<typename T::hint_type, TClass>::value ||
std::is_void<typename T::hint_type>::value,
"class hint must be of type TClass or const char");

const TClass* cl = nullptr;
if (object.getHint() == nullptr) {
// get TClass info by wrapped type
cl = TClass::GetClass(typeid(WrappedType));
} else if (std::is_same<typename T::hint_type, TClass>::value) {
// the class info has been passed directly
cl = reinterpret_cast<const TClass*>(object.getHint());
} else if (std::is_same<typename T::hint_type, const char>::value) {
// get TClass info by optional name
cl = TClass::GetClass(reinterpret_cast<const char*>(object.getHint()));
}
if (has_root_dictionary<WrappedType>::value == false && cl == nullptr) {
if (std::is_same<typename T::hint_type, const char>::value) {
throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
reinterpret_cast<const char*>(object.getHint()));
} else {
throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
typeid(WrappedType).name());
}
}
typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object(), cl);
using ElementType = typename std::remove_pointer<typename T::value_type>::type;
// Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
// Note: in most cases it is better to use the `make` function und work with the provided
// reference object
constexpr auto elementSizeInBytes = sizeof(ElementType);
auto sizeInBytes = elementSizeInBytes * object.size();
fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);

// vector of elements
if (object.data() && sizeInBytes) {
memcpy(payloadMessage->GetData(), object.data(), sizeInBytes);
}

addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
}

// A random access range of pointers we can serialise by storing the contens one after the other.
// On the receiving side you will have to retrieve it via a span
template <typename T>
requires(std::ranges::random_access_range<T> && is_messageable<typename std::remove_pointer_t<typename T::value_type>>::value && std::is_pointer_v<typename T::value_type> == true)
void snapshot(const Output& spec, T const& object)
{
auto& proxy = mRegistry.get<MessageContext>().proxy();
RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
using ElementType = typename std::remove_pointer_t<typename T::value_type>;
// Serialize a snapshot of a std::vector of trivially copyable, non-polymorphic elements
// Note: in most cases it is better to use the `make` function und work with the provided
// reference object
constexpr auto elementSizeInBytes = sizeof(ElementType);
auto sizeInBytes = elementSizeInBytes * object.size();
fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);

// serialize vector of pointers to elements
auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
for (auto const& pointer : object) {
memcpy(target, pointer, elementSizeInBytes);
target += elementSizeInBytes;
}

addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
}

// This is for a range where we can know upfront how many elements there are,
// so that we can preallocate the final size by simply multipling sizeof(T) x N elements
template <typename T>
requires(!std::ranges::contiguous_range<T> && std::ranges::sized_range<T> && has_messageable_value_type<T>::value)
void snapshot(const Output& spec, T const& object)
{
auto& proxy = mRegistry.get<MessageContext>().proxy();
RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
// Serialize a snapshot of a std::container of trivially copyable, non-polymorphic elements
// Note: in most cases it is better to use the `make` function und work with the provided
// reference object
constexpr auto elementSizeInBytes = sizeof(typename T::value_type);
auto sizeInBytes = elementSizeInBytes * object.size();
fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex, sizeInBytes);

// serialize vector of pointers to elements
auto target = reinterpret_cast<unsigned char*>(payloadMessage->GetData());
for (auto const& entry : object) {
memcpy(target, (void*)&entry, elementSizeInBytes);
target += elementSizeInBytes;
}
addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodNone);
}

template <typename T>
requires(is_specialization_v<T, ROOTSerialized>)
void snapshot(const Output& spec, T const& object)
{
auto& proxy = mRegistry.get<MessageContext>().proxy();
RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
// Serialize a snapshot of an object with root dictionary
fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
payloadMessage->Rebuild(4096, {64});
const TClass* cl = nullptr;
// Explicitely ROOT serialize a snapshot of object.
// An object wrapped into type `ROOTSerialized` is explicitely marked to be ROOT serialized
// and is expected to have a ROOT dictionary. Availability can not be checked at compile time
// for all cases.
using WrappedType = typename T::wrapped_type;
static_assert(std::is_same<typename T::hint_type, const char>::value ||
std::is_same<typename T::hint_type, TClass>::value ||
std::is_void<typename T::hint_type>::value,
"class hint must be of type TClass or const char");

if (object.getHint() == nullptr) {
// get TClass info by wrapped type
cl = TClass::GetClass(typeid(WrappedType));
} else if (std::is_same<typename T::hint_type, TClass>::value) {
// the class info has been passed directly
cl = reinterpret_cast<const TClass*>(object.getHint());
} else if (std::is_same<typename T::hint_type, const char>::value) {
// get TClass info by optional name
cl = TClass::GetClass(reinterpret_cast<const char*>(object.getHint()));
}
if (has_root_dictionary<WrappedType>::value == false && cl == nullptr) {
if (std::is_same<typename T::hint_type, const char>::value) {
throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
reinterpret_cast<const char*>(object.getHint()));
} else {
typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object, TClass::GetClass(typeid(T)));
throw runtime_error_f("ROOT serialization not supported, dictionary not found for type %s",
typeid(WrappedType).name());
}
serializationType = o2::header::gSerializationMethodROOT;
} else {
static_assert(always_static_assert_v<T>,
"data type T not supported by API, \n specializations available for"
"\n - trivially copyable, non-polymorphic structures"
"\n - std::vector of messageable structures or pointers to those"
"\n - types with ROOT dictionary and implementing ROOT ClassDef interface");
}
addPartToContext(routeIndex, std::move(payloadMessage), spec, serializationType);
typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object(), cl);
addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodROOT);
}

template <typename T>
requires(is_messageable<T>::value == false && !ContiguousMessageablesRange<T> && has_root_dictionary<T>::value == true && is_specialization_v<T, ROOTSerialized> == false)
void snapshot(const Output& spec, T const& object)
{
auto& proxy = mRegistry.get<MessageContext>().proxy();
RouteIndex routeIndex = matchDataHeader(spec, mRegistry.get<TimingInfo>().timeslice);
// Serialize a snapshot of an object with root dictionary
fair::mq::MessagePtr payloadMessage = proxy.createOutputMessage(routeIndex);
payloadMessage->Rebuild(4096, {64});
typename root_serializer<T>::serializer().Serialize(*payloadMessage, &object, TClass::GetClass(typeid(T)));
addPartToContext(routeIndex, std::move(payloadMessage), spec, o2::header::gSerializationMethodROOT);
}

/// Take a snapshot of a raw data array which can be either POD or may contain a serialized
Expand Down