diff --git a/be/src/common/config.h b/be/src/common/config.h
index 75ed1eb79f18c2..065c1db9a95af9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -36,11 +36,11 @@ CONF_Int32(brpc_port, "8060");
 // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1")
 
-    // Declare a selection strategy for those servers have many ips.
-    // Note that there should at most one ip match this list.
-    // this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
-    // If no ip match this rule, will choose one randomly.
-    CONF_String(priority_networks, "");
+// Declare a selection strategy for servers that have many ips.
+// Note that at most one ip should match this list.
+// This is a semicolon-delimited list in CIDR notation, e.g. 10.10.10.0/24.
+// If no ip matches this rule, one will be chosen randomly.
+CONF_String(priority_networks, "");
 
 ////
 //// tcmalloc gc parameter
@@ -259,7 +259,7 @@ CONF_Int64(index_stream_cache_capacity, "10737418240");
 // CONF_Int64(max_packed_row_block_size, "20971520");
 
 // Cache for storage page size
-CONF_String(storage_page_cache_limit, "20G");
+CONF_String(storage_page_cache_limit, "20%");
 // whether to disable page cache feature in storage
 CONF_Bool(disable_storage_page_cache, "false");
@@ -369,7 +369,7 @@ CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // or encounter 'tablet writer write failed' error when loading.
 // CONF_Int32(tablet_writer_rpc_timeout_sec, "600");
 // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
-CONF_mInt32(olap_table_sink_send_interval_ms, "10");
+CONF_mInt32(olap_table_sink_send_interval_ms, "1");
 
 // Fragment thread pool
 CONF_Int32(fragment_pool_thread_num_min, "64");
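A note on the priority_networks semantics above: the value is matched in CIDR notation. The following is a rough, self-contained illustration of CIDR matching (a hypothetical Java helper for exposition only; the actual BE-side implementation is C++ and is not part of this patch):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

public class CidrMatchDemo {
    // An IPv4 address matches a CIDR block when the top `prefixLen` bits of
    // both addresses agree, e.g. 10.10.10.5 is inside 10.10.10.0/24.
    static boolean inCidr(String ip, String cidr) throws UnknownHostException {
        String[] parts = cidr.split("/");
        int prefixLen = Integer.parseInt(parts[1]);
        int addr = ByteBuffer.wrap(InetAddress.getByName(ip).getAddress()).getInt();
        int net = ByteBuffer.wrap(InetAddress.getByName(parts[0]).getAddress()).getInt();
        int mask = prefixLen == 0 ? 0 : -1 << (32 - prefixLen);
        return (addr & mask) == (net & mask);
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(inCidr("10.10.10.5", "10.10.10.0/24"));  // true
        System.out.println(inCidr("192.168.0.1", "10.10.10.0/24")); // false
    }
}
```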
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 5a959e5092121b..05d5bf74700eba 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -257,7 +257,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
     int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
     {
         SCOPED_TIMER(_parent->_scan_timer);
-        SCOPED_TIMER(_parent->_scan_cpu_timer);
+        SCOPED_CPU_TIMER(_parent->_scan_cpu_timer);
         while (true) {
             // Batch is full, break
             if (batch->is_full()) {
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index f1e7af3ee1918a..9ff817863e1af4 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -330,8 +330,7 @@ OLAPStatus AlphaRowset::init() {
     // table value column, so when first start the two number is not the same,
     // it causes start failed. When `expect_zone_maps_num > zone_maps_size` it may be the first start after upgrade
     if (expect_zone_maps_num > zone_maps_size) {
-        LOG(WARNING)
-                << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
+        VLOG(1) << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
                 << expect_zone_maps_num << ", actual num is " << zone_maps_size
                 << ". If this is not the first start after upgrade, please pay attention!";
     }
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 1537a172269518..c0d61ab8a4ce54 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -45,6 +45,9 @@ class KafkaLoadInfo {
             topic(t_info.topic),
             begin_offset(t_info.partition_begin_offset),
             properties(t_info.properties) {
+        // The offset (begin_offset) sent from the FE is the first offset to be consumed,
+        // while the offset (cmt_offset) reported by the BE to the FE is the last consumed offset,
+        // so we need to subtract 1 here.
        for (auto& p : t_info.partition_begin_offset) {
            cmt_offset[p.first] = p.second - 1;
        }
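The comment added to KafkaLoadInfo above states the offset convention the rest of this patch relies on: the FE hands out the next offset to consume, and the BE reports back the last consumed offset. A minimal sketch of the round trip (hypothetical demo class, not Doris code):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetConventionDemo {
    public static void main(String[] args) {
        // FE -> BE: begin_offset is the first offset the task should consume.
        Map<Integer, Long> beginOffset = new HashMap<>();
        beginOffset.put(0, 5L);

        // BE side: initialize the committed offset to "one before the start",
        // mirroring `cmt_offset[p.first] = p.second - 1;` above.
        Map<Integer, Long> cmtOffset = new HashMap<>();
        beginOffset.forEach((p, o) -> cmtOffset.put(p, o - 1)); // {0=4}

        // After consuming messages 5..9, the BE reports cmt_offset = 9.
        cmtOffset.put(0, 9L);

        // FE side: the stored progress is again the next offset to consume,
        // mirroring `entity.getValue() + 1` in KafkaProgress.update().
        Map<Integer, Long> progress = new HashMap<>();
        cmtOffset.forEach((p, o) -> progress.put(p, o + 1));

        System.out.println(progress.get(0)); // 10: the next message to read
    }
}
```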
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 25501bd97eb06d..a63784becf2b63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -28,11 +28,11 @@
 import org.apache.doris.thrift.TTabletStat;
 import org.apache.doris.thrift.TTabletStatResult;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.util.List;
 import java.util.Map;
 
@@ -61,7 +61,7 @@ protected void runAfterCatalogReady() {
                 client = ClientPool.backendPool.borrowObject(address);
                 TTabletStatResult result = client.getTabletStat();
 
-                LOG.info("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
+                LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
                 updateTabletStat(backend.getId(), result);
                 ok = true;
@@ -112,7 +112,7 @@ protected void runAfterCatalogReady() {
                         index.setRowCount(indexRowCount);
                     } // end for indices
                 } // end for partitions
-                LOG.info("finished to set row num for table: {} in database: {}",
+                LOG.debug("finished to set row num for table: {} in database: {}",
                         table.getName(), db.getFullName());
             }
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 656099578aa291..37d0143b78387c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -68,7 +68,8 @@ public class Config extends ConfigBase {
     public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
     @ConfField public static String sys_log_level = "INFO";
     @ConfField public static int sys_log_roll_num = 10;
-    @ConfField public static String[] sys_log_verbose_modules = {"org.apache.thrift", "org.apache.doris.thrift", "org.apache.doris.http", "org.apache.doris.service.FrontendServiceImpl"};
+    @ConfField
+    public static String[] sys_log_verbose_modules = {};
     @ConfField public static String sys_log_roll_interval = "DAY";
     @ConfField public static String sys_log_delete_age = "7d";
     @Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 34a5a4eef8d24a..6d704d98b06861 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -19,8 +19,12 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -38,6 +42,8 @@
  */
 // {"partitionIdToOffset": {}}
 public class KafkaProgress extends RoutineLoadProgress {
+    private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);
+
     public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
     public static final String OFFSET_END = "OFFSET_END"; // -1
     // OFFSET_ZERO is just for show info, if user specified offset is 0
@@ -47,7 +53,7 @@ public class KafkaProgress extends RoutineLoadProgress {
     public static final long OFFSET_END_VAL = -1;
 
     // (partition id, begin offset)
-    // the offset the next msg to be consumed
+    // the offset saved here is the next offset that needs to be consumed
     private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
 
     public KafkaProgress() {
@@ -101,12 +107,13 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
             } else if (entry.getValue() == -2) {
                 showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
             } else {
+                // The offset saved in partitionIdToOffset is the next offset to be consumed.
+                // So here we subtract 1 to return the "already consumed" offset.
                 showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
             }
         }
     }
 
-
     // modify the partition offset of this progress.
     // throw exception is the specified partition does not exist in progress.
     public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
@@ -138,11 +145,13 @@ public String toJsonString() {
     }
 
     @Override
-    public void update(RoutineLoadProgress progress) {
-        KafkaProgress newProgress = (KafkaProgress) progress;
+    public void update(RLTaskTxnCommitAttachment attachment) {
+        KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
         // + 1 to point to the next msg offset to be consumed
         newProgress.partitionIdToOffset.entrySet().stream()
                 .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
+        LOG.debug("update kafka progress: {}, task: {}, job: {}",
+                newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
     }
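KafkaProgress thus stores the next offset to consume and converts on the way out. A stand-alone restatement of the conversion in getReadableProgress() (hypothetical helper, sentinel values as in the constants above):

```java
// Stored value is the next offset to consume; the displayed "consumed" offset
// is therefore value - 1. The sentinels map to the OFFSET_* labels.
static String readableOffset(long stored) {
    if (stored == -1) {        // OFFSET_END_VAL
        return "OFFSET_END";
    } else if (stored == -2) { // OFFSET_BEGINNING (-2)
        return "OFFSET_BEGINNING";
    }
    return String.valueOf(stored - 1);
}
```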
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a9e6b786ecdc13..a72944aee33e80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -42,17 +42,19 @@
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -145,7 +147,6 @@ private void convertCustomProperties(boolean rebuild) throws DdlException {
         }
     }
 
-
     @Override
     public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
         List<RoutineLoadTaskInfo> result = new ArrayList<>();
@@ -198,46 +199,45 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         return currentTaskConcurrentNum;
     }
 
-    // case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
-    // after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
-    // In this case, FE should not update the progress.
-    //
-    // case2: partitionIdToOffset must be not empty when loaded rows > 0
-    // be commit txn but fe throw error when committing txn,
-    // fe rollback txn without partitionIdToOffset by itself
-    // this task should not be commit
-    // otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
+    // Use the transaction status and attachment information to determine whether the progress needs to be updated.
     @Override
     protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-                                      TransactionStatus txnStatus) {
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
-            // case 1
-            return false;
+                                      TransactionState txnState,
+                                      TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
+        if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+            // For a committed txn, always update the progress.
+            return true;
         }
-        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
-                && (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
-            // case 2
-            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
-                    .add("job_id", id)
-                    .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
-                    .add("progress_partition_offset_size", 0)
-                    .add("msg", "commit attachment info is incorrect"));
-            return false;
+        if (txnStatusChangeReason != null && txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
+            // Because the max_filter_ratio of a routine load task is always 1,
+            // a routine load task will not, under normal circumstances, return the error "too many filtered rows".
+            // If no data is loaded, only the error "all partitions have no load data" may be returned.
+            // In this case the status of the transaction is ABORTED,
+            // but we still need to update the offset to skip these error lines.
+            Preconditions.checkState(txnState.getTransactionStatus() == TransactionStatus.ABORTED, txnState.getTransactionStatus());
+            return true;
         }
-        return true;
+
+        // If we reach here, the status of the transaction is ABORTED due to some other error.
+        // In this case, we should not update the offset.
+        LOG.debug("no need to update the progress of kafka routine load. txn status: {}, "
+                        + "txnStatusChangeReason: {}, task: {}, job: {}",
+                txnState.getTransactionStatus(), txnStatusChangeReason,
+                DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
+        return false;
     }
 
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment.getProgress());
+        this.progress.update(attachment);
     }
 
     @Override
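The rewritten checkCommitInfo() above boils down to a small decision table: COMMITTED always advances the progress, and an ABORTED transaction advances it only when the abort reason is NO_PARTITIONS. A simplified, self-contained sketch (local enums stand in for Doris's TransactionStatus and TxnStatusChangeReason):

```java
public class ProgressDecisionSketch {
    enum TxnStatus { COMMITTED, ABORTED }
    enum AbortReason { NO_PARTITIONS, OFFSET_OUT_OF_RANGE, PAUSE, OTHER }

    // An aborted-with-NO_PARTITIONS txn still consumed (and filtered) messages
    // from Kafka, so its offsets must be advanced to skip those lines.
    static boolean shouldUpdateProgress(TxnStatus status, AbortReason reason) {
        if (status == TxnStatus.COMMITTED) {
            return true;
        }
        return status == TxnStatus.ABORTED && reason == AbortReason.NO_PARTITIONS;
    }
}
```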
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 70d67e70aa3d36..73417ae63efb54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -67,6 +67,10 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
         }
     }
 
+    public long getJobId() {
+        return jobId;
+    }
+
     public TUniqueId getTaskId() {
         return taskId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e82d234833b4c9..f1d858b76693d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,10 @@
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -77,8 +81,6 @@
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -878,8 +880,9 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException {
                     entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
             RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
             taskBeId = routineLoadTaskInfo.getBeId();
-            executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED);
+            executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
             ++committedTaskNum;
+            LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
         }
     } catch (Throwable e) {
         LOG.warn("after committed failed", e);
@@ -989,8 +992,9 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
                             .build());
                 }
                 ++abortedTaskNum;
+                TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
                 if (txnStatusChangeReasonString != null) {
-                    TransactionState.TxnStatusChangeReason txnStatusChangeReason =
+                    txnStatusChangeReason =
                             TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
@@ -1009,7 +1013,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
                     // TODO(ml): use previous be id depend on change reason
                 }
                 // step2: commit task , update progress, maybe create a new task
-                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
+                executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED, txnStatusChangeReason);
             }
         } catch (Exception e) {
             String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();
@@ -1037,7 +1041,7 @@ public void replayOnAborted(TransactionState txnState) {
 
     // check task exists or not before call method
     private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
-            TransactionStatus txnStatus) throws UserException {
+            TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws UserException {
         // step0: get progress from transaction state
         RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
         if (rlTaskTxnCommitAttachment == null) {
@@ -1049,7 +1053,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
                         + " maybe task was aborted by master when timeout")
                         .build());
             }
-        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
+        } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
             // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
         }
@@ -1256,7 +1260,8 @@ public void setOrigStmt(OriginStatement origStmt) {
 
     // check the correctness of commit info
     protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-                                               TransactionStatus txnStatus);
+                                               TransactionState txnState,
+                                               TransactionState.TxnStatusChangeReason txnStatusChangeReason);
 
     protected abstract String getStatistic();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index c8948d47675d90..bf746a621f6db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -37,7 +37,7 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
         this.loadDataSourceType = loadDataSourceType;
     }
 
-    abstract void update(RoutineLoadProgress progress);
+    abstract void update(RLTaskTxnCommitAttachment attachment);
 
     abstract String toJsonString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d66476f5c4e759..e353c7978fdaa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -36,13 +36,13 @@
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -130,6 +130,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (UserException e) {
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).
@@ -152,6 +153,7 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
                 needScheduleTasksQueue.put(routineLoadTaskInfo);
+                return;
             }
         } catch (Exception e) {
             // exception happens, PAUSE the job
@@ -196,6 +198,10 @@ private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
             submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask);
             LOG.debug("send routine load task cost(ms): {}, job id: {}",
                     (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
+            if (tRoutineLoadTask.isSetKafkaLoadInfo()) {
+                LOG.debug("send kafka routine load task {} with partition offset: {}, job: {}",
+                        tRoutineLoadTask.label, tRoutineLoadTask.kafka_load_info.partition_begin_offset, tRoutineLoadTask.getJobId());
+            }
         } catch (LoadException e) {
             // submit task failed (such as TOO_MANY_TASKS error), but txn has already begun.
             // Here we will still set the ExecuteStartTime of this task, which means
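The two `return` statements added to scheduleOneTask() above are the actual fix in that file: once a task has been pushed back to needScheduleTasksQueue, the current round must stop handling it. A minimal sketch of the re-queue-and-return pattern (hypothetical names, not the scheduler's real types):

```java
import java.util.concurrent.BlockingQueue;

class RequeueSketch {
    // Without the early return, control would fall through and the method
    // could keep operating on a task that is already queued again.
    static void scheduleOne(BlockingQueue<Runnable> queue, Runnable task,
                            boolean allocated) throws InterruptedException {
        if (!allocated) {
            queue.put(task); // hand the task back for the next round
            return;          // and give up ownership immediately
        }
        task.run(); // placeholder for submitTask(...)
    }
}
```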
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ed404d559aa329..85ed3a5ce09421 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -54,6 +54,10 @@
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -61,10 +65,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -291,7 +291,8 @@ public long beginTransaction(List<Long> tableIdList, String label, TUniqueId requestId,
         checkRunningTxnExceedLimit(sourceType);
 
         long tid = idGenerator.getNextTransactionId();
-        LOG.info("begin transaction: txn id {} with label {} from coordinator {}", tid, label, coordinator);
+        LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}",
+                tid, label, coordinator, listenerId);
         TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType,
                 coordinator, listenerId, timeoutSecond * 1000);
         transactionState.setPrepareTime(System.currentTimeMillis());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 6d161f5703531a..0559195c692214 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -100,7 +100,8 @@ public enum TxnStatusChangeReason {
         DB_DROPPED,
         TIMEOUT,
         OFFSET_OUT_OF_RANGE,
-        PAUSE;
+        PAUSE,
+        NO_PARTITIONS;
 
         public static TxnStatusChangeReason fromString(String reasonString) {
             for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
@@ -116,6 +117,8 @@ public String toString() {
             switch (this) {
                 case OFFSET_OUT_OF_RANGE:
                     return "Offset out of range";
+                case NO_PARTITIONS:
+                    return "all partitions have no load data";
                 default:
                     return this.name();
             }
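The new NO_PARTITIONS value has to round-trip through the abort reason string: the BE aborts the txn with the message "all partitions have no load data", and the FE maps it back through fromString(), whose loop is visible in the context above. A sketch of that round trip; the matching predicate inside fromString() is not shown in the hunk and is assumed here to be a substring check:

```java
enum ReasonSketch {
    NO_PARTITIONS;

    @Override
    public String toString() {
        // Mirrors the new case added to TxnStatusChangeReason.toString().
        return this == NO_PARTITIONS ? "all partitions have no load data" : name();
    }

    // Assumed matching logic: pick the enum value whose string form occurs in
    // the free-form reason string reported with the abort.
    static ReasonSketch fromString(String reasonString) {
        for (ReasonSketch r : values()) {
            if (reasonString.contains(r.toString())) {
                return r;
            }
        }
        return null;
    }
}
```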
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 3ea320bedaf78a..9a295422d7b6ff 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.catalog.Catalog;
@@ -28,16 +27,18 @@
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.common.util.KafkaUtil;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.thrift.TKafkaRLTaskProgress;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.List;
 import java.util.Map;
 
@@ -96,12 +97,19 @@ void writeUnlock() {
 
     @Test
     public void testAfterAborted(@Injectable TransactionState transactionState,
-                                 @Injectable KafkaTaskInfo routineLoadTaskInfo,
-                                 @Injectable KafkaProgress progress) throws UserException {
+                                 @Injectable KafkaTaskInfo routineLoadTaskInfo) throws UserException {
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         long txnId = 1L;
 
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+        TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
+        tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
+        KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress);
+        Deencapsulation.setField(attachment, "progress", kafkaProgress);
+
+        KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress);
+
         new Expectations() {
             {
                 transactionState.getTransactionId();
@@ -112,7 +120,7 @@ public void testAfterAborted(@Injectable TransactionState transactionState,
                 result = txnId;
                 transactionState.getTxnCommitAttachment();
                 minTimes = 0;
-                result = new RLTaskTxnCommitAttachment();
+                result = attachment;
                 routineLoadTaskInfo.getPartitions();
                 minTimes = 0;
                 result = Lists.newArrayList();
@@ -129,7 +137,7 @@ void writeUnlock() {
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
         Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
         Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
-        Deencapsulation.setField(routineLoadJob, "progress", progress);
+        Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
 
         routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
 
         Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 65864e5ea1b1c6..ac736dc293ae19 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.transaction;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -49,31 +45,31 @@
 import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import mockit.Injectable;
-import mockit.Mocked;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class GlobalTransactionMgrTest {
 
     private static FakeEditLog fakeEditLog;
@@ -345,7 +341,7 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 100L); // offsets start at 0, so 101 rows have been consumed and the consumed offset is 100
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -358,7 +354,7 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         Assert.assertEquals(Long.valueOf(101), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
+        Assert.assertEquals(Long.valueOf(101L), ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(1, routineLoadManager.getNeedScheduleTasksQueue().size());
         // Assert.assertNotEquals("label", routineLoadManager.getNeedScheduleTasksQueue().peek().getId());
@@ -411,7 +407,7 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA);
         TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
         Map<Integer, Long> kafkaProgress = Maps.newHashMap();
-        kafkaProgress.put(1, 10L);
+        kafkaProgress.put(1, 110L); // offsets start at 0, so 111 rows have been consumed and the consumed offset is 110
         tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress);
         rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress);
         TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
@@ -422,9 +418,10 @@ LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1")
         Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1),
                 "idToRunningTransactionState", idToTransactionState);
         masterTransMgr.commitTransaction(1L, 1L, transTablets, txnCommitAttachment);
 
+        // currentTotalRows and currentErrorRows are reset after the job is paused, so they should both be 0 here.
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentTotalRows"));
         Assert.assertEquals(Long.valueOf(0), Deencapsulation.getField(routineLoadJob, "currentErrorRows"));
-        Assert.assertEquals(Long.valueOf(11L),
+        Assert.assertEquals(Long.valueOf(111L),
                 ((KafkaProgress) routineLoadJob.getProgress()).getOffsetByPartition(1));
         // todo(ml): change to assert queue
         // Assert.assertEquals(0, routineLoadManager.getNeedScheduleTasksQueue().size());
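For reference, the arithmetic behind the updated expectations in both tests:

```java
// The BE reports the last consumed offset; the FE stores consumed + 1 as the
// next offset to consume, which is what getOffsetByPartition() returns.
long committed = 100L;             // partitionCmtOffset reported by the BE
long rowsConsumed = committed + 1; // offsets start at 0 -> 101 rows total
long nextOffset = committed + 1;   // stored progress -> asserts 101L (and 111L for 110)
```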