14 changes: 7 additions & 7 deletions be/src/common/config.h
@@ -36,11 +36,11 @@ CONF_Int32(brpc_port, "8060");
// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1")

-// Declare a selection strategy for those servers have many ips.
-// Note that there should at most one ip match this list.
-// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
-// If no ip match this rule, will choose one randomly.
-CONF_String(priority_networks, "");
+// Declare a selection strategy for those servers have many ips.
+// Note that there should at most one ip match this list.
+// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24
+// If no ip match this rule, will choose one randomly.
+CONF_String(priority_networks, "");

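Reviewer note: the comment block above describes the selection rule — a semicolon-delimited CIDR list, at most one local IP expected to match, and a random choice as fallback. A minimal sketch of the matching idea, in Java for illustration only (Doris's real implementation is C++, and the class and method names here are made up):

import java.net.InetAddress;

public class CidrMatch {
    // True if the IPv4 address falls inside a CIDR block such as "10.10.10.0/24".
    static boolean inCidr(String cidr, String ip) throws Exception {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(parts[0]) & mask) == (toInt(ip) & mask);
    }

    static int toInt(String addr) throws Exception {
        byte[] b = InetAddress.getByName(addr).getAddress();
        return ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
    }

    public static void main(String[] args) throws Exception {
        // With priority_networks = "10.10.10.0/24":
        System.out.println(inCidr("10.10.10.0/24", "10.10.10.5"));  // true  -> pick this IP
        System.out.println(inCidr("10.10.10.0/24", "10.10.11.5"));  // false -> fall back to random
    }
}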
////
//// tcmalloc gc parameter
@@ -259,7 +259,7 @@ CONF_Int64(index_stream_cache_capacity, "10737418240");
// CONF_Int64(max_packed_row_block_size, "20971520");

// Cache for storage page size
-CONF_String(storage_page_cache_limit, "20G");
+CONF_String(storage_page_cache_limit, "20%");
// whether to disable page cache feature in storage
CONF_Bool(disable_storage_page_cache, "false");
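Reviewer note: the new default expresses the page cache limit as a percentage of system memory rather than a fixed 20 GB, so both forms ("20G", "20%") presumably have to be accepted. A hedged sketch of such dual-format parsing, in Java for illustration (parseCacheLimit is a made-up name, not Doris's parser):

public class CacheLimit {
    // Interpret "20%" relative to total memory, otherwise parse "20G"-style absolute sizes.
    static long parseCacheLimit(String spec, long totalMemBytes) {
        spec = spec.trim().toUpperCase();
        if (spec.endsWith("%")) {
            long pct = Long.parseLong(spec.substring(0, spec.length() - 1));
            return totalMemBytes * pct / 100;
        }
        char unit = spec.charAt(spec.length() - 1);
        long mult = unit == 'K' ? 1L << 10 : unit == 'M' ? 1L << 20
                  : unit == 'G' ? 1L << 30 : unit == 'T' ? 1L << 40 : 1L;
        String num = Character.isDigit(unit) ? spec : spec.substring(0, spec.length() - 1);
        return Long.parseLong(num) * mult;
    }

    public static void main(String[] args) {
        long total = 64L << 30; // assume a 64 GB machine
        System.out.println(parseCacheLimit("20G", total)); // 21474836480, fixed regardless of RAM
        System.out.println(parseCacheLimit("20%", total)); // 13743895347, scales with RAM
    }
}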

@@ -369,7 +369,7 @@ CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// or encounter 'tablet writer write failed' error when loading.
// CONF_Int32(tablet_writer_rpc_timeout_sec, "600");
// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
-CONF_mInt32(olap_table_sink_send_interval_ms, "10");
+CONF_mInt32(olap_table_sink_send_interval_ms, "1");

// Fragment thread pool
CONF_Int32(fragment_pool_thread_num_min, "64");
2 changes: 1 addition & 1 deletion be/src/exec/olap_scanner.cpp
@@ -257,7 +257,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
{
SCOPED_TIMER(_parent->_scan_timer);
-SCOPED_TIMER(_parent->_scan_cpu_timer);
+SCOPED_CPU_TIMER(_parent->_scan_cpu_timer);
while (true) {
// Batch is full, break
if (batch->is_full()) {
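Reviewer note: the switch from SCOPED_TIMER to SCOPED_CPU_TIMER matters because _scan_cpu_timer is meant to count CPU time, not wall time that includes blocking. A small Java illustration of that difference (the Doris macros are C++; this only demonstrates the concept, and assumes the JVM supports thread CPU-time measurement):

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class CpuVsWall {
    public static void main(String[] args) throws InterruptedException {
        ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
        long wall0 = System.nanoTime();
        long cpu0 = tmx.getCurrentThreadCpuTime();

        Thread.sleep(200); // blocked: the wall clock advances, the CPU clock barely does

        long wallMs = (System.nanoTime() - wall0) / 1_000_000;
        long cpuMs = (tmx.getCurrentThreadCpuTime() - cpu0) / 1_000_000;
        System.out.println("wall=" + wallMs + "ms cpu=" + cpuMs + "ms"); // e.g. wall=200ms cpu=0ms
    }
}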
3 changes: 1 addition & 2 deletions be/src/olap/rowset/alpha_rowset.cpp
@@ -330,8 +330,7 @@ OLAPStatus AlphaRowset::init() {
// table value column, so when first start the two number is not the same,
// it causes start failed. When `expect_zone_maps_num > zone_maps_size` it may be the first start after upgrade
if (expect_zone_maps_num > zone_maps_size) {
-LOG(WARNING)
-<< "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
+VLOG(1) << "tablet: " << _rowset_meta->tablet_id() << " expect zone map size is "
<< expect_zone_maps_num << ", actual num is " << zone_maps_size
<< ". If this is not the first start after upgrade, please pay attention!";
}
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -45,6 +45,9 @@ class KafkaLoadInfo {
topic(t_info.topic),
begin_offset(t_info.partition_begin_offset),
properties(t_info.properties) {
+// The offset(begin_offset) sent from FE is the starting offset,
+// and the offset(cmt_offset) reported by BE to FE is the consumed offset,
+// so we need to minus 1 here.
for (auto& p : t_info.partition_begin_offset) {
cmt_offset[p.first] = p.second - 1;
}
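Reviewer note: the new comment pins down the off-by-one convention — the FE sends the next offset to consume (begin_offset), while the BE reports back the last offset consumed (cmt_offset). A toy Java sketch of that initialization (the field names mirror the C++ above, but the class itself is hypothetical):

import java.util.HashMap;
import java.util.Map;

public class OffsetConvention {
    public static void main(String[] args) {
        // FE says: start consuming partition 0 at offset 100.
        Map<Integer, Long> beginOffset = new HashMap<>();
        beginOffset.put(0, 100L);

        // Before any message is consumed, the "already consumed" offset is begin - 1,
        // so a task that reads nothing reports no progress.
        Map<Integer, Long> cmtOffset = new HashMap<>();
        beginOffset.forEach((partition, offset) -> cmtOffset.put(partition, offset - 1));

        System.out.println(cmtOffset); // {0=99}
    }
}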
fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -28,11 +28,11 @@
import org.apache.doris.thrift.TTabletStat;
import org.apache.doris.thrift.TTabletStatResult;

-import com.google.common.collect.ImmutableMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

+import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;

@@ -61,7 +61,7 @@ protected void runAfterCatalogReady() {
client = ClientPool.backendPool.borrowObject(address);
TTabletStatResult result = client.getTabletStat();

LOG.info("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
LOG.debug("get tablet stat from backend: {}, num: {}", backend.getId(), result.getTabletsStatsSize());
updateTabletStat(backend.getId(), result);

ok = true;
@@ -112,7 +112,7 @@ protected void runAfterCatalogReady() {
index.setRowCount(indexRowCount);
} // end for indices
} // end for partitions
LOG.info("finished to set row num for table: {} in database: {}",
LOG.debug("finished to set row num for table: {} in database: {}",
table.getName(), db.getFullName());
}
} finally {
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -68,7 +68,8 @@ public class Config extends ConfigBase {
public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
@ConfField public static String sys_log_level = "INFO";
@ConfField public static int sys_log_roll_num = 10;
-@ConfField public static String[] sys_log_verbose_modules = {"org.apache.thrift", "org.apache.doris.thrift", "org.apache.doris.http", "org.apache.doris.service.FrontendServiceImpl"};
+@ConfField
+public static String[] sys_log_verbose_modules = {};
@ConfField public static String sys_log_roll_interval = "DAY";
@ConfField public static String sys_log_delete_age = "7d";
@Deprecated
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -19,8 +19,12 @@

import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKafkaRLTaskProgress;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@@ -38,6 +42,8 @@
*/
// {"partitionIdToOffset": {}}
public class KafkaProgress extends RoutineLoadProgress {
+private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);

public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
public static final String OFFSET_END = "OFFSET_END"; // -1
// OFFSET_ZERO is just for show info, if user specified offset is 0
@@ -47,7 +53,7 @@ public class KafkaProgress extends RoutineLoadProgress {
public static final long OFFSET_END_VAL = -1;

// (partition id, begin offset)
-// the offset the next msg to be consumed
+// the offset saved here is the next offset need to be consumed
private Map<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();

public KafkaProgress() {
@@ -101,12 +107,13 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
} else if (entry.getValue() == -2) {
showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
} else {
+// The offset saved in partitionIdToOffset is the next offset to be consumed.
+// So here we minus 1 to return the "already consumed" offset.
showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
}
}
}


// modify the partition offset of this progress.
// throw exception is the specified partition does not exist in progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
@@ -138,11 +145,13 @@ public String toJsonString() {
}

@Override
-public void update(RoutineLoadProgress progress) {
-KafkaProgress newProgress = (KafkaProgress) progress;
+public void update(RLTaskTxnCommitAttachment attachment) {
+KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();
// + 1 to point to the next msg offset to be consumed
newProgress.partitionIdToOffset.entrySet().stream()
.forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1));
LOG.debug("update kafka progress: {}, task: {}, job: {}",
newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
}

@Override
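Reviewer note: taken together, the changes in this file preserve one invariant — partitionIdToOffset always stores the next offset to consume. update() therefore adds 1 to the consumed offset it receives from the attachment, and getReadableProgress() subtracts 1 for display. A condensed, hypothetical Java model of that round trip (not the real KafkaProgress API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProgressModel {
    // Invariant: value is the NEXT offset to be consumed for each partition.
    private final Map<Integer, Long> partitionIdToOffset = new ConcurrentHashMap<>();

    // A task reports the offset it actually consumed up to.
    void update(int partition, long consumedOffset) {
        partitionIdToOffset.put(partition, consumedOffset + 1); // +1: next to consume
    }

    // SHOW-style output converts back to "already consumed".
    long shownOffset(int partition) {
        return partitionIdToOffset.get(partition) - 1;
    }

    public static void main(String[] args) {
        ProgressModel p = new ProgressModel();
        p.update(0, 199); // task consumed messages up to offset 199
        System.out.println(p.partitionIdToOffset.get(0)); // 200 -> where the next task starts
        System.out.println(p.shownOffset(0));             // 199 -> what the user sees
    }
}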
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -42,17 +42,19 @@
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -145,7 +147,6 @@ private void convertCustomProperties(boolean rebuild) throws DdlException {
}
}


@Override
public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
List<RoutineLoadTaskInfo> result = new ArrayList<>();
@@ -198,46 +199,45 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
return currentTaskConcurrentNum;
}

-// case1: BE execute the task successfully and commit it to FE, but failed on FE(such as db renamed, not found),
-// after commit failed, BE try to rollback this txn, and loaded rows in its attachment is larger than 0.
-// In this case, FE should not update the progress.
-//
-// case2: partitionIdToOffset must be not empty when loaded rows > 0
-// be commit txn but fe throw error when committing txn,
-// fe rollback txn without partitionIdToOffset by itself
-// this task should not be commit
-// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
+// Through the transaction status and attachment information, to determine whether the progress needs to be updated.
@Override
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-TransactionStatus txnStatus) {
-if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
-// case 1
-return false;
+TransactionState txnState,
+TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
+if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED) {
+// For committed txn, update the progress.
+return true;
}

-if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
-&& (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
-// case 2
-LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
-.add("job_id", id)
-.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
-.add("progress_partition_offset_size", 0)
-.add("msg", "commit attachment info is incorrect"));
-return false;
+if (txnStatusChangeReason != null && txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
+// Because the max_filter_ratio of routine load task is always 1.
+// Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
+// If no data is imported, the error "all partitions have no load data" may only be returned.
+// In this case, the status of the transaction is ABORTED,
+// but we still need to update the offset to skip these error lines.
+Preconditions.checkState(txnState.getTransactionStatus() == TransactionStatus.ABORTED, txnState.getTransactionStatus());
+return true;
}
-return true;

+// Running here, the status of the transaction should be ABORTED,
+// and it is caused by other errors. In this case, we should not update the offset.
+LOG.debug("no need to update the progress of kafka routine load. txn status: {}, " +
+"txnStatusChangeReason: {}, task: {}, job: {}",
+txnState.getTransactionStatus(), txnStatusChangeReason,
+DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
+return false;
}

@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
super.updateProgress(attachment);
-this.progress.update(attachment.getProgress());
+this.progress.update(attachment);
}

@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
super.replayUpdateProgress(attachment);
-this.progress.update(attachment.getProgress());
+this.progress.update(attachment);
}

@Override
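Reviewer note: the rewritten checkCommitInfo above reduces to a small decision table — COMMITTED: update progress; ABORTED with reason NO_PARTITIONS: still update, advancing past the bad rows (max_filter_ratio is effectively 1 for routine load); any other ABORTED: don't. A condensed, hypothetical sketch of just that predicate (enum and method names are illustrative, not the Doris API):

public class CommitDecision {
    enum TxnStatus { COMMITTED, ABORTED }
    enum Reason { NO_PARTITIONS, OFFSET_OUT_OF_RANGE, OTHER }

    // Mirrors the logic above: only these two cases may move the offset forward.
    static boolean shouldUpdateProgress(TxnStatus status, Reason reason) {
        if (status == TxnStatus.COMMITTED) {
            return true; // normal success path
        }
        // An abort with "all partitions have no load data" still advances the
        // offset so the job skips the error lines instead of re-reading them.
        return status == TxnStatus.ABORTED && reason == Reason.NO_PARTITIONS;
    }

    public static void main(String[] args) {
        System.out.println(shouldUpdateProgress(TxnStatus.COMMITTED, null));               // true
        System.out.println(shouldUpdateProgress(TxnStatus.ABORTED, Reason.NO_PARTITIONS)); // true
        System.out.println(shouldUpdateProgress(TxnStatus.ABORTED, Reason.OTHER));         // false
    }
}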
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -67,6 +67,10 @@ public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttac
}
}

+public long getJobId() {
+return jobId;
+}

public TUniqueId getTaskId() {
return taskId;
}
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,10 @@
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -77,8 +81,6 @@
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
@@ -878,8 +880,9 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
taskBeId = routineLoadTaskInfo.getBeId();
-executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED);
+executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null);
++committedTaskNum;
LOG.debug("routine load task committed. task id: {}, job id: {}", txnState.getLabel(), id);
}
} catch (Throwable e) {
LOG.warn("after committed failed", e);
@@ -989,8 +992,9 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
.build());
}
++abortedTaskNum;
+TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
if (txnStatusChangeReasonString != null) {
-TransactionState.TxnStatusChangeReason txnStatusChangeReason =
+txnStatusChangeReason =
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
@@ -1009,7 +1013,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
// TODO(ml): use previous be id depend on change reason
}
// step2: commit task , update progress, maybe create a new task
-executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
+executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED, txnStatusChangeReason);
}
} catch (Exception e) {
String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();
@@ -1037,7 +1041,7 @@ public void replayOnAborted(TransactionState txnState) {

// check task exists or not before call method
private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
-TransactionStatus txnStatus) throws UserException {
+TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason) throws UserException {
// step0: get progress from transaction state
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
if (rlTaskTxnCommitAttachment == null) {
Expand All @@ -1049,7 +1053,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn
+ " maybe task was aborted by master when timeout")
.build());
}
-} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState.getTransactionStatus())) {
+} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
}
@@ -1256,7 +1260,8 @@ public void setOrigStmt(OriginStatement origStmt) {

// check the correctness of commit info
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
-TransactionStatus txnStatus);
+TransactionState txnState,
+TransactionState.TxnStatusChangeReason txnStatusChangeReason);

protected abstract String getStatistic();

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -37,7 +37,7 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
this.loadDataSourceType = loadDataSourceType;
}

-abstract void update(RoutineLoadProgress progress);
+abstract void update(RLTaskTxnCommitAttachment attachment);

abstract String toJsonString();
