Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sessionctx: change variable tidb_enable_tiflash_pipeline_model to global level #46709

Merged
merged 6 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func SetTiFlashConfVarsInContext(ctx context.Context, sctx sessionctx.Context) c
if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort, 10))
}
if sctx.GetSessionVars().TiFlashEnablePipelineMode {
if variable.TiFlashEnablePipelineMode.Load() {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBEnableTiFlashPipelineMode, "1")
} else {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBEnableTiFlashPipelineMode, "0")
Expand Down
6 changes: 0 additions & 6 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,11 +892,6 @@ type SessionVars struct {
// TiFlashQuerySpillRatio is the percentage threshold to trigger auto spill in TiFlash if TiFlashMaxQueryMemoryPerNode is set
TiFlashQuerySpillRatio float64

// TiFlashEnablePipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiFlashEnablePipelineMode bool

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool

Expand Down Expand Up @@ -2053,7 +2048,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
vars.TiFlashMaxBytesBeforeExternalSort = DefTiFlashMaxBytesBeforeExternalSort
vars.TiFlashMaxQueryMemoryPerNode = DefTiFlashMemQuotaQueryPerNode
vars.TiFlashQuerySpillRatio = DefTiFlashQuerySpillRatio
vars.TiFlashEnablePipelineMode = DefTiDBEnableTiFlashPipelineMode
vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL
vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1)
vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
Expand Down
6 changes: 4 additions & 2 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ var defaultSysVars = []*SysVar{
s.TiFlashQuerySpillRatio = tidbOptFloat64(val, DefTiFlashQuerySpillRatio)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashPipelineMode, Type: TypeBool, Value: BoolToOnOff(DefTiDBEnableTiFlashPipelineMode), SetSession: func(s *SessionVars, val string) error {
s.TiFlashEnablePipelineMode = TiDBOptOn(val)
{Scope: ScopeGlobal, Name: TiDBEnableTiFlashPipelineMode, Type: TypeBool, Value: BoolToOnOff(DefTiDBEnableTiFlashPipelineMode), SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
TiFlashEnablePipelineMode.Store(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(TiFlashEnablePipelineMode.Load()), nil
}},
{Scope: ScopeSession, Name: TiDBSnapshot, Value: "", skipInit: true, SetSession: func(s *SessionVars, val string) error {
err := setSnapshotTS(s, val)
Expand Down
22 changes: 22 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,28 @@ func TestTiDBTiFlashReplicaRead(t *testing.T) {
require.Equal(t, DefTiFlashReplicaRead, val)
}

func TestSetEnableTiFlashPipeline(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
enablePipeline := GetSysVar(TiDBEnableTiFlashPipelineMode)
// Check default value
require.Equal(t, "ON", enablePipeline.Value)

err := mock.SetGlobalSysVar(context.Background(), TiDBEnableTiFlashPipelineMode, "OFF")
require.NoError(t, err)
val, err := mock.GetGlobalSysVar(TiDBEnableTiFlashPipelineMode)
require.NoError(t, err)
require.Equal(t, "OFF", val)

err = mock.SetGlobalSysVar(context.Background(), TiDBEnableTiFlashPipelineMode, "ON")
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBEnableTiFlashPipelineMode)
require.NoError(t, err)
require.Equal(t, "ON", val)
}

func TestSetTiDBCloudStorageURI(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,6 @@ const (
// TiFlashQuerySpillRatio is the threshold that TiFlash will trigger auto spill when the memory usage is above this percentage
TiFlashQuerySpillRatio = "tiflash_query_spill_ratio"

// TiDBEnableTiFlashPipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiDBEnableTiFlashPipelineMode = "tidb_enable_tiflash_pipeline_model"

// TiDBMPPStoreFailTTL is the unavailable time when a store is detected failed. During that time, tidb will not send any task to
// TiFlash even though the failed TiFlash node has been recovered.
TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"
Expand Down Expand Up @@ -1101,6 +1096,10 @@ const (
TiDBServiceScope = "tidb_service_scope"
// TiDBSchemaVersionCacheLimit defines the capacity size of domain infoSchema cache.
TiDBSchemaVersionCacheLimit = "tidb_schema_version_cache_limit"
// TiDBEnableTiFlashPipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiDBEnableTiFlashPipelineMode = "tiflash_enable_pipeline_model"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1504,6 +1503,7 @@ var (
EnableResourceControl = atomic.NewBool(false)
EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint)
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
TiFlashEnablePipelineMode = atomic.NewBool(DefTiDBEnableTiFlashPipelineMode)
ServiceScope = atomic.NewString("")
SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit)
CloudStorageURI = atomic.NewString("")
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/variable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,13 @@ func TestSkipSysvarCache(t *testing.T) {
require.False(t, GetSysVar(TiDBEnableAsyncCommit).SkipSysvarCache())
}

func TestTiFlashEnablePipeline(t *testing.T) {
require.True(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasGlobalScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasSessionScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasInstanceScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasNoneScope())
}

func TestTimeValidationWithTimezone(t *testing.T) {
sv := SysVar{Scope: ScopeSession, Name: "mynewsysvar", Value: "23:59 +0000", Type: TypeTime}
vars := NewSessionVars(nil)
Expand Down