Skip to content

Commit

Permalink
[fix](load) check -238 segment number limit earlier
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Aug 13, 2024
1 parent 08d0497 commit 044a125
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
37 changes: 21 additions & 16 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,26 +514,32 @@ Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, u
return Status::OK();
}

// return true if there isn't any flying segcompaction, otherwise return false
// return true if there is any flying segcompaction, otherwise return false
bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() {
return !_is_doing_segcompaction.exchange(true);
return _is_doing_segcompaction.exchange(true);
}

Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
// leave _check_and_set_is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
// if not doing segcompaction, just check segment number
if (!config::enable_segcompaction || !_context.enable_segcompaction ||
!_context.tablet_schema->cluster_key_idxes().empty() ||
_context.tablet_schema->num_variant_columns() > 0 ||
!_check_and_set_is_doing_segcompaction()) {
_context.tablet_schema->num_variant_columns() > 0) {
return _check_segment_number_limit(_num_segment);
}
// leave _check_and_set_is_doing_segcompaction as the last condition
// otherwise _segcompacting_cond will never get notified
if (_check_and_set_is_doing_segcompaction()) {
return status;
}
if (_segcompaction_status.load() != OK) {
status = Status::Error<SEGCOMPACTION_FAILED>(
"BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
_segcompaction_status.load());
} else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
} else {
status = _check_segment_number_limit(_segcompacted_point);
}
if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
SegCompactionCandidatesSharedPtr segments;
status = _find_longest_consecutive_small_segment(segments);
if (LIKELY(status.ok()) && (!segments->empty())) {
Expand Down Expand Up @@ -719,7 +725,8 @@ Status BetaRowsetWriter::_close_file_writers() {
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
RETURN_IF_ERROR(_close_file_writers());

RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
"too many segments when build new rowset");
RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
if (_is_pending) {
Expand Down Expand Up @@ -900,11 +907,10 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
return Status::OK();
}

Status BaseBetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment + 1;
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
"_num_segment:{}, rowset_num_rows:{}",
Expand All @@ -914,11 +920,10 @@ Status BaseBetaRowsetWriter::_check_segment_number_limit() {
return Status::OK();
}

Status BetaRowsetWriter::_check_segment_number_limit() {
size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
{ total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
{ segnum = dp->param("segnum", 1024); });
if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}",
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class BaseBetaRowsetWriter : public RowsetWriter {
Status _build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num = false);
Status _create_file_writer(const std::string& path, io::FileWriterPtr& file_writer);
virtual Status _close_file_writers();
virtual Status _check_segment_number_limit();
virtual Status _check_segment_number_limit(size_t segnum);
virtual int64_t _num_seg() const;
// build a tmp rowset for load segment to calc delete_bitmap for this segment
Status _build_tmp(RowsetSharedPtr& rowset_ptr);
Expand Down Expand Up @@ -238,7 +238,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
// segment compaction
friend class SegcompactionWorker;
Status _close_file_writers() override;
Status _check_segment_number_limit() override;
Status _check_segment_number_limit(size_t segnum) override;
int64_t _num_seg() const override;
Status _wait_flying_segcompaction();
Status _create_segment_writer_for_segcompaction(
Expand Down

0 comments on commit 044a125

Please sign in to comment.