Skip to content

Commit

Permalink
[fix](nereids)fix bug that query information_schema.rowsets fe send fr…
Browse files Browse the repository at this point in the history
…agment to one of multi be. (apache#27025) (apache#27090)

Fixed the bug of incomplete query results when querying information_schema.rowsets in the case of multiple BEs.

The reason is that the schema scanner sends the scan fragment to only one of the multiple BEs, and that BE queries the information from the FE through RPC. Since the rowsets information requires data from all BEs, the scan fragment needs to be sent to every BE.
  • Loading branch information
hubgeter authored and gnehil committed Dec 4, 2023
1 parent 1877ed9 commit 2446768
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.doris.planner.AggregationNode;
import org.apache.doris.planner.AnalyticEvalNode;
import org.apache.doris.planner.AssertNumRowsNode;
import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.EmptySetNode;
Expand Down Expand Up @@ -711,13 +712,24 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
Table table = schemaScan.getTable();
List<Slot> slots = ImmutableList.copyOf(schemaScan.getOutput());
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
SchemaScanNode scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);

// For the information_schema.rowsets table, the scan fragment needs to be sent to all BEs.
// For other information_schema tables, the scan fragment only needs to be sent to one of the BEs.
SchemaScanNode scanNode = null;
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
table.getName())) {
scanNode = new BackendPartitionedSchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);
} else {
scanNode = new SchemaScanNode(schemaScan.translatePlanNodeId(), tupleDescriptor);
}
SchemaScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getTargetOnScanNode(schemaScan.getRelationId())
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
.forEach(expr -> runtimeFilterGenerator
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
scanNode.finalizeForNereids();
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public void finalize(Analyzer analyzer) throws UserException {
createScanRangeLocations();
}

// Nereids planner entry point for finalizing this scan node (no legacy
// Analyzer involved). NOTE(review): appears to mirror finalize(Analyzer)
// above, which also ends with createScanRangeLocations() — confirm the two
// stay in sync if either changes.
@Override
public void finalizeForNereids() throws UserException {
// Derive column filters from the conjuncts first, since partition
// selection presumably depends on them — order of these calls matters.
computeColumnsFilter();
computePartitionInfo();
// Build scan range locations last, after partitions have been computed,
// so the fragment can be dispatched to every relevant backend.
createScanRangeLocations();
}

@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return scanRangeLocations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void finalize(Analyzer analyzer) throws UserException {
}

@Override
public void finalizeForNereids() {
public void finalizeForNereids() throws UserException {
// Convert predicates to MySQL columns and filters.
frontendIP = FrontendOptions.getLocalHostAddress();
frontendPort = Config.rpc_port;
Expand Down

0 comments on commit 2446768

Please sign in to comment.