Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.0](txn) be dead exceeds 5min abort its txns (#22781, #28662, #35342) #39317

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
Expand Down Expand Up @@ -242,6 +243,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
request.__set_backend_id(_exec_env->master_info()->backend_id);

TLoadTxnBeginResult result;
Status status;
Expand Down Expand Up @@ -374,6 +376,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx,
}

Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);

DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1);

TLoadTxnCommitRequest request;
Expand Down
10 changes: 10 additions & 0 deletions docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ Is it possible to configure dynamically: true

Whether it is a configuration item unique to the Master FE node: true

### `abort_txn_after_lost_heartbeat_time_second`

Abort transaction time after lost heartbeat. The default value is 300, which means transactions of be will be aborted after lost heartbeat 300s.

Default: 300(s)

Is it possible to configure dynamically: true

Whether it is a configuration item unique to the Master FE node: true

#### `enable_access_file_without_broker`

Default:false
Expand Down
10 changes: 10 additions & 0 deletions docs/zh-CN/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。

是否为 Master FE 节点独有的配置项:true

#### `abort_txn_after_lost_heartbeat_time_second`

丢失be心跳后丢弃be事务的时间。默认时间为三百秒,当三百秒fe没有接收到be心跳时,会丢弃该be的所有事务。

默认值:300(秒)

是否可以动态配置:true

是否为 Master FE 节点独有的配置项:true

#### `enable_access_file_without_broker`

默认值:false
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,20 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long max_backend_heartbeat_failure_tolerance_count = 1;

/**
* Abort transaction time after lost heartbeat.
* The default value is 300s, which means transactions of be will be aborted after lost heartbeat 300s.
*/
@ConfField(mutable = true, masterOnly = true)
public static int abort_txn_after_lost_heartbeat_time_second = 300;

/**
* Heartbeat interval in seconds.
* Default is 5, which means every 5 seconds, the master will send a heartbeat to all backends.
*/
@ConfField(mutable = false, masterOnly = false)
public static int heartbeat_interval_second = 5;

/**
* The iceberg and hudi table will be removed in v1.3
* Use multi catalog instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -358,7 +359,9 @@ public void analyze(Analyzer analyzer) throws UserException {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label.getLabelName(),
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class ClientPool {
static GenericKeyedObjectPoolConfig heartbeatConfig = new GenericKeyedObjectPoolConfig();
static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000;
static int heartbeatTimeoutMs = Config.heartbeat_interval_second * 1000;

static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class FeConstants {
public static int shortkey_max_column_count = 3;
public static int shortkey_maxsize_bytes = 36;

public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes

// dpp version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
Expand Down Expand Up @@ -229,6 +230,11 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) {
}

private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.QueryStateException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand Down Expand Up @@ -245,7 +246,9 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
// begin txn here and generate txn id
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(olapTable.getId()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -104,7 +105,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
Expand Down Expand Up @@ -198,7 +199,9 @@ public void beginTxn()
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -206,7 +207,9 @@ public boolean beginTxn() throws UserException {
try {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
Expand Down Expand Up @@ -133,8 +134,10 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
Expand Down Expand Up @@ -80,7 +81,9 @@ public Transaction(ConnectContext ctx, Database database, Table table, String la
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
this.createAt = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -176,7 +175,7 @@ public static void addToBlacklist(Long backendID, String reason) {
return;
}

blacklistBackends.put(backendID, Pair.of(FeConstants.heartbeat_interval_second + 1, reason));
blacklistBackends.put(backendID, Pair.of(Config.heartbeat_interval_second + 1, reason));
LOG.warn("add backend {} to black list. reason: {}", backendID, reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
Expand Down Expand Up @@ -1908,9 +1909,10 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ public class ExecuteEnv {
private static volatile ExecuteEnv INSTANCE;
private MultiLoadMgr multiLoadMgr;
private ConnectScheduler scheduler;
private long startupTime;

private ExecuteEnv() {
multiLoadMgr = new MultiLoadMgr();
scheduler = new ConnectScheduler(Config.qe_max_connection);
startupTime = System.currentTimeMillis();
}

public static ExecuteEnv getInstance() {
Expand All @@ -50,4 +52,9 @@ public ConnectScheduler getScheduler() {
public MultiLoadMgr getMultiLoadMgr() {
return multiLoadMgr;
}

public long getStartupTime() {
return startupTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1249,10 +1249,12 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());
return result;
Expand Down Expand Up @@ -1356,10 +1358,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp)
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;

Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), tableIdList, request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);

// step 7: return result
Expand Down
18 changes: 13 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class HeartbeatMgr extends MasterDaemon {
private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();

public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
super("heartbeat mgr", Config.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
Expand Down Expand Up @@ -168,13 +168,21 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
if (hbResponse.getStatus() != HbStatus.OK) {
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
if (!isReplay && oldStartTime != newStartTime) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
be.getId(), be.getHost(), newStartTime);
}
} else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay) {
Env.getCurrentEnv().getGlobalTransactionMgr()
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
be.getId(), be.getHost(), 100);
}
}
return isChanged;
Expand Down
Loading
Loading