Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a tidb system variable tidb_hash_join_concurrency #6244

Merged
merged 9 commits into from
Apr 12, 2018
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ type Performance struct {
MaxProcs uint `toml:"max-procs" json:"max-procs"`
TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"`
RetryLimit uint `toml:"retry-limit" json:"retry-limit"`
JoinConcurrency uint `toml:"join-concurrency" json:"join-concurrency"`
CrossJoin bool `toml:"cross-join" json:"cross-join"`
StatsLease string `toml:"stats-lease" json:"stats-lease"`
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
Expand Down Expand Up @@ -252,7 +251,6 @@ var defaultConf = Config{
Performance: Performance{
TCPKeepAlive: true,
RetryLimit: 10,
JoinConcurrency: 5,
CrossJoin: true,
StatsLease: "3s",
RunAutoAnalyze: true,
Expand Down
3 changes: 0 additions & 3 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ tcp-keep-alive = true
# The maximum number of retries when committing a transaction.
retry-limit = 10

# The number of goroutines that participate joining.
join-concurrency = 5

# Whether to support cartesian product.
cross-join = true

Expand Down
7 changes: 3 additions & 4 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ import (
)

func (s *testSuite) TestAggregation(c *C) {
plan.JoinConcurrency = 1
defer func() {
plan.JoinConcurrency = 5
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_hash_join_concurrency=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c int, d int)")
Expand Down Expand Up @@ -296,6 +293,8 @@ func (s *testSuite) TestAggregation(c *C) {
tk.MustExec(`insert into t values (6, '{"i": 0, "n": "n6"}')`)
tk.MustExec(`insert into t values (7, '{"i": -1, "n": "n7"}')`)
tk.MustQuery("select sum(tags->'$.i') from t").Check(testkit.Rows("14"))

tk.MustExec("set @@tidb_hash_join_concurrency=5")
}

func (s *testSuite) TestStreamAggPushDown(c *C) {
Expand Down
19 changes: 7 additions & 12 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,14 @@ func (s *testSuite) TestJoin(c *C) {
// This case is for testing:
// when the main thread calls Executor.Close() while the out data fetch worker and join workers are still working,
// we need to stop the goroutines as soon as possible to avoid unexpected error.
savedConcurrency := plan.JoinConcurrency
plan.JoinConcurrency = 5
tk.MustExec("set @@tidb_hash_join_concurrency=5")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int)")
for i := 0; i < 100; i++ {
tk.MustExec("insert into t value(1)")
}
result = tk.MustQuery("select /*+ TIDB_HJ(s, r) */ * from t as s join t as r on s.a = r.a limit 1;")
result.Check(testkit.Rows("1 1"))
plan.JoinConcurrency = savedConcurrency
}

func (s *testSuite) TestJoinCast(c *C) {
Expand Down Expand Up @@ -494,11 +492,8 @@ func (s *testSuite) TestSubquerySameTable(c *C) {
}

func (s *testSuite) TestSubquery(c *C) {
plan.JoinConcurrency = 1
defer func() {
plan.JoinConcurrency = 5
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_hash_join_concurrency=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c int, d int)")
Expand Down Expand Up @@ -659,6 +654,8 @@ func (s *testSuite) TestSubquery(c *C) {
tk.MustExec("insert into t1 values(1)")
tk.MustExec("insert into t2 values(1)")
tk.MustQuery("select * from t1 where a in (select a from t2)").Check(testkit.Rows("1"))

tk.MustExec("set @@tidb_hash_join_concurrency=5")
}

func (s *testSuite) TestInSubquery(c *C) {
Expand Down Expand Up @@ -732,12 +729,8 @@ func (s *testSuite) TestInSubquery(c *C) {
}

func (s *testSuite) TestJoinLeak(c *C) {
savedConcurrency := plan.JoinConcurrency
plan.JoinConcurrency = 1
defer func() {
plan.JoinConcurrency = savedConcurrency
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_hash_join_concurrency=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (d int)")
Expand All @@ -753,6 +746,8 @@ func (s *testSuite) TestJoinLeak(c *C) {
c.Assert(err, IsNil)
time.Sleep(100 * time.Millisecond)
result.Close()

tk.MustExec("set @@tidb_hash_join_concurrency=5")
}

func (s *testSuite) TestHashJoinExecEncodeDecodeRow(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion plan/gen_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (p *LogicalJoin) getHashJoin(prop *requiredProp, innerIdx int) *PhysicalHas
RightConditions: p.RightConditions,
OtherConditions: p.OtherConditions,
JoinType: p.JoinType,
Concurrency: JoinConcurrency,
Concurrency: uint(p.ctx.GetSessionVars().HashJoinConcurrency),
DefaultValues: p.DefaultValues,
InnerChildIdx: innerIdx,
}.init(p.ctx, p.stats.scaleByExpectCnt(prop.expectedCnt), chReqProps...)
Expand Down
3 changes: 3 additions & 0 deletions plan/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) {
stmt, err := s.ParseOneStmt(ca.sql, "", "")
c.Assert(err, IsNil, comment)

s.ctx.GetSessionVars().HashJoinConcurrency = 1
Preprocess(s.ctx, stmt, s.is, false)
p, err := BuildLogicalPlan(s.ctx, stmt, s.is)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1361,6 +1362,7 @@ func (s *testPlanSuite) TestVisitInfo(c *C) {
ctx: mockContext(),
is: s.is,
}
builder.ctx.GetSessionVars().HashJoinConcurrency = 1
builder.build(stmt)
c.Assert(builder.err, IsNil, comment)

Expand Down Expand Up @@ -1570,6 +1572,7 @@ func (s *testPlanSuite) TestNameResolver(c *C) {
comment := Commentf("for %s", t.sql)
stmt, err := s.ParseOneStmt(t.sql, "", "")
c.Assert(err, IsNil, comment)
s.ctx.GetSessionVars().HashJoinConcurrency = 1

_, err = BuildLogicalPlan(s.ctx, stmt, s.is)
if t.err == "" {
Expand Down
3 changes: 0 additions & 3 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ const (
cpuFactor = 0.9
)

// JoinConcurrency means the number of goroutines that participate in joining.
var JoinConcurrency uint = 5

// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get
// these tasks one by one.
var wholeTaskTypes = [...]taskType{copSingleReadTaskType, copDoubleReadTaskType, rootTaskType}
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBIndexLookupConcurrency + quoteCommaQuote +
variable.TiDBIndexLookupJoinConcurrency + quoteCommaQuote +
variable.TiDBIndexSerialScanConcurrency + quoteCommaQuote +
variable.TiDBHashJoinConcurrency + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + "')"

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ type SessionVars struct {
// DistSQLScanConcurrency is the number of concurrent dist SQL scan workers.
DistSQLScanConcurrency int

// HashJoinConcurrency is the number of concurrent hash join outer workers.
HashJoinConcurrency int

// IndexSerialScanConcurrency is the number of concurrent index serial scan workers.
IndexSerialScanConcurrency int

Expand Down Expand Up @@ -511,6 +514,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexJoinBatchSize = tidbOptPositiveInt(val, DefIndexJoinBatchSize)
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
s.HashJoinConcurrency = tidbOptPositiveInt(val, DefTiDBHashJoinConcurrency)
case TiDBDistSQLScanConcurrency:
s.DistSQLScanConcurrency = tidbOptPositiveInt(val, DefDistSQLScanConcurrency)
case TiDBIndexSerialScanConcurrency:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TIDBMemQuotaNestedLoopApply, strconv.FormatInt(DefTiDBMemQuotaNestedLoopApply, 10)},
{ScopeSession, TiDBEnableStreaming, "0"},
{ScopeSession, TxnIsolationOneShot, ""},
{ScopeGlobal | ScopeSession, TiDBHashJoinConcurrency, strconv.Itoa(DefTiDBHashJoinConcurrency)},
/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
{ScopeSession, TiDBConfig, ""},
Expand Down
94 changes: 49 additions & 45 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ package variable
8. Use this variable to control the behavior in code.
*/

// TiDB system variable names.
// TiDB system variable names that are only in session scope.
const (
/* Session only */

// tidb_snapshot is used for reading history data, the default value is empty string.
// When the value is set to a datetime string like '2017-11-11 20:20:20', the session reads history data of that time.
TiDBSnapshot = "tidb_snapshot"
Expand All @@ -38,7 +36,7 @@ const (
// When the value is set to true, unique index constraint is not checked.
TiDBImportingData = "tidb_import_data"

// tidb_opt_agg_push_down is used to endable/disable the optimizer rule of aggregation push down.
// tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down.
TiDBOptAggPushDown = "tidb_opt_agg_push_down"

// tidb_opt_insubquery_unfold is used to enable/disable the optimizer rule of in subquery unfold.
Expand All @@ -60,47 +58,6 @@ const (
// tidb_config is a read-only variable that shows the config of the current server.
TiDBConfig = "tidb_config"

/* Session and global */

// tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task.
// A distsql scan task can be a table scan or a index scan, which may be distributed to many TiKV nodes.
// Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact.
// If the query has a LIMIT clause, high concurrency makes the system do much more work than needed.
TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency"

// tidb_index_join_batch_size is used to set the batch size of a index lookup join.
// The index lookup join fetches batches of data from outer executor and constructs ranges for inner executor.
// This value controls how much of data in a batch to do the index join.
// Large value may reduce the latency but consumes more system resource.
TiDBIndexJoinBatchSize = "tidb_index_join_batch_size"

// tidb_index_lookup_size is used for index lookup executor.
// The index lookup executor first scan a batch of handles from a index, then use those handles to lookup the table
// rows, this value controls how much of handles in a batch to do a lookup task.
// Small value sends more RPCs to TiKV, consume more system resource.
// Large value may do more work than needed if the query has a limit.
TiDBIndexLookupSize = "tidb_index_lookup_size"

// tidb_index_lookup_concurrency is used for index lookup executor.
// A lookup task may have 'tidb_index_lookup_size' of handles at maximun, the handles may be distributed
// in many TiKV nodes, we executes multiple concurrent index lookup tasks concurrently to reduce the time
// waiting for a task to finish.
// Set this value higher may reduce the latency but consumes more system resource.
TiDBIndexLookupConcurrency = "tidb_index_lookup_concurrency"

// tidb_index_lookup_join_concurrency is used for index lookup join executor.
// IndexLookUpJoin starts "tidb_index_lookup_join_concurrency" inner workers
// to fetch inner rows and join the matched (outer, inner) row pairs.
TiDBIndexLookupJoinConcurrency = "tidb_index_lookup_join_concurrency"

// tidb_index_serial_scan_concurrency is used for controlling the concurrency of index scan operation
// when we need to keep the data output order the same as the order of index data.
TiDBIndexSerialScanConcurrency = "tidb_index_serial_scan_concurrency"

// tidb_skip_utf8_check skips the UTF8 validate process, validate UTF8 has performance cost, if we can make sure
// the input string values are valid, we can skip the check.
TiDBSkipUTF8Check = "tidb_skip_utf8_check"

// tidb_batch_insert is used to enable/disable auto-split insert data. If set this option on, insert executor will automatically
// insert data into multiple batches and use a single txn for each batch. This will be helpful when inserting large data.
TiDBBatchInsert = "tidb_batch_insert"
Expand Down Expand Up @@ -143,6 +100,52 @@ const (
TiDBEnableStreaming = "tidb_enable_streaming"
)

// TiDB system variable names that are in both session and global scope.
const (
// tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task.
// A distsql scan task can be a table scan or an index scan, which may be distributed to many TiKV nodes.
// Higher concurrency may reduce latency, but at the cost of higher memory usage and system performance impact.
// If the query has a LIMIT clause, high concurrency makes the system do much more work than needed.
TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency"

// tidb_index_join_batch_size is used to set the batch size of an index lookup join.
// The index lookup join fetches batches of data from the outer executor and constructs ranges for the inner executor.
// This value controls how much data is in a batch used to do the index join.
// A large value may reduce latency but consumes more system resources.
TiDBIndexJoinBatchSize = "tidb_index_join_batch_size"

// tidb_index_lookup_size is used for the index lookup executor.
// The index lookup executor first scans a batch of handles from an index, then uses those handles to look up the
// table rows; this value controls how many handles are in a batch for one lookup task.
// A small value sends more RPCs to TiKV and consumes more system resources.
// A large value may do more work than needed if the query has a limit.
TiDBIndexLookupSize = "tidb_index_lookup_size"

// tidb_index_lookup_concurrency is used for the index lookup executor.
// A lookup task may have at most 'tidb_index_lookup_size' handles, and the handles may be distributed
// across many TiKV nodes, so we execute multiple index lookup tasks concurrently to reduce the time
// spent waiting for a task to finish.
// Setting this value higher may reduce latency but consumes more system resources.
TiDBIndexLookupConcurrency = "tidb_index_lookup_concurrency"

// tidb_index_lookup_join_concurrency is used for the index lookup join executor.
// IndexLookUpJoin starts "tidb_index_lookup_join_concurrency" inner workers
// to fetch inner rows and join the matched (outer, inner) row pairs.
TiDBIndexLookupJoinConcurrency = "tidb_index_lookup_join_concurrency"

// tidb_index_serial_scan_concurrency is used for controlling the concurrency of the index scan operation
// when we need to keep the data output order the same as the order of the index data.
TiDBIndexSerialScanConcurrency = "tidb_index_serial_scan_concurrency"

// tidb_skip_utf8_check skips the UTF8 validation process. Validating UTF8 has a performance cost, so if we can
// make sure the input string values are valid, we can skip the check.
TiDBSkipUTF8Check = "tidb_skip_utf8_check"

// tidb_hash_join_concurrency is used for the hash join executor.
// The hash join outer executor starts multiple concurrent join workers to probe the hash table.
TiDBHashJoinConcurrency = "tidb_hash_join_concurrency"
)

// Default TiDB system variable values.
const (
DefIndexLookupConcurrency = 4
Expand Down Expand Up @@ -170,6 +173,7 @@ const (
DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB.
DefTiDBMemQuotaNestedLoopApply = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBHashJoinConcurrency = 5
)

// Process global variables.
Expand Down
1 change: 0 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ func setGlobalVars() {
ddl.RunWorker = cfg.RunDDL
ddl.EnableSplitTableRegion = cfg.SplitTable
session.SetCommitRetryLimit(cfg.Performance.RetryLimit)
plan.JoinConcurrency = cfg.Performance.JoinConcurrency
plan.AllowCartesianProduct = cfg.Performance.CrossJoin
privileges.SkipWithGrant = cfg.Security.SkipGrantTable

Expand Down