Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -129,6 +130,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -230,6 +232,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in,
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down Expand Up @@ -285,6 +288,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
if (in.has_schema_version()) {
Expand Down
10 changes: 10 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,12 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory, "true");

DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");

// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported.
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DEFINE_mBool(random_segments_key_bounds_truncation, "false");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down Expand Up @@ -1943,6 +1949,10 @@ Status set_fuzzy_configs() {
fuzzy_field_and_value["string_overflow_size"] =
((distribution(*generator) % 2) == 0) ? "10" : "4294967295";

std::uniform_int_distribution<int64_t> distribution2(-2, 10);
fuzzy_field_and_value["segments_key_bounds_truncation_threshold"] =
std::to_string(distribution2(*generator));

fmt::memory_buffer buf;
for (auto& it : fuzzy_field_and_value) {
const auto& field = it.first;
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,12 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory);

DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);

// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported.
DECLARE_mInt32(segments_key_bounds_truncation_threshold);
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DECLARE_mBool(random_segments_key_bounds_truncation);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
16 changes: 9 additions & 7 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/key_util.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/data_types/data_type_factory.hpp"
Expand Down Expand Up @@ -476,21 +477,22 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
RowLocation loc;

for (size_t i = 0; i < specified_rowsets.size(); i++) {
auto& rs = specified_rowsets[i];
auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
int num_segments = rs->num_segments();
const auto& rs = specified_rowsets[i];
std::vector<KeyBoundsPB> segments_key_bounds;
rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
int num_segments = static_cast<int>(rs->num_segments());
DCHECK_EQ(segments_key_bounds.size(), num_segments);
std::vector<uint32_t> picked_segments;
for (int i = num_segments - 1; i >= 0; i--) {
for (int j = num_segments - 1; j >= 0; j--) {
// If mow table has cluster keys, the key bounds is short keys, not primary keys
// use PrimaryKeyIndexMetaPB in primary key index?
if (schema->cluster_key_idxes().empty()) {
if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 ||
key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) {
if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
rs->rowset_meta()->is_segments_key_bounds_truncated())) {
continue;
}
}
picked_segments.emplace_back(i);
picked_segments.emplace_back(j);
}
if (picked_segments.empty()) {
continue;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ class BaseTablet {
RowsetSharedPtr rowset, const TupleDescriptor* desc,
OlapReaderStatistics& stats, std::string& values,
bool write_to_cache = false);

// Lookup the row location of `encoded_key`, the function sets `row_location` on success.
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Expand Down
17 changes: 11 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ using namespace ErrorCode;

namespace {

bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
bool is_rowset_tidy(std::string& pre_max_key, bool& pre_rs_key_bounds_truncated,
const RowsetSharedPtr& rhs) {
size_t min_tidy_size = config::ordered_data_compaction_min_segment_size;
if (rhs->num_segments() == 0) {
return true;
Expand All @@ -107,11 +108,13 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
if (!ret) {
return false;
}
if (min_key <= pre_max_key) {
bool cur_rs_key_bounds_truncated {rhs->is_segments_key_bounds_truncated()};
if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_max_key}, pre_rs_key_bounds_truncated,
Slice {min_key}, cur_rs_key_bounds_truncated)) {
return false;
}
CHECK(rhs->last_key(&pre_max_key));

pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
return true;
}

Expand Down Expand Up @@ -292,12 +295,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
<< ", output_version=" << _output_version;
// link data to new rowset
auto seg_id = 0;
bool segments_key_bounds_truncated {false};
std::vector<KeyBoundsPB> segment_key_bounds;
for (auto rowset : _input_rowsets) {
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
_output_rs_writer->rowset_id(), seg_id));
seg_id += rowset->num_segments();

segments_key_bounds_truncated |= rowset->is_segments_key_bounds_truncated();
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
Expand All @@ -312,7 +316,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
rowset_meta->set_num_segments(_input_num_segments);
rowset_meta->set_segments_overlap(NONOVERLAPPING);
rowset_meta->set_rowset_state(VISIBLE);

rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
rowset_meta->set_segments_key_bounds(segment_key_bounds);
_output_rowset = _output_rs_writer->manual_build(rowset_meta);
return Status::OK();
Expand Down Expand Up @@ -392,8 +396,9 @@ bool CompactionMixin::handle_ordered_data_compaction() {
// files to handle compaction
auto input_size = _input_rowsets.size();
std::string pre_max_key;
bool pre_rs_key_bounds_truncated {false};
for (auto i = 0; i < input_size; ++i) {
if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) {
if (!is_rowset_tidy(pre_max_key, pre_rs_key_bounds_truncated, _input_rowsets[i])) {
if (i <= input_size / 2) {
return false;
} else {
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());

rowset_meta.set_segments_key_bounds_truncated(
spec_rowset_meta.is_segments_key_bounds_truncated());
std::vector<KeyBoundsPB> segments_key_bounds;
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
rowset_meta.set_segments_key_bounds(segments_key_bounds);
Expand Down Expand Up @@ -679,6 +680,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_num_segment += static_cast<int32_t>(rowset->num_segments());
// append key_bounds to current rowset
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
_segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();

// TODO update zonemap
if (rowset->rowset_meta()->has_delete_predicate()) {
Expand Down Expand Up @@ -888,6 +890,9 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
for (auto& key_bound : _segments_encoded_key_bounds) {
segments_encoded_key_bounds.push_back(key_bound);
}
if (_segments_key_bounds_truncated.has_value()) {
rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
}
// segment key bounds are empty in old version(before version 1.2.x). So we should not modify
// the overlap property when key bounds are empty.
if (!segments_encoded_key_bounds.empty() &&
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,10 @@ class BaseBetaRowsetWriter : public RowsetWriter {
// record rows number of every segment already written, using for rowid
// conversion when compaction in unique key with MoW model
std::vector<uint32_t> _segment_num_rows;

// for unique key table with merge-on-write
std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
std::optional<bool> _segments_key_bounds_truncated;

// counters and statistics maintained during add_rowset
std::atomic<int64_t> _num_rows_written;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
return true;
}

bool is_segments_key_bounds_truncated() const {
return _rowset_meta->is_segments_key_bounds_truncated();
}

bool check_rowset_segment();

[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
Expand Down
31 changes: 31 additions & 0 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>

#include <memory>
#include <random>

#include "common/logging.h"
#include "google/protobuf/util/message_differencer.h"
Expand Down Expand Up @@ -220,13 +221,43 @@ int64_t RowsetMeta::segment_file_size(int seg_id) {
: -1;
}

void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
}

int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
if (config::random_segments_key_bounds_truncation) {
static thread_local std::mt19937 generator(std::random_device {}());
std::uniform_int_distribution<int> distribution(-10, 40);
truncation_threshold = distribution(generator);
}
bool really_do_truncation {false};
if (truncation_threshold > 0) {
for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
if (segment_key_bounds.min_key().size() > truncation_threshold) {
really_do_truncation = true;
segment_key_bounds.mutable_min_key()->resize(truncation_threshold);
}
if (segment_key_bounds.max_key().size() > truncation_threshold) {
really_do_truncation = true;
segment_key_bounds.mutable_max_key()->resize(truncation_threshold);
}
}
}
set_segments_key_bounds_truncated(really_do_truncation || is_segments_key_bounds_truncated());
}

void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
set_num_segments(num_segments() + other.num_segments());
set_num_rows(num_rows() + other.num_rows());
set_data_disk_size(data_disk_size() + other.data_disk_size());
set_total_disk_size(total_disk_size() + other.total_disk_size());
set_index_disk_size(index_disk_size() + other.index_disk_size());
set_total_disk_size(data_disk_size() + index_disk_size());
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
other.is_segments_key_bounds_truncated());
for (auto&& key_bound : other.get_segments_key_bounds()) {
add_segment_key_bounds(key_bound);
}
Expand Down
17 changes: 11 additions & 6 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <vector>

#include "common/config.h"
#include "io/fs/file_system.h"
#include "olap/metadata_adder.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -299,6 +300,15 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {

auto& get_segments_key_bounds() const { return _rowset_meta_pb.segments_key_bounds(); }

bool is_segments_key_bounds_truncated() const {
return _rowset_meta_pb.has_segments_key_bounds_truncated() &&
_rowset_meta_pb.segments_key_bounds_truncated();
}

void set_segments_key_bounds_truncated(bool truncated) {
_rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
}

bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
// for compatibility, old version has not segment key bounds
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
Expand All @@ -316,12 +326,7 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
return true;
}

void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
}
}
void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds);

void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
*_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ Status IndexBuilder::update_inverted_index_info() {
rowset_meta->set_rowset_state(input_rowset_meta->rowset_state());
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
rowset_meta->set_segments_key_bounds_truncated(
input_rowset_meta->is_segments_key_bounds_truncated());
rowset_meta->set_segments_key_bounds(key_bounds);
auto output_rowset = output_rs_writer->manual_build(rowset_meta);
if (input_rowset_meta->has_delete_predicate()) {
Expand Down
32 changes: 32 additions & 0 deletions be/src/util/key_util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "util/key_util.h"

namespace doris {

bool key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds,
bool is_segments_key_bounds_truncated) {
Slice maybe_truncated_min_key {segment_key_bounds.min_key()};
Slice maybe_truncated_max_key {segment_key_bounds.max_key()};
bool res1 = Slice::lhs_is_strictly_less_than_rhs(key, false, maybe_truncated_min_key,
is_segments_key_bounds_truncated);
bool res2 = Slice::lhs_is_strictly_less_than_rhs(maybe_truncated_max_key,
is_segments_key_bounds_truncated, key, false);
return res1 || res2;
}
} // namespace doris
11 changes: 6 additions & 5 deletions be/src/util/key_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@

#pragma once

#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>

#include <cstdint>
#include <iterator>
#include <string>
#include <vector>

#include "common/status.h"
#include "util/debug_util.h"
#include "util/faststring.h"
#include "util/slice.h"

namespace doris {
Expand Down Expand Up @@ -111,4 +107,9 @@ void encode_key(std::string* buf, const RowType& row, size_t num_keys) {
}
}

// we can only know if a key is excluded from the segment
// based on strictly order compare result with segments key bounds
bool key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds,
bool is_segments_key_bounds_truncated);

} // namespace doris
Loading
Loading