Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/columnar data serialization #708

Merged
merged 31 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c73a699
add unit tests for columnar (de)serializer
mgovers Aug 21, 2024
1724672
add columnar data support to deserializer
mgovers Aug 28, 2024
0a8fe41
make serializer tests pass as well
mgovers Aug 29, 2024
f8d9331
cleanup
mgovers Aug 30, 2024
c7e47b2
extend dataset with forgotten functionality
mgovers Aug 30, 2024
00ad355
fix includes
mgovers Aug 30, 2024
9fed3bd
make gcc/macos happy
mgovers Aug 30, 2024
1c9fa3b
more gcc happiness
mgovers Aug 30, 2024
f62def5
Merge branch 'main' into feature/columnar-data-serialization
mgovers Aug 30, 2024
3d36714
clang-tidy
mgovers Aug 30, 2024
e2ccc5b
Merge branch 'feature/columnar-data-serialization' of https://github.…
mgovers Aug 30, 2024
9c5a2d2
made deserializer work with more efficient attribute lookup
mgovers Sep 3, 2024
5383c01
vector instead of unordered_map
mgovers Sep 3, 2024
d642f96
sonar cloud + clang-tidy
mgovers Sep 3, 2024
587f2ff
go to separate map and array view
mgovers Sep 4, 2024
be0ffb0
cleanup
mgovers Sep 4, 2024
53787cb
process minor feedback
mgovers Sep 4, 2024
9a71960
gcc + clang-tdy
mgovers Sep 4, 2024
099db60
Merge remote-tracking branch 'origin/main' into feature/columnar-data…
mgovers Sep 4, 2024
c67b9b0
more gcc
mgovers Sep 4, 2024
dfe6ce3
fix compilation
mgovers Sep 4, 2024
ba7a133
fix unix
mgovers Sep 4, 2024
b90173c
more fix compilation
mgovers Sep 4, 2024
6d98c08
more fix compilation
mgovers Sep 4, 2024
b908b45
fix lifetime of reordered attribute buffers
mgovers Sep 4, 2024
747d29a
update serializer with similar logic to deserializer
mgovers Sep 4, 2024
7be28f4
fix clang-tidy
mgovers Sep 5, 2024
1bdd43a
fix edge case component not in dataset
mgovers Sep 5, 2024
91e92ad
remove unused functions
mgovers Sep 5, 2024
404422e
rename buffer view
mgovers Sep 5, 2024
233cee3
make assertions more clear
mgovers Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ template <dataset_type_tag dataset_type_> class Dataset {
Dataset get_individual_scenario(Idx scenario)
requires(!is_indptr_mutable_v<dataset_type>)
{
using AdvanceablePtr = std::conditional_t<is_data_mutable_v<dataset_type>, char*, char const*>;

assert(0 <= scenario && scenario < batch_size());

Dataset result{false, 1, dataset().name, meta_data()};
Expand All @@ -366,10 +368,17 @@ template <dataset_type_tag dataset_type_> class Dataset {
Idx size = component_info.elements_per_scenario >= 0
? component_info.elements_per_scenario
: buffer.indptr[scenario + 1] - buffer.indptr[scenario];
Data* data = component_info.elements_per_scenario >= 0
? component_info.component->advance_ptr(buffer.data, size * scenario)
: component_info.component->advance_ptr(buffer.data, buffer.indptr[scenario]);
result.add_buffer(component_info.component->name, size, size, nullptr, data);
Idx offset = component_info.elements_per_scenario >= 0 ? size * scenario : buffer.indptr[scenario];
if (is_columnar(buffer)) {
mgovers marked this conversation as resolved.
Show resolved Hide resolved
result.add_buffer(component_info.component->name, size, size, nullptr, nullptr);
for (auto const& attribute_buffer : buffer.attributes) {
result.add_attribute_buffer(component_info.component->name, attribute_buffer.meta_attribute->name,
static_cast<Data*>(static_cast<AdvanceablePtr>(attribute_buffer.data)));
}
} else {
Data* data = component_info.component->advance_ptr(buffer.data, offset);
result.add_buffer(component_info.component->name, size, size, nullptr, data);
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,24 @@ template <class Functor, class... Args> decltype(auto) ctype_func_selector(CType
}

// set nan
inline void set_nan(double& x) { x = nan; }
inline void set_nan(IntS& x) { x = na_IntS; }
inline void set_nan(ID& x) { x = na_IntID; }
constexpr void set_nan(double& x) { x = nan; }
mgovers marked this conversation as resolved.
Show resolved Hide resolved
constexpr void set_nan(IntS& x) { x = na_IntS; }
constexpr void set_nan(ID& x) { x = na_IntID; }
inline void set_nan(RealValue<asymmetric_t>& x) { x = RealValue<asymmetric_t>{nan}; }
template <class Enum>
requires std::same_as<std::underlying_type_t<Enum>, IntS>
inline void set_nan(Enum& x) {
constexpr void set_nan(Enum& x) {
x = static_cast<Enum>(na_IntS);
}
template <typename T>
requires requires(T t) {
{ set_nan(t) };
}
inline T const nan_value = [] {
T v{};
set_nan(v);
return v;
}();

using RawDataPtr = void*; // raw mutable data ptr
using RawDataConstPtr = void const*; // raw read-only data ptr
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: Contributors to the Power Grid Model project <powergridmodel@lfenergy.org>
//
// SPDX-License-Identifier: MPL-2.0

#pragma once

#include <concepts>

namespace power_grid_model::meta_data::detail {

struct row_based_t {};
struct columnar_t {};
constexpr row_based_t row_based{};
constexpr columnar_t columnar{};

template <typename T>
concept row_based_or_columnar_c = std::derived_from<T, row_based_t> || std::derived_from<T, columnar_t>;

} // namespace power_grid_model::meta_data::detail
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

#pragma once

#include "common.hpp"

#include "../../common/common.hpp"
#include "../../common/exception.hpp"
#include "../../common/typing.hpp"
#include "../dataset.hpp"
#include "../meta_data.hpp"

Expand Down Expand Up @@ -364,6 +367,17 @@ class Deserializer {
using DataByteMeta = std::vector<std::vector<ComponentByteMeta>>;
using AttributeByteMeta = std::vector<std::pair<std::string_view, std::vector<std::string_view>>>;

struct BufferView {
WritableDataset::Buffer const* buffer{nullptr};
Idx idx{0};
std::vector<AttributeBuffer<void> const*> const* attribute_order{nullptr};
TonyXiang8787 marked this conversation as resolved.
Show resolved Hide resolved
};

using row_based_t = detail::row_based_t;
using columnar_t = detail::columnar_t;
static constexpr auto row_based = detail::row_based;
static constexpr auto columnar = detail::columnar;

public:
// not copyable
Deserializer(Deserializer const&) = delete;
Expand Down Expand Up @@ -433,6 +447,7 @@ class Deserializer {
std::string version_;
bool is_batch_{};
std::map<MetaComponent const*, std::vector<MetaAttribute const*>, std::less<>> attributes_;

// offset of the msgpack bytes, the number of elements,
// for the actual data, per component (outer), per batch (inner)
// if a component has no element for a certain scenario, that offset and size will be zero.
Expand Down Expand Up @@ -685,22 +700,23 @@ class Deserializer {
}

void parse_component(Idx component_idx) {
auto const& buffer = dataset_handler_.get_buffer(component_idx);
auto buffer =
BufferView{.buffer = &dataset_handler_.get_buffer(component_idx), .idx = 0, .attribute_order = nullptr};
auto const& info = dataset_handler_.get_component_info(component_idx);
auto const& msg_data = msg_data_offsets_[component_idx];
Idx const batch_size = dataset_handler_.batch_size();
component_key_ = info.component->name;
// handle indptr
if (info.elements_per_scenario < 0) {
// first always zero
buffer.indptr.front() = 0;
buffer.buffer->indptr.front() = 0;
// accumulate sum
std::transform_inclusive_scan(
msg_data.cbegin(), msg_data.cend(), buffer.indptr.begin() + 1, std::plus{},
msg_data.cbegin(), msg_data.cend(), buffer.buffer->indptr.begin() + 1, std::plus{},
[](auto const& x) { return x.size; }, Idx{});
}
// set nan
info.component->set_nan(buffer.data, 0, info.total_elements);
set_nan(buffer, info);
// attributes
auto const attributes = [&]() -> std::span<MetaAttribute const* const> {
auto const found = attributes_.find(info.component);
Expand All @@ -709,27 +725,29 @@ class Deserializer {
}
return found->second;
}();
auto const attribute_buffer_order = get_attribute_buffer_mapping(*buffer.buffer, attributes);
buffer.attribute_order = &attribute_buffer_order;
TonyXiang8787 marked this conversation as resolved.
Show resolved Hide resolved
// all scenarios
for (scenario_number_ = 0; scenario_number_ != batch_size; ++scenario_number_) {
Idx const scenario_offset = info.elements_per_scenario < 0 ? buffer.indptr[scenario_number_]
Idx const scenario_offset = info.elements_per_scenario < 0 ? buffer.buffer->indptr[scenario_number_]
: scenario_number_ * info.elements_per_scenario;
#ifndef NDEBUG
if (info.elements_per_scenario < 0) {
assert(buffer.indptr[scenario_number_ + 1] - buffer.indptr[scenario_number_] ==
assert(buffer.buffer->indptr[scenario_number_ + 1] - buffer.buffer->indptr[scenario_number_] ==
msg_data[scenario_number_].size);

} else {
assert(info.elements_per_scenario == msg_data[scenario_number_].size);
}
#endif
void* scenario_pointer = info.component->advance_ptr(buffer.data, scenario_offset);
parse_scenario(*info.component, scenario_pointer, msg_data[scenario_number_], attributes);
BufferView scenario = advance(buffer, scenario_offset);
parse_scenario(*info.component, scenario, msg_data[scenario_number_], attributes);
}
scenario_number_ = -1;
component_key_ = "";
}

void parse_scenario(MetaComponent const& component, void* scenario_pointer, ComponentByteMeta const& msg_data,
void parse_scenario(MetaComponent const& component, BufferView const& buffer, ComponentByteMeta const& msg_data,
std::span<MetaAttribute const* const> attributes) {
// skip for empty scenario
if (msg_data.size == 0) {
Expand All @@ -739,49 +757,102 @@ class Deserializer {
offset_ = msg_data.offset;
parse_map_array<visit_array_t, move_forward>();
for (element_number_ = 0; element_number_ != msg_data.size; ++element_number_) {
void* element_pointer = component.advance_ptr(scenario_pointer, element_number_);
BufferView const element_buffer = advance(buffer, element_number_);
// check the element is map or array
auto const element_visitor = parse_map_array<visit_map_array_t, move_forward>();
if (element_visitor.is_map) {
parse_map_element(element_pointer, element_visitor.size, component);
parse_map_element(element_buffer, element_visitor.size, component);
} else {
parse_array_element(element_pointer, element_visitor.size, attributes);
parse_array_element(element_buffer, element_visitor.size, component, attributes);
}
}
element_number_ = -1;
offset_ = 0;
}

void parse_map_element(void* element_pointer, Idx map_size, MetaComponent const& component) {
void parse_map_element(BufferView const& buffer, Idx map_size, MetaComponent const& component) {
while (map_size-- != 0) {
attribute_key_ = parse_string();
Idx const found_idx = component.find_attribute(attribute_key_);
if (found_idx < 0) {
Idx const component_attribute_idx = component.find_attribute(attribute_key_);
TonyXiang8787 marked this conversation as resolved.
Show resolved Hide resolved
if (component_attribute_idx < 0) {
attribute_key_ = {};
// allow unknown key for additional user info
parse_skip();
continue;
}
parse_attribute(element_pointer, component.attributes[found_idx]);
parse_attribute(buffer, component, component.attributes[component_attribute_idx]);
}
attribute_key_ = "";
}

void parse_array_element(void* element_pointer, Idx array_size, std::span<MetaAttribute const* const> attributes) {
void parse_array_element(BufferView const& buffer, Idx array_size, MetaComponent const& component,
std::span<MetaAttribute const* const> attributes) {
if (array_size != static_cast<Idx>(attributes.size())) {
throw SerializationError{
"An element of a list should have same length as the list of predefined attributes!\n"};
}

for (attribute_number_ = 0; attribute_number_ != array_size; ++attribute_number_) {
parse_attribute(element_pointer, *attributes[attribute_number_]);
parse_attribute(buffer, component, *attributes[attribute_number_]);
}
attribute_number_ = -1;
}

void parse_attribute(void* element_pointer, MetaAttribute const& attribute) {
void parse_attribute(BufferView const& buffer, MetaComponent const& component, MetaAttribute const& attribute) {
assert(buffer.buffer != nullptr);

if (buffer.buffer->data != nullptr) {
TonyXiang8787 marked this conversation as resolved.
Show resolved Hide resolved
parse_attribute(row_based, buffer, component, attribute);
} else {
parse_attribute(columnar, buffer, component, attribute);
}
}

void parse_attribute(row_based_t /*tag*/, BufferView const& buffer, MetaComponent const& component,
MetaAttribute const& attribute) {
// call relevant parser
assert(buffer.buffer != nullptr);
assert(buffer.buffer->data != nullptr);

ctype_func_selector(attribute.ctype, [&buffer, &component, &attribute, this]<class T> {
ValueVisitor<T> visitor{{},
attribute.get_attribute<T>(component.advance_ptr(buffer.buffer->data, buffer.idx))};
msgpack::parse(data_, size_, offset_, visitor);
});
}

void parse_attribute(columnar_t /*tag*/, BufferView const& buffer, MetaComponent const& /*component*/,
MetaAttribute const& attribute) {
// call relevant parser
ctype_func_selector(attribute.ctype, [element_pointer, &attribute, this]<class T> {
ValueVisitor<T> visitor{{}, attribute.get_attribute<T>(element_pointer)};
assert(buffer.buffer != nullptr);
assert(buffer.buffer->data == nullptr);

if (attribute_number_ >= 0 && buffer.attribute_order != nullptr) {
assert(attribute_number_ < std::ssize(*buffer.attribute_order));

if (auto const* const attribute_buffer = (*buffer.attribute_order)[attribute_number_];
attribute_buffer != nullptr) {
parse_attribute(*attribute_buffer, buffer.idx);
} else {
parse_skip();
}
} else if (auto it = std::ranges::find_if(buffer.buffer->attributes,
[&attribute](auto const& attribute_buffer) {
return attribute_buffer.meta_attribute == &attribute;
});
it != buffer.buffer->attributes.end()) {
parse_attribute(*it, buffer.idx);
} else {
parse_skip();
}
}
TonyXiang8787 marked this conversation as resolved.
Show resolved Hide resolved

void parse_attribute(AttributeBuffer<void> const& buffer, Idx idx) {
assert(buffer.data != nullptr);
assert(buffer.meta_attribute != nullptr);

ctype_func_selector(buffer.meta_attribute->ctype, [&buffer, &idx, this]<class T> {
ValueVisitor<T> visitor{{}, *(reinterpret_cast<T*>(buffer.data) + idx)};
msgpack::parse(data_, size_, offset_, visitor);
});
}
Expand Down Expand Up @@ -816,6 +887,48 @@ class Deserializer {
}
}

static void set_nan(BufferView const& buffer, ComponentInfo const& info) {
if (buffer.buffer->data != nullptr) {
info.component->set_nan(buffer.buffer->data, buffer.idx, info.total_elements);
} else {
for (auto const& attribute_buffer : buffer.buffer->attributes) {
assert(attribute_buffer.meta_attribute != nullptr);
ctype_func_selector(
attribute_buffer.meta_attribute->ctype, [&attribute_buffer, &buffer, &info]<typename T> {
std::ranges::fill(std::span{reinterpret_cast<T*>(attribute_buffer.data) + buffer.idx,
narrow_cast<size_t>(info.total_elements)},
nan_value<T>);
});
}
}
}

static BufferView advance(BufferView buffer, Idx offset) {
mgovers marked this conversation as resolved.
Show resolved Hide resolved
buffer.idx += offset;
return buffer;
}

static std::vector<AttributeBuffer<void> const*>
get_attribute_buffer_mapping(WritableDataset::Buffer const& buffer,
std::span<MetaAttribute const* const> attributes) {
if (buffer.data != nullptr) {
return {};
}

std::vector<AttributeBuffer<void> const*> result(attributes.size());
std::ranges::transform(
attributes, result.begin(), [&buffer](auto const* const attribute) -> AttributeBuffer<void> const* {
auto it = std::ranges::find_if(buffer.attributes, [&attribute](auto const& attribute_buffer) {
return attribute_buffer.meta_attribute == attribute;
});
if (it != buffer.attributes.end()) {
return &*it;
}
return nullptr;
});
return result;
}

[[noreturn]] void handle_error(std::exception const& e) {
std::stringstream ss;
ss << e.what();
Expand Down
Loading
Loading