Skip to content

Commit

Permalink
[fix](binlog-load) binlog load fails because txn exceeds the default …
Browse files Browse the repository at this point in the history
…value (#9471)

binlog load Because txn exceeds the default value, resume is a failure,
and a friendly prompt message is given to the user, instead of prompting success now,
it still fails after a while, and the user will feel inexplicable
Issue Number: close #9468
  • Loading branch information
hf200012 authored and morningman committed May 16, 2022
1 parent 0c73dc8 commit e5c46b7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
Expand All @@ -41,6 +42,7 @@
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -121,53 +123,69 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
TStreamLoadPutRequest request = null;
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
TStreamLoadPutRequest request = null;
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
db.getId(), txnId).getAuthCode();
request = new TStreamLoadPutRequest()
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
.setColumns(targetColumn);
txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
txnEntry.setLabel(label);
txnExecutor.setTxnId(txnId);
} catch (DuplicatedRequestException e) {
LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
txnEntry.setLabel(label);
txnExecutor.setTxnId (txnId);
} catch (DuplicatedRequestException e) {
LOG.warn ("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
txnExecutor.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
// this happens when channel re-consume same batch, we should just pass through it without begin a new txn
LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
return;
} catch (AnalysisException | BeginTransactionException e) {
LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
throw e;
} catch (UserException e) {
LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
throw e;
}
try {
// async exec begin transaction
long txnId = txnExecutor.getTxnId();
if (txnId != -1L) {
this.txnExecutor.beginTransaction(request);
LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
txnExecutor.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
// this happens when channel re-consume same batch,
// we should just pass through it without begin a new txn
LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}",
id, label, targetTable, batchId);
return;
} catch (AnalysisException | BeginTransactionException e) {
LOG.warn ("encounter an error when beginning txn in channel {}, table: {}",
id, targetTable);
throw e;
} catch (UserException e) {
LOG.warn ("encounter an error when creating plan in channel {}, table: {}",
id, targetTable);
throw e;
}
} catch (TException e) {
LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
} catch (TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
try {
// async exec begin transaction
long txnId = txnExecutor.getTxnId();
if ( txnId != - 1L ) {
this.txnExecutor.beginTransaction (request);
LOG.info ("begin txn in channel {}, table: {}, label:{}, txn id: {}",
id, targetTable, label, txnExecutor.getTxnId());
}
} catch ( TException e) {
LOG.warn ("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
} catch ( TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn ("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
throw e;
}
} else {
String failMsg = "current running txns on db " + db.getId() + " is "
+ databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db;
LOG.warn(failMsg);
throw new BeginTransactionException(failMsg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected Set<Long> unprotectedGetTxnIdsByLabel(String label) {
return labelToTxnIds.get(label);
}

protected int getRunningTxnNums() {
public int getRunningTxnNums() {
return runningTxnNums;
}

Expand Down

0 comments on commit e5c46b7

Please sign in to comment.