[Pick](branch-2.0) Pick from branch-2.0 #27602

Merged
merged 33 commits on Nov 27, 2023
33 commits
4ba7912
[fix](stats) Fix update rows for unique table didn't get updated prop…
Kikyou1997 Nov 22, 2023
c7f729a
[FIX](jsonb) fix jsonb in predict column #27325 (#27424)
amorynan Nov 22, 2023
ca96a2e
[fix](fe) slots in having clause should be set to need materialized(#…
mrhhsg Nov 22, 2023
d111905
[Bug](insert)fix insert wrong data on mv when stmt have multiple valu…
BiteTheDDDDt Nov 22, 2023
9c49c2d
[fix](fe ut) Fix OlapQueryCacheTest failed (#27305) (#27406)
xinyiZzz Nov 23, 2023
43e371b
[regression test](schema change) add some schema change regression ca…
luwei16 Nov 23, 2023
a171f02
[fix](Nereids) result type of add precision is 1 more than expected (…
morrySnow Nov 23, 2023
3d04663
[fix](Nereids): fill miss slot in having subquery (#27177) (#27394)
keanji-x Nov 23, 2023
c1a71d4
[fix](memory) Fix make_top_consumption_snapshots heap-use-after-free …
xinyiZzz Nov 23, 2023
ae7340d
[fix](function) make TIMESTAMP function DEPEND_ON_ARGUMENT (#27343) (…
Mryange Nov 23, 2023
6bb2897
[fix](test) order by clause in test_map(#27390) (#27391)
mrhhsg Nov 23, 2023
ff117fd
[performance](Planner): optimize getStringValue() in DateLiteral (#27…
jackwener Nov 23, 2023
63c46e7
[Chore](pick) do not push down agg on aggregate column (#27356) (#27498)
BiteTheDDDDt Nov 23, 2023
9ccde8c
[fix](stats) table not exists error msg not print objects name #27074…
Kikyou1997 Nov 23, 2023
95f4819
[improve](nereids) support agg function of count(const value) pushdow…
zhangstar333 Nov 23, 2023
89c2f68
[test](fe-ut) fix unstable MysqlServerTest (#27459)
morningman Nov 23, 2023
623f0a9
[opt](MergedIO) no need to merge large columns (#27315) (#27497)
AshinGau Nov 23, 2023
b15f3d6
[improvement](drop tablet) impr gc shutdown tablet lock (#26151) (#2…
yujun777 Nov 23, 2023
9949410
[doc](stats) SQL manual for stats (#27461)
Kikyou1997 Nov 23, 2023
723ac1d
[chore](merge-on-write) disable rowid conversion check for mow table …
liaoxin01 Nov 24, 2023
8556dbb
[fix](regression)Fix hive p2 case (#27466) (#27511)
Jibing-Li Nov 24, 2023
f7dce06
[fix](statistics)Fix auto analyze remove finished job bug #27486 (#27…
Jibing-Li Nov 24, 2023
5b50f73
[Bug](bitmap) Fix heap-use-after-free in the bitmap functions #27411 …
xy720 Nov 24, 2023
91d1381
[Pick](nereids) Pick: partition prune fails in case of NOT expression…
englefly Nov 24, 2023
dbf30df
[fix](clone) Fix engine_clone file exist (#27361) (#27536)
JackDrogon Nov 24, 2023
4d1aaaa
[chore](case) adjust timeout of broker load case #27540
hello-stephen Nov 24, 2023
7781a44
Fix auto analyze doesn't filter unsupported type bug. (#27547)
Jibing-Li Nov 24, 2023
f519514
[chore](fe plugin) Upgrade dependency to doris 2.0-SNAPSHOT #27522 (#…
zhiqiang-hhhh Nov 24, 2023
850e0db
[Bug](materialized-view) add limitation for duplicate expr on materia…
BiteTheDDDDt Nov 24, 2023
f4b5823
[fix](planner)join node should output required slot from parent node …
starocean999 Nov 24, 2023
b19d966
[branch-2.0](hive) enable hive view by default (#27550)
morningman Nov 24, 2023
7ea2ba5
[pick](nereids) adjust bc join and shuffle join #27113 (#27566)
englefly Nov 25, 2023
e41d995
[Fix](hive-transactional-table) Fix NPE when query empty hive transac…
kaka11chen Nov 25, 2023
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1073,6 +1073,8 @@ DEFINE_mInt64(lookup_connection_cache_bytes_limit, "4294967296");
 DEFINE_mInt64(LZ4_HC_compression_level, "9");
 
 DEFINE_mBool(enable_merge_on_write_correctness_check, "true");
+// rowid conversion correctness check when compaction for mow table
+DEFINE_mBool(enable_rowid_conversion_correctness_check, "false");
 
 // The secure path with user files, used in the `local` table function.
 DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");

2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -1129,6 +1129,8 @@ DECLARE_mDouble(ratio_of_defaults_as_sparse_column);
 DECLARE_mInt64(threshold_rows_to_estimate_sparse_column);
 
 DECLARE_mBool(enable_merge_on_write_correctness_check);
+// rowid conversion correctness check when compaction for mow table
+DECLARE_mBool(enable_rowid_conversion_correctness_check);
 
 // The secure path with user files, used in the `local` table function.
 DECLARE_mString(user_files_secure_path);

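The new check is off by default; it can be re-enabled by setting `enable_rowid_conversion_correctness_check = true` in `be.conf`. Since it is declared with `DEFINE_mBool`/`DECLARE_mBool` it is a mutable config, so it should also be adjustable at runtime without a BE restart.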
2 changes: 2 additions & 0 deletions be/src/common/status.h
@@ -595,6 +595,8 @@ inline std::string Status::to_string() const {
 template <typename T>
 using Result = expected<T, Status>;
 
+using ResultError = unexpected<Status>;
+
 #define RETURN_IF_ERROR_RESULT(stmt) \
     do {                             \
         Status _status_ = (stmt);    \

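A minimal sketch of how the new alias pairs with the existing `Result<T>`; the function here is hypothetical, but its shape mirrors `check_dest_binlog_valid` in `engine_clone_task.cpp` later in this diff:

```cpp
// Hypothetical example, not part of the patch: return either a value or a
// Status through Result<T> / ResultError.
Result<std::string> locate_file(const std::string& path) {
    bool exists = true;
    Status st = io::global_local_filesystem()->exists(path, &exists);
    if (!st.ok()) {
        return ResultError(std::move(st)); // I/O failure: error branch
    }
    if (!exists) {
        return ResultError(Status::InternalError("file not found: " + path));
    }
    return path; // success branch carries the value
}

// Caller side: the contextual bool conversion distinguishes the branches.
// if (auto&& res = locate_file(p); res) { use(res.value()); }
// else { return std::move(res.error()); }
```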
10 changes: 5 additions & 5 deletions be/src/io/fs/buffered_reader.cpp
@@ -55,7 +55,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
         Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
         _statistics.merged_io++;
         _statistics.request_bytes += *bytes_read;
-        _statistics.read_bytes += *bytes_read;
+        _statistics.merged_bytes += *bytes_read;
         return st;
     }
     if (offset + result.size > _random_access_ranges[range_index].end_offset) {
@@ -69,10 +69,10 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
     if (cached_data.contains(offset)) {
         // has cached data in box
         _read_in_box(cached_data, offset, result, &has_read);
+        _statistics.request_bytes += has_read;
         if (has_read == result.size) {
             // all data is read in cache
             *bytes_read = has_read;
-            _statistics.request_bytes += has_read;
             return Status::OK();
         }
     } else if (!cached_data.empty()) {
@@ -92,7 +92,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
         *bytes_read = has_read + read_size;
         _statistics.merged_io++;
         _statistics.request_bytes += read_size;
-        _statistics.read_bytes += read_size;
+        _statistics.merged_bytes += read_size;
         return Status::OK();
     }
 
@@ -187,7 +187,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
     *bytes_read = has_read + read_size;
     _statistics.merged_io++;
    _statistics.request_bytes += read_size;
-   _statistics.read_bytes += read_size;
+   _statistics.merged_bytes += read_size;
    return Status::OK();
 }
 
@@ -315,7 +315,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
         RETURN_IF_ERROR(
                 _reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
         _statistics.merged_io++;
-        _statistics.read_bytes += *bytes_read;
+        _statistics.merged_bytes += *bytes_read;
     }
 
     SCOPED_RAW_TIMER(&_statistics.copy_time);

16 changes: 12 additions & 4 deletions be/src/io/fs/buffered_reader.h
@@ -81,7 +81,8 @@ class MergeRangeFileReader : public io::FileReader {
         int64_t request_io = 0;
         int64_t merged_io = 0;
         int64_t request_bytes = 0;
-        int64_t read_bytes = 0;
+        int64_t merged_bytes = 0;
+        int64_t apply_bytes = 0;
     };
 
     struct RangeCachedData {
@@ -148,6 +149,9 @@
         // Equivalent min size of each IO that can reach the maximum storage speed limit:
         // 512KB for oss, 4KB for hdfs
         _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
+        for (const PrefetchRange& range : _random_access_ranges) {
+            _statistics.apply_bytes += range.end_offset - range.start_offset;
+        }
         if (_profile != nullptr) {
             const char* random_profile = "MergedSmallIO";
             ADD_TIMER(_profile, random_profile);
@@ -157,7 +161,9 @@
             _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT, random_profile);
             _request_bytes =
                     ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES, random_profile);
-            _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+            _merged_bytes =
+                    ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+            _apply_bytes = ADD_CHILD_COUNTER(_profile, "ApplyBytes", TUnit::BYTES, random_profile);
         }
     }
 
@@ -182,7 +188,8 @@
             COUNTER_UPDATE(_request_io, _statistics.request_io);
             COUNTER_UPDATE(_merged_io, _statistics.merged_io);
             COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
-            COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+            COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
         }
     }
     return Status::OK();
@@ -218,7 +225,8 @@
     RuntimeProfile::Counter* _request_io;
     RuntimeProfile::Counter* _merged_io;
    RuntimeProfile::Counter* _request_bytes;
-   RuntimeProfile::Counter* _read_bytes;
+   RuntimeProfile::Counter* _merged_bytes;
+   RuntimeProfile::Counter* _apply_bytes;
 
    int _search_read_range(size_t start_offset, size_t end_offset);
    void _clean_cached_data(RangeCachedData& cached_data);

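A gloss on the renamed and added counters (not stated in the patch itself): `RequestBytes` counts what callers asked for, the renamed `MergedBytes` counts what was actually read from storage after small ranges were merged, and the new `ApplyBytes` is the total size of all ranges the reader was constructed with. A hypothetical helper shows how the profile counters relate; the standalone `Statistics` struct below just mirrors the fields sketched above:

```cpp
#include <cstdint>

// Mirrors the counter fields added/renamed in MergeRangeFileReader above;
// defined standalone here so the example is self-contained.
struct Statistics {
    int64_t request_io = 0;
    int64_t merged_io = 0;
    int64_t request_bytes = 0; // bytes callers requested
    int64_t merged_bytes = 0;  // bytes actually read after merging small IOs
    int64_t apply_bytes = 0;   // total size of all configured ranges
};

// Ratio > 1 means merging read more bytes than were requested, the price paid
// for issuing fewer IO operations.
double merge_amplification(const Statistics& s) {
    return s.request_bytes == 0
                   ? 0.0
                   : static_cast<double>(s.merged_bytes) / s.request_bytes;
}
```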
8 changes: 6 additions & 2 deletions be/src/olap/compaction.cpp
@@ -692,7 +692,9 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
             }
         }
 
-        RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
+        if (config::enable_rowid_conversion_correctness_check) {
+            RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
+        }
         location_map.clear();
 
         {
@@ -753,7 +755,9 @@
             }
         }
 
-        RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
+        if (config::enable_rowid_conversion_correctness_check) {
+            RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
+        }
 
         _tablet->merge_delete_bitmap(output_rowset_delete_bitmap);
         RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));

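With the switch defaulting to `false`, the rowid-conversion verification is now skipped during compaction of merge-on-write tables unless it is explicitly re-enabled, matching the commit title "disable rowid conversion check for mow table".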
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -301,6 +301,9 @@ Status ColumnReader::get_row_ranges_by_zone_map(
 }
 
 Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const {
+    if (_segment_zone_map == nullptr) {
+        return Status::InternalError("segment zonemap not exist");
+    }
     // TODO: this work to get min/max value seems should only do once
     FieldType type = _type_info->type();
     std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length));

5 changes: 0 additions & 5 deletions be/src/olap/rowset/segment_v2/zone_map_index.cpp
@@ -82,11 +82,6 @@ void TypedZoneMapIndexWriter<Type>::reset_page_zone_map() {
     _page_zone_map.pass_all = true;
 }
 
-template <PrimitiveType Type>
-void TypedZoneMapIndexWriter<Type>::reset_segment_zone_map() {
-    _segment_zone_map.pass_all = true;
-}
-
 template <PrimitiveType Type>
 Status TypedZoneMapIndexWriter<Type>::flush() {
     // Update segment zone map.

2 changes: 0 additions & 2 deletions be/src/olap/rowset/segment_v2/zone_map_index.h
@@ -94,7 +94,6 @@ class ZoneMapIndexWriter {
     virtual uint64_t size() const = 0;
 
     virtual void reset_page_zone_map() = 0;
-    virtual void reset_segment_zone_map() = 0;
 };
 
 // Zone map index is represented by an IndexedColumn with ordinal index.
@@ -120,7 +119,6 @@ class TypedZoneMapIndexWriter final : public ZoneMapIndexWriter {
     uint64_t size() const override { return _estimated_size; }
 
     void reset_page_zone_map() override;
-    void reset_segment_zone_map() override;
 
 private:
     void _reset_zone_map(ZoneMap* zone_map) {

1 change: 1 addition & 0 deletions be/src/olap/schema.cpp
@@ -180,6 +180,7 @@ vectorized::IColumn::MutablePtr Schema::get_predicate_column_ptr(const Field& fi
         break;
     case FieldType::OLAP_FIELD_TYPE_VARCHAR:
     case FieldType::OLAP_FIELD_TYPE_STRING:
+    case FieldType::OLAP_FIELD_TYPE_JSONB:
         if (config::enable_low_cardinality_optimize && reader_type == ReaderType::READER_QUERY) {
             ptr = doris::vectorized::ColumnDictionary<doris::vectorized::Int32>::create(
                     field.type());

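Adding `OLAP_FIELD_TYPE_JSONB` alongside the VARCHAR/STRING cases means predicate columns for JSONB are now materialized the same way as the string types (including the dictionary path when `enable_low_cardinality_optimize` is on), which appears to be the crux of the #27325 fix for JSONB columns in predicates.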
44 changes: 32 additions & 12 deletions be/src/olap/tablet_manager.cpp
@@ -638,12 +638,24 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool in
     TabletSharedPtr tablet;
     tablet = _get_tablet_unlocked(tablet_id);
     if (tablet == nullptr && include_deleted) {
-        std::shared_lock rdlock(_shutdown_tablets_lock);
-        for (auto& deleted_tablet : _shutdown_tablets) {
-            CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr";
-            if (deleted_tablet->tablet_id() == tablet_id) {
-                tablet = deleted_tablet;
-                break;
-            }
-        }
-    }
+        {
+            std::shared_lock rdlock(_shutdown_tablets_lock);
+            for (auto& deleted_tablet : _shutdown_tablets) {
+                CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr";
+                if (deleted_tablet->tablet_id() == tablet_id) {
+                    tablet = deleted_tablet;
+                    break;
+                }
+            }
+        }
+        if (tablet == nullptr) {
+            std::shared_lock rdlock(_shutdown_deleting_tablets_lock);
+            for (auto& deleted_tablet : _shutdown_deleting_tablets) {
+                CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr";
+                if (deleted_tablet->tablet_id() == tablet_id) {
+                    tablet = deleted_tablet;
+                    break;
+                }
+            }
+        }
+    }
@@ -1065,9 +1077,17 @@ Status TabletManager::start_trash_sweep() {
     clean_num = 0;
     // should get write lock here, because it will remove tablet from shut_down_tablets
     // and get tablet will access shut_down_tablets
-    std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock);
-    auto it = _shutdown_tablets.begin();
-    while (it != _shutdown_tablets.end()) {
+    {
+        std::lock_guard<std::shared_mutex> wrlock1(_shutdown_tablets_lock);
+        std::lock_guard<std::shared_mutex> wrlock2(_shutdown_deleting_tablets_lock);
+        for (const auto& tablet : _shutdown_tablets) {
+            _shutdown_deleting_tablets.push_back(tablet);
+        }
+        _shutdown_tablets.clear();
+    }
+    std::lock_guard<std::shared_mutex> wrlock(_shutdown_deleting_tablets_lock);
+    auto it = _shutdown_deleting_tablets.begin();
+    while (it != _shutdown_deleting_tablets.end()) {
         // check if the meta has the tablet info and its state is shutdown
         if (it->use_count() > 1) {
             // it means current tablet is referenced by other thread
@@ -1086,7 +1106,7 @@
                          << " old tablet_uid=" << (*it)->tablet_uid()
                          << " cur tablet_uid=" << tablet_meta->tablet_uid();
                 // remove it from list
-                it = _shutdown_tablets.erase(it);
+                it = _shutdown_deleting_tablets.erase(it);
                 continue;
             }
             // move data to trash
@@ -1115,7 +1135,7 @@
                                << "tablet_id=" << (*it)->tablet_id()
                                << ", schema_hash=" << (*it)->schema_hash()
                                << ", tablet_path=" << tablet_path;
-                it = _shutdown_tablets.erase(it);
+                it = _shutdown_deleting_tablets.erase(it);
                 ++clean_num;
             } else {
                 // if could not find tablet info in meta store, then check if dir existed
@@ -1135,7 +1155,7 @@
                                << "tablet_id=" << (*it)->tablet_id()
                                << ", schema_hash=" << (*it)->schema_hash()
                                << ", tablet_path=" << tablet_path;
-                it = _shutdown_tablets.erase(it);
+                it = _shutdown_deleting_tablets.erase(it);
             }
         }
 

4 changes: 4 additions & 0 deletions be/src/olap/tablet_manager.h
@@ -242,6 +242,10 @@ class TabletManager {
     std::map<int64_t, std::set<TabletInfo>> _partition_tablet_map;
     std::vector<TabletSharedPtr> _shutdown_tablets;
 
+    // gc thread will move _shutdown_tablets to _shutdown_deleting_tablets
+    std::shared_mutex _shutdown_deleting_tablets_lock;
+    std::list<TabletSharedPtr> _shutdown_deleting_tablets;
+
     std::mutex _tablet_stat_cache_mutex;
     std::shared_ptr<std::vector<TTabletStat>> _tablet_stat_list_cache =
             std::make_shared<std::vector<TTabletStat>>();

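The two-list design is a classic hand-off pattern: the GC thread briefly takes both locks to drain the shared vector into its private work list, then performs the slow sweep while holding only the second lock. A self-contained sketch of the idea (all names here are illustrative, not Doris APIs):

```cpp
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct Tablet {};
using TabletSharedPtr = std::shared_ptr<Tablet>;

class TrashSweeperSketch {
    std::shared_mutex _incoming_lock;       // like _shutdown_tablets_lock
    std::vector<TabletSharedPtr> _incoming; // like _shutdown_tablets
    std::shared_mutex _deleting_lock;       // like _shutdown_deleting_tablets_lock
    std::list<TabletSharedPtr> _deleting;   // like _shutdown_deleting_tablets

public:
    void sweep() {
        {
            // Short critical section: drain the shared vector into the
            // sweep-private list while holding both locks.
            std::lock_guard<std::shared_mutex> l1(_incoming_lock);
            std::lock_guard<std::shared_mutex> l2(_deleting_lock);
            for (auto& t : _incoming) {
                _deleting.push_back(t);
            }
            _incoming.clear();
        }
        // The slow part only holds _deleting_lock, so readers that scan
        // _incoming (e.g. tablet lookups) are not blocked for the whole sweep.
        std::lock_guard<std::shared_mutex> l(_deleting_lock);
        for (auto it = _deleting.begin(); it != _deleting.end();) {
            if (it->use_count() > 1) {
                ++it; // still referenced elsewhere; retry next round
            } else {
                it = _deleting.erase(it); // actually clean up the tablet
            }
        }
    }
};
```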
79 changes: 72 additions & 7 deletions be/src/olap/task/engine_clone_task.cpp
@@ -74,6 +74,58 @@ using strings::SkipWhitespace;
 namespace doris {
 using namespace ErrorCode;
 
+namespace {
+/// if binlog file exist, then check if binlog file md5sum equal
+/// if equal, then skip link file
+/// if not equal, then return error
+/// return value: if binlog file not exist, then return to binlog file path
+Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
+                                            const std::string& clone_file, bool* skip_link_file) {
+    // change clone_file suffix .binlog to .dat
+    std::string new_clone_file = clone_file;
+    new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
+    auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
+
+    // check whether the `to` file already exists
+    bool exists = true;
+    auto status = io::global_local_filesystem()->exists(to, &exists);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+
+    if (!exists) {
+        return to;
+    }
+
+    LOG(WARNING) << "binlog file already exist. "
+                 << "tablet_dir=" << tablet_dir << ", clone_file=" << clone_file;
+
+    std::string clone_file_md5sum;
+    status = io::global_local_filesystem()->md5sum(clone_file, &clone_file_md5sum);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+    std::string to_file_md5sum;
+    status = io::global_local_filesystem()->md5sum(to, &to_file_md5sum);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+
+    if (clone_file_md5sum == to_file_md5sum) {
+        // if md5sum equal, then skip link file
+        *skip_link_file = true;
+        return to;
+    } else {
+        auto err_msg = fmt::format(
+                "binlog file already exist, but md5sum not equal. "
+                "tablet_dir={}, clone_file={}",
+                tablet_dir, clone_file);
+        LOG(WARNING) << err_msg;
+        return ResultError(Status::InternalError(std::move(err_msg)));
+    }
+}
+} // namespace
+
 #define RETURN_IF_ERROR_(status, stmt) \
     do {                               \
         status = (stmt);               \
@@ -603,6 +655,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
     /// Traverse all downloaded clone files in CLONE dir.
     /// If it does not exist in local tablet dir, link the file to local tablet dir
     /// And save all linked files in linked_success_files.
+    /// if binlog exist in clone dir and md5sum equal, then skip link file
+    bool skip_link_file = false;
     for (const string& clone_file : clone_file_names) {
         if (local_file_names.find(clone_file) != local_file_names.end()) {
             VLOG_NOTICE << "find same file when clone, skip it. "
@@ -619,19 +673,30 @@
                 break;
             }
 
-            // change clone_file suffix .binlog to .dat
-            std::string new_clone_file = clone_file;
-            new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
-            to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
+            if (auto&& result = check_dest_binlog_valid(tablet_dir, clone_file, &skip_link_file);
+                result) {
+                to = std::move(result.value());
+            } else {
+                status = std::move(result.error());
+                return status;
+            }
         } else {
             to = fmt::format("{}/{}", tablet_dir, clone_file);
         }
 
-        RETURN_IF_ERROR(io::global_local_filesystem()->link_file(from, to));
-        linked_success_files.emplace_back(std::move(to));
+        if (!skip_link_file) {
+            status = io::global_local_filesystem()->link_file(from, to);
+            if (!status.ok()) {
+                return status;
+            }
+            linked_success_files.emplace_back(std::move(to));
+        }
     }
     if (contain_binlog) {
-        RETURN_IF_ERROR(tablet->ingest_binlog_metas(&rowset_binlog_metas_pb));
+        status = tablet->ingest_binlog_metas(&rowset_binlog_metas_pb);
+        if (!status.ok()) {
+            return status;
+        }
     }
 
     // clone and compaction operation should be performed sequentially