Skip to content

Commit

Permalink
[improvement](iceberg) Parallelize splits for count(*) for 2.1 (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi authored Oct 16, 2024
1 parent e62e477 commit e6545a3
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 22 deletions.
10 changes: 7 additions & 3 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx, int64_t push_down_count)
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_state(state),
_params(params),
_range(range),
_kv_cache(kv_cache),
_io_ctx(io_ctx),
_remaining_push_down_count(push_down_count) {
_io_ctx(io_ctx) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
Expand All @@ -95,6 +94,11 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_push_down_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
Expand Down
13 changes: 6 additions & 7 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class IcebergTableReader : public TableFormatReader {

IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
int64_t push_down_count);
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx);
~IcebergTableReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
Expand Down Expand Up @@ -197,9 +197,9 @@ class IcebergParquetReader final : public IcebergTableReader {
IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx, int64_t push_down_count)
io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, push_down_count) {}
kv_cache, io_ctx) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
Expand Down Expand Up @@ -237,10 +237,9 @@ class IcebergOrcReader final : public IcebergTableReader {

IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
int64_t push_down_count)
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, push_down_count) {}
kv_cache, io_ctx) {}

void set_delete_rows() override {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<IcebergParquetReader> iceberg_reader =
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
_io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
Expand Down Expand Up @@ -914,9 +914,9 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
std::unique_ptr<IcebergOrcReader> iceberg_reader =
IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state,
*_params, range, _kv_cache, _io_ctx.get());

init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
Expand Down Expand Up @@ -86,6 +87,8 @@ public class IcebergScanNode extends FileQueryScanNode {
private IcebergSource source;
private Table icebergTable;
private List<String> pushdownIcebergPredicates = Lists.newArrayList();
private boolean pushDownCount = false;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;

/**
* External file scan node for Query iceberg table
Expand Down Expand Up @@ -137,6 +140,9 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli
int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
if (pushDownCount) {
fileDesc.setRowCount(icebergSplit.getRowCount());
}
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
Expand Down Expand Up @@ -255,9 +261,24 @@ private List<Split> doGetSplits() throws UserException {
}

TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) {
if (aggOp.equals(TPushAggOp.COUNT)) {
// we can create a special empty split and skip the plan process
return splits.isEmpty() ? splits : Collections.singletonList(splits.get(0));
if (splits.isEmpty()) {
return splits;
}
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
pushDownCount = true;
List<Split> pushDownCountSplits;
if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum));
} else {
pushDownCountSplits = Collections.singletonList(splits.get(0));
}
assignCountToSplits(pushDownCountSplits, countFromSnapshot);
return pushDownCountSplits;
}
}

selectedPartitionNum = partitionPathSet.size();
Expand Down Expand Up @@ -374,12 +395,6 @@ private long getCountFromSnapshot() {
@Override
protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}

@Override
Expand All @@ -399,4 +414,13 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
}

/**
 * Distributes {@code totalCount} rows across the given splits for count(*) push-down.
 * Every split receives {@code totalCount / size} rows; the last split additionally
 * absorbs the remainder, so the per-split counts sum exactly to {@code totalCount}.
 *
 * @param splits     the splits to annotate; each element must be an IcebergSplit
 *                   (callers pass lists built from IcebergSplit instances)
 * @param totalCount total row count taken from the snapshot for this scan
 */
private void assignCountToSplits(List<Split> splits, long totalCount) {
    int size = splits.size();
    if (size == 0) {
        // Defensive guard: the caller already skips empty split lists, but an
        // empty list here would otherwise divide by zero and index -1.
        return;
    }
    long countPerSplit = totalCount / size;
    for (int i = 0; i < size - 1; i++) {
        ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
    }
    // Last split takes the quotient plus the remainder so no rows are lost.
    ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class IcebergSplit extends FileSplit {
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
private long rowCount = -1;

// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
Expand All @@ -47,4 +48,12 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength,
this.config = config;
this.originalPath = originalPath;
}

// Row count assigned to this split for count(*) push-down; -1 (the field's
// default) means no count has been assigned to this split.
public long getRowCount() {
    return rowCount;
}

// Assigns the portion of the snapshot's total row count that this split
// should report when count(*) is pushed down (see IcebergScanNode).
public void setRowCount(long rowCount) {
    this.rowCount = rowCount;
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ struct TIcebergFileDesc {
// Deprecated
5: optional Exprs.TExpr file_select_conjunct;
6: optional string original_file_path;
7: optional i64 row_count;
}

struct TPaimonDeletionFileDesc {
Expand Down

0 comments on commit e6545a3

Please sign in to comment.