Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a config to enable write statements to read on TiFlash #37516

Merged
merged 12 commits into from
Sep 2, 2022
51 changes: 51 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7130,3 +7130,54 @@ func TestCastTimeAsDurationToTiFlash(t *testing.T) {
}
tk.MustQuery("explain select cast(a as time), cast(b as time) from t;").CheckAt([]int{0, 2, 4}, rows)
}

func TestEnableTiFlashReadForWriteStmt(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1, 2)")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2(a int)")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@tidb_enable_tiflash_read_for_write_stmt = ON")

tbl, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t", L: "t"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

tbl2, err := dom.InfoSchema().TableByName(model.CIStr{O: "test", L: "test"}, model.CIStr{O: "t2", L: "t2"})
require.NoError(t, err)
// Set the hacked TiFlash replica for explain tests.
tbl2.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{Count: 1, Available: true}

checkMpp := func(r [][]interface{}) {
check := false
for i := range r {
if r[i][2] == "mpp[tiflash]" {
check = true
break
}
}
require.Equal(t, check, true)
}

// Insert into ... select
rs := tk.MustQuery("explain insert into t2 select a+b from t").Rows()
checkMpp(rs)

rs = tk.MustQuery("explain insert into t2 select t.a from t2 join t on t2.a = t.a").Rows()
checkMpp(rs)

// Replace into ... select
rs = tk.MustQuery("explain replace into t2 select a+b from t").Rows()
checkMpp(rs)

// CTE
rs = tk.MustQuery("explain update t set a=a+1 where b in (select a from t2 where t.a > t2.a)").Rows()
checkMpp(rs)
}
4 changes: 1 addition & 3 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
}

// Because for write stmt, TiFlash has a different results when lock the data in point get plan. We ban the TiFlash
// engine in not read only stmt.
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(node, sessVars) {
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !sessVars.EnableTiFlashReadForWriteStmt && !IsReadOnly(node, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
defer func() {
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down
180 changes: 92 additions & 88 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,9 @@ type SessionVars struct {
// ConstraintCheckInPlacePessimistic controls whether to skip the locking of some keys in pessimistic transactions.
// Postpone the conflict check and constraint check to prewrite or later pessimistic locking requests.
ConstraintCheckInPlacePessimistic bool

// EnableTiFlashReadForWriteStmt indicates whether to enable TiFlash to read for write statements.
EnableTiFlashReadForWriteStmt bool
}

// GetPreparedStmtByName returns the prepared statement specified by stmtName.
Expand Down Expand Up @@ -1432,94 +1435,95 @@ func NewSessionVars() *SessionVars {
values: make(map[string]types.Datum),
types: make(map[string]*types.FieldType),
},
systems: make(map[string]string),
stmtVars: make(map[string]string),
PreparedStmts: make(map[uint32]interface{}),
PreparedStmtNameToID: make(map[string]uint32),
PreparedParams: make([]types.Datum, 0, 10),
TxnCtx: &TransactionContext{},
RetryInfo: &RetryInfo{},
ActiveRoles: make([]*auth.RoleIdentity, 0, 10),
StrictSQLMode: true,
AutoIncrementIncrement: DefAutoIncrementIncrement,
AutoIncrementOffset: DefAutoIncrementOffset,
Status: mysql.ServerStatusAutocommit,
StmtCtx: new(stmtctx.StatementContext),
AllowAggPushDown: false,
AllowCartesianBCJ: DefOptCartesianBCJ,
MPPOuterJoinFixedBuildSide: DefOptMPPOuterJoinFixedBuildSide,
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
EnableOuterJoinReorder: DefTiDBEnableOuterJoinReorder,
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
preferRangeScan: DefOptPreferRangeScan,
EnableCorrelationAdjustment: DefOptEnableCorrelationAdjustment,
LimitPushDownThreshold: DefOptLimitPushDownThreshold,
CorrelationThreshold: DefOptCorrelationThreshold,
CorrelationExpFactor: DefOptCorrelationExpFactor,
cpuFactor: DefOptCPUFactor,
copCPUFactor: DefOptCopCPUFactor,
CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor,
networkFactor: DefOptNetworkFactor,
scanFactor: DefOptScanFactor,
descScanFactor: DefOptDescScanFactor,
seekFactor: DefOptSeekFactor,
memoryFactor: DefOptMemoryFactor,
diskFactor: DefOptDiskFactor,
concurrencyFactor: DefOptConcurrencyFactor,
EnableVectorizedExpression: DefEnableVectorizedExpression,
CommandValue: uint32(mysql.ComSleep),
TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold,
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
enableIndexMerge: DefTiDBEnableIndexMerge,
NoopFuncsMode: TiDBOptOnOffWarn(DefTiDBEnableNoopFuncs),
replicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
EvolvePlanBaselines: DefTiDBEvolvePlanBaselines,
EnableExtendedStats: false,
IsolationReadEngines: make(map[kv.StoreType]struct{}),
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
MetricSchemaStep: DefTiDBMetricSchemaStep,
MetricSchemaRangeDuration: DefTiDBMetricSchemaRangeDuration,
SequenceState: NewSequenceState(),
WindowingUseHighPrecision: true,
PrevFoundInPlanCache: DefTiDBFoundInPlanCache,
FoundInPlanCache: DefTiDBFoundInPlanCache,
PrevFoundInBinding: DefTiDBFoundInBinding,
FoundInBinding: DefTiDBFoundInBinding,
SelectLimit: math.MaxUint64,
AllowAutoRandExplicitInsert: DefTiDBAllowAutoRandExplicitInsert,
EnableClusteredIndex: DefTiDBEnableClusteredIndex,
EnableParallelApply: DefTiDBEnableParallelApply,
ShardAllocateStep: DefTiDBShardAllocateStep,
EnableAmendPessimisticTxn: DefTiDBEnableAmendPessimisticTxn,
PartitionPruneMode: *atomic2.NewString(DefTiDBPartitionPruneMode),
TxnScope: kv.NewDefaultTxnScopeVar(),
EnabledRateLimitAction: DefTiDBEnableRateLimitAction,
EnableAsyncCommit: DefTiDBEnableAsyncCommit,
Enable1PC: DefTiDBEnable1PC,
GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreLastFailTime: make(map[string]time.Time),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery,
EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg,
MaxAllowedPacket: DefMaxAllowedPacket,
TiFlashFastScan: DefTiFlashFastScan,
systems: make(map[string]string),
stmtVars: make(map[string]string),
PreparedStmts: make(map[uint32]interface{}),
PreparedStmtNameToID: make(map[string]uint32),
PreparedParams: make([]types.Datum, 0, 10),
TxnCtx: &TransactionContext{},
RetryInfo: &RetryInfo{},
ActiveRoles: make([]*auth.RoleIdentity, 0, 10),
StrictSQLMode: true,
AutoIncrementIncrement: DefAutoIncrementIncrement,
AutoIncrementOffset: DefAutoIncrementOffset,
Status: mysql.ServerStatusAutocommit,
StmtCtx: new(stmtctx.StatementContext),
AllowAggPushDown: false,
AllowCartesianBCJ: DefOptCartesianBCJ,
MPPOuterJoinFixedBuildSide: DefOptMPPOuterJoinFixedBuildSide,
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
EnableOuterJoinReorder: DefTiDBEnableOuterJoinReorder,
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
preferRangeScan: DefOptPreferRangeScan,
EnableCorrelationAdjustment: DefOptEnableCorrelationAdjustment,
LimitPushDownThreshold: DefOptLimitPushDownThreshold,
CorrelationThreshold: DefOptCorrelationThreshold,
CorrelationExpFactor: DefOptCorrelationExpFactor,
cpuFactor: DefOptCPUFactor,
copCPUFactor: DefOptCopCPUFactor,
CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor,
networkFactor: DefOptNetworkFactor,
scanFactor: DefOptScanFactor,
descScanFactor: DefOptDescScanFactor,
seekFactor: DefOptSeekFactor,
memoryFactor: DefOptMemoryFactor,
diskFactor: DefOptDiskFactor,
concurrencyFactor: DefOptConcurrencyFactor,
EnableVectorizedExpression: DefEnableVectorizedExpression,
CommandValue: uint32(mysql.ComSleep),
TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold,
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
enableIndexMerge: DefTiDBEnableIndexMerge,
NoopFuncsMode: TiDBOptOnOffWarn(DefTiDBEnableNoopFuncs),
replicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
EvolvePlanBaselines: DefTiDBEvolvePlanBaselines,
EnableExtendedStats: false,
IsolationReadEngines: make(map[kv.StoreType]struct{}),
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
MetricSchemaStep: DefTiDBMetricSchemaStep,
MetricSchemaRangeDuration: DefTiDBMetricSchemaRangeDuration,
SequenceState: NewSequenceState(),
WindowingUseHighPrecision: true,
PrevFoundInPlanCache: DefTiDBFoundInPlanCache,
FoundInPlanCache: DefTiDBFoundInPlanCache,
PrevFoundInBinding: DefTiDBFoundInBinding,
FoundInBinding: DefTiDBFoundInBinding,
SelectLimit: math.MaxUint64,
AllowAutoRandExplicitInsert: DefTiDBAllowAutoRandExplicitInsert,
EnableClusteredIndex: DefTiDBEnableClusteredIndex,
EnableParallelApply: DefTiDBEnableParallelApply,
ShardAllocateStep: DefTiDBShardAllocateStep,
EnableAmendPessimisticTxn: DefTiDBEnableAmendPessimisticTxn,
PartitionPruneMode: *atomic2.NewString(DefTiDBPartitionPruneMode),
TxnScope: kv.NewDefaultTxnScopeVar(),
EnabledRateLimitAction: DefTiDBEnableRateLimitAction,
EnableAsyncCommit: DefTiDBEnableAsyncCommit,
Enable1PC: DefTiDBEnable1PC,
GuaranteeLinearizability: DefTiDBGuaranteeLinearizability,
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreLastFailTime: make(map[string]time.Time),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery,
EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg,
MaxAllowedPacket: DefMaxAllowedPacket,
TiFlashFastScan: DefTiFlashFastScan,
EnableTiFlashReadForWriteStmt: DefTiDBEnableTiFlashReadForWriteStmt,
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1791,6 +1791,10 @@ var defaultSysVars = []*SysVar{
}
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashReadForWriteStmt, Value: BoolToOnOff(DefTiDBEnableTiFlashReadForWriteStmt), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableTiFlashReadForWriteStmt = TiDBOptOn(val)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
6 changes: 5 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,11 @@ const (
// RequireSecureTransport indicates the secure mode for data transport
RequireSecureTransport = "require_secure_transport"

// TiFlashFastScan indicates whether use fast scan in tiflash .
// TiFlashFastScan indicates whether use fast scan in tiflash.
TiFlashFastScan = "tiflash_fastscan"

// TiDBEnableTiFlashReadForWriteStmt indicates whether to enable TiFlash to read for write statements.
TiDBEnableTiFlashReadForWriteStmt = "tidb_enable_tiflash_read_for_write_stmt"
)

// TiDB system variable names that both in session and global scope.
Expand Down Expand Up @@ -1023,6 +1026,7 @@ const (
DefExecutorConcurrency = 5
DefTiDBEnableGeneralPlanCache = false
DefTiDBGeneralPlanCacheSize = 100
DefTiDBEnableTiFlashReadForWriteStmt = false
// MaxDDLReorgBatchSize is exported for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
Expand Down