Skip to content

Commit

Permalink
opt
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Sep 18, 2024
1 parent d3ea49c commit a4d2f6a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 18 deletions.
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
if (_version != max_version + 1) {
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << _version << " tablet_id=" << _tablet_id;
bool need_log = (config::publish_version_gap_logging_threshold < 0 ||
max_version + config::publish_version_gap_logging_threshold >= _version);
if (need_log) {
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << _version << " tablet_id=" << _tablet_id;
}
auto error_st =
Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>("version not continuous");
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,10 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
// When the version is not continuous for MOW table in publish phase and the gap between
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DEFINE_mInt32(publish_version_gap_logging_threshold, "200")

// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,10 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
// When the version is not continuous for MOW table in publish phase and the gap between
// current txn's publishing version and the max version of the tablet exceeds this value,
// don't print warning log
DECLARE_mInt32(publish_version_gap_logging_threshold);

// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
Expand Down
26 changes: 16 additions & 10 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,22 @@ Status EnginePublishVersionTask::execute() {
int64_t missed_version = max_version + 1;
int64_t missed_txn_id = _engine.txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second, tablet->tablet_id(),
_publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
bool need_log =
(config::publish_version_gap_logging_threshold < 0 ||
max_version + config::publish_version_gap_logging_threshold >=
version.second);
if (need_log) {
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second,
tablet->tablet_id(), _publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
}
};
// The versions during the schema change period need to be also continuous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,9 @@ public class Config extends ConfigBase {
"print log interval for publish transaction failed interval"})
public static long publish_fail_log_interval_second = 5 * 60;

@ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务失败打印日志次数上限",
"print log interval for publish transaction failed interval"})
public static long publish_version_task_failed_times_log_threshold = 100;
@ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务打印失败日志的次数上限",
"the upper limit of failure logs of PUBLISH_VERSION task"})
public static long publish_version_task_failed_log_threshold = 80;

@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
Expand Down Expand Up @@ -62,7 +63,6 @@
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;

import com.amazonaws.services.glue.model.TaskType;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -131,8 +131,12 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
} else {
if (taskStatus.getStatusCode() != TStatusCode.OK) {
task.failed();
if (taskType == TTaskType.PUBLISH_VERSION && task.getFailedTimes() < ) {
LOG.warn("finish task reports bad. request: {}", request);
if (taskType == TTaskType.PUBLISH_VERSION) {
boolean needLog = (Config.publish_version_task_failed_log_threshold < 0
|| task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold);
if (needLog) {
LOG.warn("finish task reports bad. request: {}", request);
}
}
String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString()
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
Expand Down

0 comments on commit a4d2f6a

Please sign in to comment.