Skip to content

Commit

Permalink
one way to fix
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Jan 4, 2023
1 parent d50bb7c commit 5dd10ad
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 19 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4325,7 +4325,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetKeepOrder(e.keepOrder == plannercore.KeepOrderBetweenRequest).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
Expand Down
10 changes: 5 additions & 5 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type TableReaderExecutor struct {
memTracker *memory.Tracker
selectResultHook // for testing

keepOrder bool
keepOrder plannercore.KeepOrderTypeForTableReader
desc bool
paging bool
storeType kv.StoreType
Expand Down Expand Up @@ -184,7 +184,7 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
}
}
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges, e.keepOrder != plannercore.NoOrder, e.desc, e.table.Meta() != nil && e.table.Meta().IsCommonHandle)

// Treat temporary table as dummy table, avoid sending distsql request to TiKV.
// Calculate the kv ranges here, UnionScan rely on this kv ranges.
Expand Down Expand Up @@ -343,7 +343,7 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetKeepOrder(e.keepOrder == plannercore.KeepOrderBetweenRequest).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetFromSessionVars(e.ctx.GetSessionVars()).
Expand Down Expand Up @@ -384,7 +384,7 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetKeepOrder(e.keepOrder == plannercore.KeepOrderBetweenRequest).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetFromSessionVars(e.ctx.GetSessionVars()).
Expand Down Expand Up @@ -432,7 +432,7 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetKeepOrder(e.keepOrder == plannercore.KeepOrderBetweenRequest).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (*ImplTableScan) OnImplement(expr *memo.GroupExpr, reqProp *property.Physic
logicalScan := expr.ExprNode.(*plannercore.LogicalTableScan)
ts := logicalScan.GetPhysicalScan(logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsSortItemEmpty() {
ts.KeepOrder = true
ts.KeepOrder = plannercore.KeepOrderBetweenRequest
ts.Desc = reqProp.SortItems[0].Desc
}
tblCols, tblColHists := logicalScan.Source.TblCols, logicalScan.Source.TblColHists
Expand Down
8 changes: 6 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,10 @@ func (p *LogicalJoin) constructInnerTableScanTask(
if keepOrder && ds.tableInfo.GetPartitionInfo() != nil {
return nil
}
var keepOrderStatus = NoOrder
if keepOrder {
keepOrderStatus = KeepOrderBetweenRequest
}
ts := PhysicalTableScan{
Table: ds.tableInfo,
Columns: ds.Columns,
Expand All @@ -991,7 +995,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
filterCondition: ds.pushedDownConds,
Ranges: ranges,
rangeInfo: rangeInfo,
KeepOrder: keepOrder,
KeepOrder: keepOrderStatus,
Desc: desc,
physicalTableID: ds.physicalTableID,
isPartition: ds.isPartition,
Expand Down Expand Up @@ -1025,7 +1029,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
tablePlan: ts,
indexPlanFinished: true,
tblColHists: ds.TblColHists,
keepOrder: ts.KeepOrder,
keepOrder: ts.KeepOrder == KeepOrderBetweenRequest,
}
copTask.partitionInfo = PartitionInfo{
PruningConds: ds.allConds,
Expand Down
2 changes: 1 addition & 1 deletion planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string {
}
}
buffer.WriteString("keep order:")
buffer.WriteString(strconv.FormatBool(p.KeepOrder))
buffer.WriteString(strconv.FormatBool(p.KeepOrder == KeepOrderBetweenRequest))
if p.Desc {
buffer.WriteString(", desc")
}
Expand Down
22 changes: 19 additions & 3 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ import (
"go.uber.org/zap"
)

// We need three status for the KEEP ORDER of the table reader.
// It's very strange problem.
// At the very beginning time, TiKV stores the unsigned pk at a wrong order: [MaxInt64+1, MaxUint64] is the beginning, then following [0, MaxInt64].
// So if we just directly KEEP ORDER, we will get data of [MaxInt64+1, MaxUint64] first, then [0, MaxInt64].
// We need to specially handle this case.
// And then we have the func pushTopNDownToDynamicPartition. It pushes part of the order property down to the partition table.
// It does not need KEEP ORDER between the request. But only need to nodify that each request itself should read the data in correct order.
// So we introduce the third type KeepOrderInRequest.
type KeepOrderTypeForTableReader uint

const (
NoOrder KeepOrderTypeForTableReader = iota
KeepOrderInRequest
KeepOrderBetweenRequest
)

const (
// SelectionFactor is the default factor of the selectivity.
// For example, If we have no idea how to estimate the selectivity
Expand Down Expand Up @@ -1976,7 +1992,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
return invalidTask, nil
}
ts, _ := ds.getOriginalPhysicalTableScan(prop, candidate.path, candidate.isMatchProp)
if ts.KeepOrder && ts.StoreType == kv.TiFlash && (ts.Desc || ds.SCtx().GetSessionVars().TiFlashFastScan) {
if ts.KeepOrder != NoOrder && ts.StoreType == kv.TiFlash && (ts.Desc || ds.SCtx().GetSessionVars().TiFlashFastScan) {
// TiFlash fast mode(https://github.com/pingcap/tidb/pull/35851) does not keep order in TableScan
return invalidTask, nil
}
Expand All @@ -1998,7 +2014,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
if ts.KeepOrder {
if ts.KeepOrder != NoOrder {
return invalidTask, nil
}
if prop.MPPPartitionTp != property.AnyType || ts.isPartition {
Expand Down Expand Up @@ -2352,7 +2368,7 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
ts.stats = ds.tableStats.ScaleByExpectCnt(rowCount)
if isMatchProp {
ts.Desc = prop.SortItems[0].Desc
ts.KeepOrder = true
ts.KeepOrder = KeepOrderBetweenRequest
}
return ts, rowCount
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (p *PhysicalTableReader) adjustReadReqType(ctx sessionctx.Context) {
// and all table scans contained are not keepOrder, try to use batch cop.
if len(tableScans) > 0 {
for _, tableScan := range tableScans {
if tableScan.KeepOrder {
if !(tableScan.KeepOrder == NoOrder) {
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ type PhysicalTableScan struct {
// works on the whole partition table, and `isPartition` is not used.
isPartition bool
// KeepOrder is true, if sort data by scanning pkcol,
KeepOrder bool
KeepOrder KeepOrderTypeForTableReader
Desc bool

isChildOfIndexLookUp bool
Expand Down
2 changes: 1 addition & 1 deletion planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType)
}
tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns)
tsExec.Desc = p.Desc
keepOrder := p.KeepOrder
keepOrder := !(p.KeepOrder == NoOrder)
tsExec.KeepOrder = &keepOrder
tsExec.IsFastScan = &(ctx.GetSessionVars().TiFlashFastScan)

Expand Down
4 changes: 1 addition & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,6 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo
if !propMatched {
return nil, false
}
// SplitRangesAcrossInt64Boundary needs the KeepOrder flag. See that func for more details.
idxScan.KeepOrder = true
idxScan.Desc = isDesc
childProfile := copTsk.plan().statsInfo()
newCount := p.Offset + p.Count
Expand Down Expand Up @@ -1105,7 +1103,7 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo
}
tblScan.Desc = isDesc
// SplitRangesAcrossInt64Boundary needs the KeepOrder flag. See that func and the struct tableResultHandler for more details.
tblScan.KeepOrder = true
tblScan.KeepOrder = KeepOrderInRequest
childProfile := copTsk.plan().statsInfo()
newCount := p.Offset + p.Count
stats := deriveLimitStats(childProfile, float64(newCount))
Expand Down

0 comments on commit 5dd10ad

Please sign in to comment.