From 1f86e50c345a8d4cd8812c52c87073626400dda5 Mon Sep 17 00:00:00 2001 From: XuHuaiyu <391585975@qq.com> Date: Mon, 9 Apr 2018 16:04:16 +0800 Subject: [PATCH 1/4] *: add a tidb system variable for hash join concurrency --- config/config.go | 2 - config/config.toml.example | 3 -- executor/aggregate_test.go | 7 ++- executor/join_test.go | 19 +++----- plan/gen_physical_plans.go | 3 +- plan/physical_plan_builder.go | 3 -- session/session.go | 1 + sessionctx/variable/session.go | 15 ++++++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 84 +++++++++++++++++--------------- tidb-server/main.go | 1 - 11 files changed, 72 insertions(+), 67 deletions(-) diff --git a/config/config.go b/config/config.go index 5ef4d0d823ac6..d2ca1310123d6 100644 --- a/config/config.go +++ b/config/config.go @@ -144,7 +144,6 @@ type Performance struct { MaxProcs uint `toml:"max-procs" json:"max-procs"` TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"` RetryLimit uint `toml:"retry-limit" json:"retry-limit"` - JoinConcurrency uint `toml:"join-concurrency" json:"join-concurrency"` CrossJoin bool `toml:"cross-join" json:"cross-join"` StatsLease string `toml:"stats-lease" json:"stats-lease"` RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"` @@ -250,7 +249,6 @@ var defaultConf = Config{ Performance: Performance{ TCPKeepAlive: true, RetryLimit: 10, - JoinConcurrency: 5, CrossJoin: true, StatsLease: "3s", RunAutoAnalyze: true, diff --git a/config/config.toml.example b/config/config.toml.example index 6b082406e0c87..7505f4d163845 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -124,9 +124,6 @@ tcp-keep-alive = true # The maximum number of retries when commit a transaction. retry-limit = 10 -# The number of goroutines that participate joining. -join-concurrency = 5 - # Whether support cartesian product. cross-join = true diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 27ffe5e90b957..41bbf6cc064e9 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -22,11 +22,8 @@ import ( ) func (s *testSuite) TestAggregation(c *C) { - plan.JoinConcurrency = 1 - defer func() { - plan.JoinConcurrency = 5 - }() tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_hash_join_concurrency=1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c int, d int)") @@ -296,6 +293,8 @@ func (s *testSuite) TestAggregation(c *C) { tk.MustExec(`insert into t values (6, '{"i": 0, "n": "n6"}')`) tk.MustExec(`insert into t values (7, '{"i": -1, "n": "n7"}')`) tk.MustQuery("select sum(tags->'$.i') from t").Check(testkit.Rows("14")) + + tk.MustExec("set @@tidb_hash_join_concurrency=5") } func (s *testSuite) TestStreamAggPushDown(c *C) { diff --git a/executor/join_test.go b/executor/join_test.go index e036b81bbbd46..20e79d11c0ab3 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -237,8 +237,7 @@ func (s *testSuite) TestJoin(c *C) { // This case is for testing: // when the main thread calls Executor.Close() while the out data fetch worker and join workers are still working, // we need to stop the goroutines as soon as possible to avoid unexpected error. - savedConcurrency := plan.JoinConcurrency - plan.JoinConcurrency = 5 + tk.MustExec("set @@tidb_hash_join_concurrency=5") tk.MustExec("drop table if exists t;") tk.MustExec("create table t(a int)") for i := 0; i < 100; i++ { @@ -246,7 +245,6 @@ func (s *testSuite) TestJoin(c *C) { } result = tk.MustQuery("select /*+ TIDB_HJ(s, r) */ * from t as s join t as r on s.a = r.a limit 1;") result.Check(testkit.Rows("1 1")) - plan.JoinConcurrency = savedConcurrency } func (s *testSuite) TestJoinCast(c *C) { @@ -487,11 +485,8 @@ func (s *testSuite) TestSubquerySameTable(c *C) { } func (s *testSuite) TestSubquery(c *C) { - plan.JoinConcurrency = 1 - defer func() { - plan.JoinConcurrency = 5 - }() tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_hash_join_concurrency=1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c int, d int)") @@ -652,6 +647,8 @@ func (s *testSuite) TestSubquery(c *C) { tk.MustExec("insert into t1 values(1)") tk.MustExec("insert into t2 values(1)") tk.MustQuery("select * from t1 where a in (select a from t2)").Check(testkit.Rows("1")) + + tk.MustExec("set @@tidb_hash_join_concurrency=5") } func (s *testSuite) TestInSubquery(c *C) { @@ -725,12 +722,8 @@ func (s *testSuite) TestInSubquery(c *C) { } func (s *testSuite) TestJoinLeak(c *C) { - savedConcurrency := plan.JoinConcurrency - plan.JoinConcurrency = 1 - defer func() { - plan.JoinConcurrency = savedConcurrency - }() tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_hash_join_concurrency=1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (d int)") @@ -746,6 +739,8 @@ func (s *testSuite) TestJoinLeak(c *C) { c.Assert(err, IsNil) time.Sleep(100 * time.Millisecond) result.Close() + + tk.MustExec("set @@tidb_hash_join_concurrency=5") } func (s *testSuite) TestHashJoinExecEncodeDecodeRow(c *C) { diff --git a/plan/gen_physical_plans.go b/plan/gen_physical_plans.go index 5befde20905ef..aa0cf67ad69df 100644 --- a/plan/gen_physical_plans.go +++ b/plan/gen_physical_plans.go @@ -170,11 +170,10 @@ func (p *LogicalJoin) getHashJoin(prop *requiredProp, innerIdx int) *PhysicalHas RightConditions: p.RightConditions, OtherConditions: p.OtherConditions, JoinType: p.JoinType, - Concurrency: JoinConcurrency, + Concurrency: p.ctx.GetSessionVars().GetHashJoinConcurrency(), DefaultValues: p.DefaultValues, InnerChildIdx: innerIdx, }.init(p.ctx, p.stats.scaleByExpectCnt(prop.expectedCnt), chReqProps...) - hashJoin.SetSchema(p.schema) return hashJoin } diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index 842401a429c84..e14986be58512 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -39,9 +39,6 @@ const ( cpuFactor = 0.9 ) -// JoinConcurrency means the number of goroutines that participate in joining. -var JoinConcurrency uint = 5 - // wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get // these tasks one by one. var wholeTaskTypes = [...]taskType{copSingleReadTaskType, copDoubleReadTaskType, rootTaskType} diff --git a/session/session.go b/session/session.go index c0bddf038d1a2..d6c7cc53cc630 100644 --- a/session/session.go +++ b/session/session.go @@ -1268,6 +1268,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TiDBIndexLookupSize + quoteCommaQuote + variable.TiDBIndexLookupConcurrency + quoteCommaQuote + variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote + + variable.TiDBHashJoinConcurrency + quoteCommaQuote + variable.TiDBDistSQLScanConcurrency + "')" // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a1cad5610a74b..bd1539475b854 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -15,6 +15,7 @@ package variable import ( "crypto/tls" + "strconv" "sync" "sync/atomic" "time" @@ -268,6 +269,9 @@ type SessionVars struct { // DistSQLScanConcurrency is the number of concurrent dist SQL scan worker. DistSQLScanConcurrency int + // HashJoinConcurrency is the number of concurrent hash join outer worker. + HashJoinConcurrency int + // IndexSerialScanConcurrency is the number of concurrent index serial scan worker. IndexSerialScanConcurrency int @@ -406,6 +410,15 @@ func (s *SessionVars) GetStatusFlag(flag uint16) bool { return s.Status&flag > 0 } +// GetHashJoinConcurrency gets the value of SessionVars.HashJoinConcurrency. +func (s *SessionVars) GetHashJoinConcurrency() uint { + concurrency, err := strconv.Atoi(s.systems[TiDBHashJoinConcurrency]) + if err != nil { + return uint(DefTiDBHashJoinConcurrency) + } + return uint(concurrency) +} + // InTxn returns if the session is in transaction. func (s *SessionVars) InTxn() bool { return s.GetStatusFlag(mysql.ServerStatusInTrans) @@ -505,6 +518,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.IndexJoinBatchSize = tidbOptPositiveInt(val, DefIndexJoinBatchSize) case TiDBIndexLookupSize: s.IndexLookupSize = tidbOptPositiveInt(val, DefIndexLookupSize) + case TiDBHashJoinConcurrency: + s.HashJoinConcurrency = tidbOptPositiveInt(val, DefTiDBHashJoinConcurrency) case TiDBDistSQLScanConcurrency: s.DistSQLScanConcurrency = tidbOptPositiveInt(val, DefDistSQLScanConcurrency) case TiDBIndexSerialScanConcurrency: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 7f42f4bc07542..f9ddecdf664aa 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -632,6 +632,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TIDBMemQuotaNestedLoopApply, strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10)}, {ScopeSession, TiDBEnableStreaming, "0"}, {ScopeSession, TxnIsolationOneShot, ""}, + {ScopeGlobal | ScopeSession, TiDBHashJoinConcurrency, strconv.Itoa(DefTiDBHashJoinConcurrency)}, /* The following variable is defined as session scope but is actually server scope. */ {ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)}, {ScopeSession, TiDBConfig, ""}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3d4d6df29bd04..f886b754bec96 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -26,10 +26,8 @@ package variable 8. Use this variable to control the behavior in code. */ -// TiDB system variable names. +// TiDB session only system variable names. const ( - /* Session only */ - // tidb_snapshot is used for reading history data, the default value is empty string. // When the value is set to a datetime string like '2017-11-11 20:20:20', the session reads history data of that time. TiDBSnapshot = "tidb_snapshot" @@ -38,7 +36,7 @@ const ( // When the value is set to true, unique index constraint is not checked. TiDBImportingData = "tidb_import_data" - // tidb_opt_agg_push_down is used to endable/disable the optimizer rule of aggregation push down. + // tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down. TiDBOptAggPushDown = "tidb_opt_agg_push_down" // tidb_opt_insubquery_unfold is used to enable/disable the optimizer rule of in subquery unfold. @@ -60,42 +58,6 @@ const ( // tidb_config is a read-only variable that shows the config of the current server. TiDBConfig = "tidb_config" - /* Session and global */ - - // tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task. - // A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes. - // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact. - // If the query has a LIMIT clause, high concurrency makes the system do much more work than needed. - TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency" - - // tidb_index_join_batch_size is used to set the batch size of a index lookup join. - // The index lookup join fetches batches of data from outer executor and constructs ranges for inner executor. - // This value controls how much of data in a batch to do the index join. - // Large value may reduce the latency but consumes more system resource. - TiDBIndexJoinBatchSize = "tidb_index_join_batch_size" - - // tidb_index_lookup_size is used for index lookup executor. - // The index lookup executor first scan a batch of handles from a index, then use those handles to lookup the table - // rows, this value controls how much of handles in a batch to do a lookup task. - // Small value sends more RPCs to TiKV, consume more system resource. - // Large value may do more work than needed if the query has a limit. - TiDBIndexLookupSize = "tidb_index_lookup_size" - - // tidb_index_lookup_concurrency is used for index lookup executor. - // A lookup task may have 'tidb_index_lookup_size' of handles at maximun, the handles may be distributed - // in many TiKV nodes, we executes multiple concurrent index lookup tasks concurrently to reduce the time - // waiting for a task to finish. - // Set this value higher may reduce the latency but consumes more system resource. - TiDBIndexLookupConcurrency = "tidb_index_lookup_concurrency" - - // tidb_index_serial_scan_concurrency is used for controlling the concurrency of index scan operation - // when we need to keep the data output order the same as the order of index data. - TiDBIndexSerialScanConcurrency = "tidb_index_serial_scan_concurrency" - - // tidb_skip_utf8_check skips the UTF8 validate process, validate UTF8 has performance cost, if we can make sure - // the input string values are valid, we can skip the check. - TiDBSkipUTF8Check = "tidb_skip_utf8_check" - // tidb_batch_insert is used to enable/disable auto-split insert data. If set this option on, insert executor will automatically // insert data into multiple batches and use a single txn for each batch. This will be helpful when inserting large data. TiDBBatchInsert = "tidb_batch_insert" @@ -138,6 +100,47 @@ const ( TiDBEnableStreaming = "tidb_enable_streaming" ) +// TiDB session and global system variable names. +const ( + // tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task. + // A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes. + // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact. + // If the query has a LIMIT clause, high concurrency makes the system do much more work than needed. + TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency" + + // tidb_index_join_batch_size is used to set the batch size of a index lookup join. + // The index lookup join fetches batches of data from outer executor and constructs ranges for inner executor. + // This value controls how much of data in a batch to do the index join. + // Large value may reduce the latency but consumes more system resource. + TiDBIndexJoinBatchSize = "tidb_index_join_batch_size" + + // tidb_index_lookup_size is used for index lookup executor. + // The index lookup executor first scan a batch of handles from a index, then use those handles to lookup the table + // rows, this value controls how much of handles in a batch to do a lookup task. + // Small value sends more RPCs to TiKV, consume more system resource. + // Large value may do more work than needed if the query has a limit. + TiDBIndexLookupSize = "tidb_index_lookup_size" + + // tidb_index_lookup_concurrency is used for index lookup executor. + // A lookup task may have 'tidb_index_lookup_size' of handles at maximum, the handles may be distributed + // in many TiKV nodes, we executes multiple concurrent index lookup tasks concurrently to reduce the time + // waiting for a task to finish. + // Set this value higher may reduce the latency but consumes more system resource. + TiDBIndexLookupConcurrency = "tidb_index_lookup_concurrency" + + // tidb_index_serial_scan_concurrency is used for controlling the concurrency of index scan operation + // when we need to keep the data output order the same as the order of index data. + TiDBIndexSerialScanConcurrency = "tidb_index_serial_scan_concurrency" + + // tidb_skip_utf8_check skips the UTF8 validate process, validate UTF8 has performance cost, if we can make sure + // the input string values are valid, we can skip the check. + TiDBSkipUTF8Check = "tidb_skip_utf8_check" + + // tidb_hash_join_concurrency is used for hash join executor. + // The hash join outer executor starts multiple concurrent join workers to probe the hash table. + TiDBHashJoinConcurrency = "tidb_hash_join_concurrency" +) + // Default TiDB system variable values. const ( DefIndexLookupConcurrency = 4 @@ -164,6 +167,7 @@ const ( DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB. DefTiDBMemQuotaNestedLoopApply = 32 << 30 // 32GB. DefTiDBGeneralLog = 0 + DefTiDBHashJoinConcurrency = 5 ) // Process global variables. diff --git a/tidb-server/main.go b/tidb-server/main.go index 7bbfb52fa01b0..974ea9652ad68 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -368,7 +368,6 @@ func setGlobalVars() { ddl.RunWorker = cfg.RunDDL ddl.EnableSplitTableRegion = cfg.SplitTable session.SetCommitRetryLimit(cfg.Performance.RetryLimit) - plan.JoinConcurrency = cfg.Performance.JoinConcurrency plan.AllowCartesianProduct = cfg.Performance.CrossJoin privileges.SkipWithGrant = cfg.Security.SkipGrantTable From 7e6ace51c412260954e402bc6d3970359a29fdb5 Mon Sep 17 00:00:00 2001 From: XuHuaiyu <391585975@qq.com> Date: Mon, 9 Apr 2018 16:53:57 +0800 Subject: [PATCH 2/4] address comment --- sessionctx/variable/tidb_vars.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index f886b754bec96..0fb1276eca8c5 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -26,7 +26,7 @@ package variable 8. Use this variable to control the behavior in code. */ -// TiDB session only system variable names. +// TiDB system variable names that only in session scope. const ( // tidb_snapshot is used for reading history data, the default value is empty string. // When the value is set to a datetime string like '2017-11-11 20:20:20', the session reads history data of that time. @@ -100,7 +100,7 @@ const ( TiDBEnableStreaming = "tidb_enable_streaming" ) -// TiDB session and global system variable names. +// TiDB system variable names that both in session and global scope. const ( // tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task. // A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes. From b989cd332e0197e3d3decb64045c89b0861ebb6d Mon Sep 17 00:00:00 2001 From: XuHuaiyu <391585975@qq.com> Date: Mon, 9 Apr 2018 17:19:26 +0800 Subject: [PATCH 3/4] tiny change --- plan/gen_physical_plans.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plan/gen_physical_plans.go b/plan/gen_physical_plans.go index aa0cf67ad69df..0524ee3d652b2 100644 --- a/plan/gen_physical_plans.go +++ b/plan/gen_physical_plans.go @@ -174,6 +174,7 @@ func (p *LogicalJoin) getHashJoin(prop *requiredProp, innerIdx int) *PhysicalHas DefaultValues: p.DefaultValues, InnerChildIdx: innerIdx, }.init(p.ctx, p.stats.scaleByExpectCnt(prop.expectedCnt), chReqProps...) + hashJoin.SetSchema(p.schema) return hashJoin } From ca5bb6c4ba1e057a4873f8e863c508c27e8f2218 Mon Sep 17 00:00:00 2001 From: XuHuaiyu <391585975@qq.com> Date: Thu, 12 Apr 2018 13:28:52 +0800 Subject: [PATCH 4/4] address comment --- plan/gen_physical_plans.go | 2 +- plan/logical_plan_test.go | 3 +++ sessionctx/variable/session.go | 10 ---------- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/plan/gen_physical_plans.go b/plan/gen_physical_plans.go index 0524ee3d652b2..b96242380e63c 100644 --- a/plan/gen_physical_plans.go +++ b/plan/gen_physical_plans.go @@ -170,7 +170,7 @@ func (p *LogicalJoin) getHashJoin(prop *requiredProp, innerIdx int) *PhysicalHas RightConditions: p.RightConditions, OtherConditions: p.OtherConditions, JoinType: p.JoinType, - Concurrency: p.ctx.GetSessionVars().GetHashJoinConcurrency(), + Concurrency: uint(p.ctx.GetSessionVars().HashJoinConcurrency), DefaultValues: p.DefaultValues, InnerChildIdx: innerIdx, }.init(p.ctx, p.stats.scaleByExpectCnt(prop.expectedCnt), chReqProps...) diff --git a/plan/logical_plan_test.go b/plan/logical_plan_test.go index def55388fef69..1c4e3691e6ab6 100644 --- a/plan/logical_plan_test.go +++ b/plan/logical_plan_test.go @@ -610,6 +610,7 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) { stmt, err := s.ParseOneStmt(ca.sql, "", "") c.Assert(err, IsNil, comment) + s.ctx.GetSessionVars().HashJoinConcurrency = 1 Preprocess(s.ctx, stmt, s.is, false) p, err := BuildLogicalPlan(s.ctx, stmt, s.is) c.Assert(err, IsNil) @@ -1361,6 +1362,7 @@ func (s *testPlanSuite) TestVisitInfo(c *C) { ctx: mockContext(), is: s.is, } + builder.ctx.GetSessionVars().HashJoinConcurrency = 1 builder.build(stmt) c.Assert(builder.err, IsNil, comment) @@ -1570,6 +1572,7 @@ func (s *testPlanSuite) TestNameResolver(c *C) { comment := Commentf("for %s", t.sql) stmt, err := s.ParseOneStmt(t.sql, "", "") c.Assert(err, IsNil, comment) + s.ctx.GetSessionVars().HashJoinConcurrency = 1 _, err = BuildLogicalPlan(s.ctx, stmt, s.is) if t.err == "" { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2ba7b82ac420b..e46269c82e614 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -15,7 +15,6 @@ package variable import ( "crypto/tls" - "strconv" "sync" "sync/atomic" "time" @@ -414,15 +413,6 @@ func (s *SessionVars) GetStatusFlag(flag uint16) bool { return s.Status&flag > 0 } -// GetHashJoinConcurrency gets the value of SessionVars.HashJoinConcurrency. -func (s *SessionVars) GetHashJoinConcurrency() uint { - concurrency, err := strconv.Atoi(s.systems[TiDBHashJoinConcurrency]) - if err != nil { - return uint(DefTiDBHashJoinConcurrency) - } - return uint(concurrency) -} - // InTxn returns if the session is in transaction. func (s *SessionVars) InTxn() bool { return s.GetStatusFlag(mysql.ServerStatusInTrans)