Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterInfo;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TTopnFilterDesc;
Expand Down Expand Up @@ -116,17 +117,16 @@ public static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToThr
workerProcessInstanceNum, coordinatorContext);

TPipelineInstanceParams instanceParam = instanceToThrift(
currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
topNFilterThriftSupplier, currentInstanceIndex++
);
currentFragmentParam, instanceJob, currentInstanceIndex++);
currentFragmentParam.getLocalParams().add(instanceParam);
}

// arrange fragments by the same worker,
// so we can merge and send multiple fragment to a backend use one rpc
for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : workerToCurrentFragment.entrySet()) {
TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent(
kv.getKey(), w -> new TPipelineFragmentParamsList());
kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder,
topNFilterThriftSupplier));
fragments.addToParamsList(kv.getValue());
}
}
Expand Down Expand Up @@ -297,6 +297,22 @@ private static TPlanFragmentDestination instanceToDestination(AssignedJob instan
return destination;
}

private static TPipelineFragmentParamsList beToThrift(
RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList();
TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());

// set for runtime filter
TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
beParam.setRuntimeFilterInfo(runtimeFilterInfo);
return beParam;
}

private static TPipelineFragmentParams fragmentToThriftIfAbsent(
PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams,
Expand Down Expand Up @@ -415,26 +431,13 @@ private static Map<Integer, TFileScanRangeParams> computeFileScanRangeParams(
}

private static TPipelineInstanceParams instanceToThrift(
TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int currentInstanceNum) {
TPipelineFragmentParams currentFragmentParam, AssignedJob instance, int currentInstanceNum) {
TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
instanceParam.setFragmentInstanceId(instance.instanceId());
setScanSourceParam(currentFragmentParam, instance, instanceParam);

instanceParam.setSenderId(instance.indexInUnassignedJob());
instanceParam.setBackendNum(currentInstanceNum);
instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams());

instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());

// set for runtime filter
TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
instanceParam.setRuntimeFilterParams(runtimeFilterParams);
if (runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
}
boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
if (isLocalShuffle) {
// a fragment in a backend only enable local shuffle once for the first local shuffle instance,
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,14 @@ struct TQueryOptions {
162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
163: optional bool inverted_index_compatible_read = false
164: optional bool check_orc_init_sargs_success = false
165: optional i32 exchange_multi_blocks_byte_size = 262144
165: optional i32 exchange_multi_blocks_byte_size = 262144
// true to use strict cast mode.
166: optional bool enable_strict_cast = false
167: optional bool new_version_unix_timestamp = false

168: optional i32 hnsw_ef_search = 32;
169: optional bool hnsw_check_relative_distance = true;
170: optional bool hnsw_bounded_queue = true;
170: optional bool hnsw_bounded_queue = true;

171: optional bool optimize_index_scan_parallelism = false;

Expand Down
Loading