fully working with multi loop writes +signed dims support
DimitrisStaratzis committed Sep 2, 2024
1 parent 0184785 commit 28b7809
Showing 4 changed files with 142 additions and 35 deletions.
15 changes: 15 additions & 0 deletions test/src/unit-cppapi-max-fragment-size.cc
@@ -519,6 +519,15 @@ TEST_CASE(

cleanup();

bool more_than_one_loop = false;
SECTION("More than one loop write") {
more_than_one_loop = true;
}

SECTION("One loop write") {
more_than_one_loop = false;
}

// Remove the array at the end of this test.
ScopedExecutor deferred(cleanup);

@@ -573,6 +582,11 @@ TEST_CASE(
// the creation of two new fragments.
tiledb::Config cfg;
cfg.set("sm.consolidation.max_fragment_size", "150");
cfg.set("sm.consolidation.buffer_size", "10000"); // speed up consolidation
if (more_than_one_loop) {
cfg.set("sm.consolidation.buffer_size", "10");
}

ctx = Context(cfg);
Array::consolidate(ctx, array_name);
Array::vacuum(ctx, array_name);
@@ -664,6 +678,7 @@ TEST_CASE(
// the creation of two new fragments.
tiledb::Config cfg;
cfg.set("sm.consolidation.max_fragment_size", "80");
cfg.set("sm.consolidation.buffer_size", "10000"); // speed up consolidation
ctx = Context(cfg);
Array::consolidate(ctx, array_name);
Array::vacuum(ctx, array_name);
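The two SECTIONs above exercise the same consolidation path with different consolidation buffer sizes: a large buffer lets consolidation finish in one loop, while a tiny buffer forces several write loops. A minimal sketch of that configuration pattern, using only the C++ API calls that already appear in the diff; the helper name and array_name are illustrative, not part of the test:

#include <string>
#include <tiledb/tiledb>

// Illustrative helper (not part of the test): force multi-loop consolidation
// by shrinking the consolidation buffer, as the "More than one loop write"
// SECTION does above. `array_name` must point at an existing array.
void consolidate_with_small_buffer(const std::string& array_name) {
  tiledb::Config cfg;
  cfg.set("sm.consolidation.max_fragment_size", "150");  // cap each new fragment
  cfg.set("sm.consolidation.buffer_size", "10");  // tiny buffer -> several loops
  tiledb::Context ctx(cfg);
  tiledb::Array::consolidate(ctx, array_name);
  tiledb::Array::vacuum(ctx, array_name);
}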
7 changes: 7 additions & 0 deletions tiledb/sm/enums/datatype.h
@@ -338,6 +338,13 @@ inline bool datatype_is_string(Datatype type) {
type == Datatype::STRING_UCS2 || type == Datatype::STRING_UCS4);
}

/** Returns true if the input datatype is an unsigned integer type. */
inline bool datatype_is_unsigned(Datatype type) {
return (
type == Datatype::UINT8 || type == Datatype::UINT32 ||
type == Datatype::UINT16 || type == Datatype::UINT64);
}

/** Returns true if the input datatype is an integer type. */
inline bool datatype_is_integer(Datatype type) {
return (
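The new predicate sits next to the existing datatype_is_string and datatype_is_integer helpers and is used by the writer below to choose between signed and unsigned bookkeeping. A minimal usage sketch, assuming the tiledb::sm namespace; the function and parameter names here are illustrative, not writer members:

#include <cstdint>

#include "tiledb/sm/enums/datatype.h"

using tiledb::sm::Datatype;
using tiledb::sm::datatype_is_unsigned;

// Illustrative only: cache a dimension's lower bound in either a signed or an
// unsigned slot, depending on the dimension datatype.
void cache_lower_bound(
    Datatype dim_type,
    int64_t lo_signed,
    uint64_t lo_unsigned,
    int64_t& start,
    uint64_t& u_start) {
  if (datatype_is_unsigned(dim_type)) {
    u_start = lo_unsigned;  // UINT8/16/32/64 dimensions use the unsigned cache
  } else {
    start = lo_signed;  // signed integer dimensions use the signed cache
  }
}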
115 changes: 83 additions & 32 deletions tiledb/sm/query/writers/global_order_writer.cc
@@ -96,9 +96,14 @@ GlobalOrderWriter::GlobalOrderWriter(
, current_fragment_size_(0)
, rows_written_(0)
, tiles_in_current_row_(0)
, tiles_written_(0)
, tiles_since_last_split_(0)
, u_start_(0)
, start_(0)
, u_end_(0)
, end_(0)
, nd_if_dense_split_{} {
, nd_if_dense_split_{}
, dense_with_split_(false) {
// Check the layout is global order.
if (layout_ != Layout::GLOBAL_ORDER) {
throw GlobalOrderWriterException(
@@ -787,23 +792,26 @@ Status GlobalOrderWriter::global_write() {

// Compute the number of tiles that will fit in this fragment.
auto num = num_tiles_to_write(idx, tile_num, tiles);

if (array_schema_.array_type() ==
ArrayType::DENSE) { //&& this is consolidation
// if it is a dense array and not all tiles can fit in the current
// fragment then we need to split the domain, otherwise if all tiles can
// fit it means that we are in the middle of a write
nd_if_dense_split_ = ndranges_after_split(num, tile_num != num);
bool is_dense = array_schema_.array_type() == ArrayType::DENSE;
bool is_last_range = false;

if (is_dense && disable_checks_consolidation_) {
// If this is a dense array during consolidation and not all tiles can fit
// in the current fragment, we need to split the domain. If all tiles fit,
// we are in the middle of a write.
nd_if_dense_split_ = ndranges_after_split(num, tile_num, is_last_range);
}

if (tile_num != num && !nd_if_dense_split_.empty()) {
if ((!nd_if_dense_split_.empty() && num != tile_num) ||
(is_last_range && dense_with_split_)) {
frag_meta->init_domain(nd_if_dense_split_);
dense_with_split_ = true;
}

// If we're resuming a fragment write and the first tile doesn't fit into
// the previous fragment, we need to start a new fragment and recalculate
// the number of tiles to write.
if (current_fragment_size_ > 0 && num == 0) {
if (current_fragment_size_ > 0 && num == 0 && !dense_with_split_) {
RETURN_CANCEL_OR_ERROR(start_new_fragment());
num = num_tiles_to_write(idx, tile_num, tiles);
}
@@ -1462,33 +1470,47 @@ uint64_t GlobalOrderWriter::num_tiles_per_row(const Domain& domain) {
}

NDRange GlobalOrderWriter::ndranges_after_split(
uint64_t num, bool reached_end_of_fragment) {
uint64_t num, uint64_t tile_num, bool& is_last_range) {
// Expand domain to full tiles
bool reached_end_of_fragment = tile_num != num;
auto& domain{array_schema_.domain()};
if (disable_checks_consolidation_) {
auto expanded_subarray = subarray_.ndrange(0);
domain.expand_to_tiles(&expanded_subarray);
}

tiles_written_ += num;
tiles_since_last_split_ += num;

// Calculate how many tiles each row can hold
uint64_t tiles_per_row = num_tiles_per_row(domain);

// Calculate how many rows we will write in the current fragment
uint64_t rows_of_tiles_to_write =
(num - tiles_in_current_row_) / tiles_per_row;
uint64_t rows_of_tiles_to_write = 0;

if (num != 0) {
rows_of_tiles_to_write = (num - tiles_in_current_row_) / tiles_per_row;
}

// If we have not written a full row and we have reached the end of the
// fragment abort

// set vars
uint64_t remainder_of_tiles = 0;
bool moved_row = true;

// Calculate how many tiles we have in the current row
if (rows_of_tiles_to_write == 0) {
remainder_of_tiles += num;
moved_row = false;
} else {
remainder_of_tiles = (num - tiles_in_current_row_) % tiles_per_row;
}
tiles_in_current_row_ = remainder_of_tiles;
tiles_in_current_row_ += remainder_of_tiles;

// If we have not written a full row and we have reached the end of the
// fragment abort
if (tiles_in_current_row_ == tiles_per_row) {
tiles_in_current_row_ = 0;
}

// If we have finished the write in the middle of the row, throw
if (tiles_in_current_row_ != 0 && reached_end_of_fragment) {
throw GlobalOrderWriterException(
"The target fragment size cannot be achieved. Please try using a "
@@ -1513,25 +1535,41 @@ NDRange GlobalOrderWriter::ndranges_after_split(
if (rows_written_ == 0) {
// It means that the start has not been set yet. Set it to the minimum value
// of the expanded domain for that dim
auto ll = [&](auto T) {
auto dim_dom_data = (const decltype(T)*)dim_dom.data();
// todo this should be unsigned or signed
return static_cast<uint64_t>(dim_dom_data[0]);
auto ll = [&](auto T, size_t index) {
// Return a pair. The domain can be signed or unsigned
auto dim_dom_data = dim_dom.typed_data<decltype(T)>()[index];
int64_t ret_s = static_cast<int64_t>(dim_dom_data);
uint64_t ret_u = static_cast<uint64_t>(dim_dom_data);
return std::make_pair(ret_s, ret_u);
};

start_ = apply_with_type(ll, dim->type());
end_ = apply_with_type(ll, dim->type());
// Based on whether the dim is signed or unsigned, assign the proper variables
if (datatype_is_unsigned(dim->type())) {
u_start_ = apply_with_type(ll, dim->type(), 0).second;
u_end_ = apply_with_type(ll, dim->type(), 1).second;
} else {
start_ = apply_with_type(ll, dim->type(), 0).first;
end_ = apply_with_type(ll, dim->type(), 1).first;
}
}
uint64_t end = start_ + (rows_of_tiles_to_write * tile_extent) - 1;

// if there is a remainder it means we need one more row
if (tiles_in_current_row_ != 0 && !reached_end_of_fragment && end < end_ &&
moved_row) {
end++;
// Use 'auto' to work with either the cached signed or unsigned start_ and
// end_ values
auto start_to_use = datatype_is_unsigned(dim->type()) ? u_start_ : start_;
auto end_to_use = datatype_is_unsigned(dim->type()) ? u_end_ : end_;

auto end =
start_to_use + ((tiles_since_last_split_ / tiles_per_row) * tile_extent);
if (tiles_since_last_split_ % tiles_per_row == 0 && end != start_to_use) {
// We are at the end of the row; subtract 1 from the end so that
// we don't spill into the next range
end--;
}

rows_written_ = tiles_written_ / tiles_per_row;

// Add range
Range range(&start_, &end, sizeof(int));
Range range(&start_to_use, &end, sizeof(int));
nd.emplace_back(range);

// For the rest of the dims, use their domains as ranges. No split there.
@@ -1543,8 +1581,21 @@ NDRange GlobalOrderWriter::ndranges_after_split(
}

// add rows written to the cache
rows_written_ += rows_of_tiles_to_write;
start_ = end + 1;
if (tile_num != num) {
// Advance the cached start (signed or unsigned, depending on the dim type)
// past the current end, ready for the next split.
if (datatype_is_unsigned(dim->type())) {
u_start_ = end + 1;
end = u_start_;
} else {
start_ = end + 1;
end = start_;
}

tiles_since_last_split_ = 0;
}

is_last_range = end == end_to_use;

return nd;
}
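The domain split in ndranges_after_split() comes down to row arithmetic on the first dimension: the end coordinate advances by one tile extent per completed row, and when the write lands exactly on a row boundary the end is pulled back by one so the range does not spill into the next row. A standalone sketch of that arithmetic with illustrative names (this is not the writer's member code):

#include <cstdint>

// Illustrative arithmetic only: compute the inclusive end coordinate of the
// first-dimension range after a split, mirroring the end-of-row decrement in
// GlobalOrderWriter::ndranges_after_split().
uint64_t split_range_end(
    uint64_t start,
    uint64_t tiles_since_last_split,
    uint64_t tiles_per_row,
    uint64_t tile_extent) {
  uint64_t end = start + (tiles_since_last_split / tiles_per_row) * tile_extent;
  if (tiles_since_last_split % tiles_per_row == 0 && end != start) {
    end--;  // exactly on a row boundary: stay inside the current row's range
  }
  return end;
}

// Example: start = 0, 4 tiles written, 2 tiles per row, tile extent 10
//   end = 0 + (4 / 2) * 10 = 20, which is on a row boundary, so end becomes 19.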
40 changes: 37 additions & 3 deletions tiledb/sm/query/writers/global_order_writer.h
@@ -227,23 +227,56 @@ class GlobalOrderWriter : public WriterBase {
*/
uint64_t tiles_in_current_row_;

/**
* The total number of tiles written. It is only used when consolidating
* Dense arrays whose result cannot fit into a single fragment.
*/
uint64_t tiles_written_;

/**
* The number of tiles written since the last Dense domain split. It is only
* used when consolidating Dense arrays whose result cannot fit into a single
* fragment.
*/
uint64_t tiles_since_last_split_;

/**
* The unsigned start of the dim range in case we need to split into
* multiple fragments in Dense arrays.
*/
uint64_t u_start_;

/**
* The signed start of the dim range in case we need to split into multiple
* fragments in Dense arrays.
*/
uint64_t start_;
int64_t start_;

/**
* The unsigned end of the dim range in case we need to split into
* multiple fragments in Dense arrays.
*/
uint64_t u_end_;

/**
* The signed end of the dim range in case we need to split into multiple
* fragments in Dense arrays.
*/
uint64_t end_;
int64_t end_;

/**
* The NDRange to use when a dense consolidation requires a domain split.
*/
NDRange nd_if_dense_split_;

/**
* True if we have made at least one split during the Dense consolidation. By
* split we mean a fragment split, so consolidating n fragments results in
* n + m fragments on disk (before vacuuming) instead of n + 1, where m is the
* number of fragments created by the consolidation.
*/
bool dense_with_split_;

/* ********************************* */
/* PRIVATE METHODS */
/* ********************************* */
@@ -422,7 +455,8 @@ class GlobalOrderWriter : public WriterBase {
* current frag
*
*/
NDRange ndranges_after_split(uint64_t num, bool reached_end_of_fragment);
NDRange ndranges_after_split(
uint64_t num, uint64_t tile_num, bool& is_last_range);

/**
* Return the number of tiles a single row can hold. More specifically, the
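As a worked example of the fragment counting described for dense_with_split_ (numbers are illustrative): consolidating n = 3 fragments under a max_fragment_size that forces the merged cells into m = 2 new fragments leaves n + m = 3 + 2 = 5 fragments on disk until Array::vacuum() removes the 3 consolidated ones, after which only the m = 2 new fragments remain.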
