Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](merge-on-write) Reduce the version not continuous logs for merge-on-write table #40946

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
if (_version != max_version + 1) {
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << _version << " tablet_id=" << _tablet_id;
bool need_log = (config::publish_version_gap_logging_threshold < 0 ||
max_version + config::publish_version_gap_logging_threshold >= _version);
if (need_log) {
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << _version << " tablet_id=" << _tablet_id;
}
auto error_st =
Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>("version not continuous");
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,10 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
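// For MOW (merge-on-write) tables: when the version is not continuous in the publish
// phase and the gap between the current txn's publishing version and the tablet's max
// version exceeds this value, the "version not continuous" log is not printed.
// A negative value disables this suppression, so the log is always printed.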
DEFINE_mInt32(publish_version_gap_logging_threshold, "200");

// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,10 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
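// For MOW (merge-on-write) tables: when the version is not continuous in the publish
// phase and the gap between the current txn's publishing version and the tablet's max
// version exceeds this value, the "version not continuous" log is not printed.
// A negative value disables this suppression, so the log is always printed.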
DECLARE_mInt32(publish_version_gap_logging_threshold);

// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
Expand Down
26 changes: 16 additions & 10 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,22 @@ Status EnginePublishVersionTask::execute() {
int64_t missed_version = max_version + 1;
int64_t missed_txn_id = _engine.txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second, tablet->tablet_id(),
_publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
bool need_log =
(config::publish_version_gap_logging_threshold < 0 ||
max_version + config::publish_version_gap_logging_threshold >=
version.second);
if (need_log) {
auto msg = fmt::format(
"uniq key with merge-on-write version not continuous, "
"missed version={}, it's transaction_id={}, current publish "
"version={}, tablet_id={}, transaction_id={}",
missed_version, missed_txn_id, version.second,
tablet->tablet_id(), _publish_version_req.transaction_id);
if (first_time_update) {
LOG(INFO) << msg;
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
}
};
// The versions during the schema change period need to be also continuous
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ public class Config extends ConfigBase {
"print log interval for publish transaction failed interval"})
public static long publish_fail_log_interval_second = 5 * 60;

@ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务打印失败日志的次数上限",
"the upper limit of failure logs of PUBLISH_VERSION task"})
public static long publish_version_task_failed_log_threshold = 80;

@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
Expand Down
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
Expand Down Expand Up @@ -88,11 +89,13 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
// check task status
// retry task by report process
TStatus taskStatus = request.getTaskStatus();
TTaskType taskType = request.getTaskType();
long signature = request.getSignature();
if (LOG.isDebugEnabled()) {
LOG.debug("get task report: {}", request);
}

if (taskStatus.getStatusCode() != TStatusCode.OK) {
if (taskStatus.getStatusCode() != TStatusCode.OK && taskType != TTaskType.PUBLISH_VERSION) {
LOG.warn("finish task reports bad. request: {}", request);
}

Expand All @@ -111,8 +114,6 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
}

long backendId = backend.getId();
TTaskType taskType = request.getTaskType();
long signature = request.getSignature();

AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
if (task == null) {
Expand All @@ -130,6 +131,13 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
} else {
if (taskStatus.getStatusCode() != TStatusCode.OK) {
task.failed();
if (taskType == TTaskType.PUBLISH_VERSION) {
boolean needLog = (Config.publish_version_task_failed_log_threshold < 0
|| task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold);
if (needLog) {
LOG.warn("finish task reports bad. request: {}", request);
}
}
String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString()
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
Expand Down
Loading