Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker(dm): check the number of connections before starting #5185

Merged
merged 30 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1e6a05f
fix: too many connection error
buchuitoudegou Apr 15, 2022
415bb5f
add ut for checker
buchuitoudegou Apr 18, 2022
b42a9a4
feat: add dumper checker
buchuitoudegou Apr 18, 2022
0136483
fix: comment
buchuitoudegou Apr 20, 2022
f3dfd15
Merge branch 'master' of https://github.com/pingcap/tiflow into many-…
buchuitoudegou May 10, 2022
6686b68
feat: check connection in it
buchuitoudegou May 11, 2022
be668b6
fix: ut
buchuitoudegou May 11, 2022
bfb0028
fix: lint
buchuitoudegou May 11, 2022
eb401cc
fix: it typo
buchuitoudegou May 11, 2022
ce0814c
fix: it
buchuitoudegou May 12, 2022
d9e7448
Merge branch 'master' of https://github.com/pingcap/tiflow into many-…
buchuitoudegou May 23, 2022
077bf4c
fix(partial): sharding ddl connection check
buchuitoudegou May 23, 2022
6a8061c
Merge branch 'master' of https://github.com/pingcap/tiflow into many-…
buchuitoudegou Jul 18, 2022
8edaedc
revert syncer checker
buchuitoudegou Jul 19, 2022
f6927f4
add warning when using lightning
buchuitoudegou Jul 19, 2022
5d4a196
fix it: 0 max_connections
buchuitoudegou Jul 19, 2022
bd23f69
remove warning when connections is 0
buchuitoudegou Jul 19, 2022
ce394dc
recover tidb connections
buchuitoudegou Jul 19, 2022
20edb4b
fix comment
buchuitoudegou Jul 21, 2022
c569133
Merge branch 'master' of https://github.com/pingcap/tiflow into many-…
buchuitoudegou Jul 26, 2022
c6e1a74
fix broken dep
buchuitoudegou Jul 26, 2022
5827f28
add comment
buchuitoudegou Jul 26, 2022
c742122
Update dm/pkg/checker/conn_checker.go
buchuitoudegou Jul 28, 2022
a855984
Update dm/pkg/checker/conn_checker.go
buchuitoudegou Jul 28, 2022
89e2d9d
fix comment
buchuitoudegou Jul 28, 2022
f27d5fd
fix: defer close
buchuitoudegou Jul 28, 2022
348f7f9
fix lint
buchuitoudegou Jul 28, 2022
b3d3d16
Merge branch 'master' into many-conn-err
ti-chi-bot Jul 29, 2022
c235fb0
Merge branch 'master' into many-conn-err
ti-chi-bot Jul 29, 2022
c48b5cb
Merge branch 'master' into many-conn-err
ti-chi-bot Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Checker struct {
warnCnt int64

onlineDDL onlineddl.OnlinePlugin

stCfgs []*config.SubTaskConfig
}

// NewChecker returns a checker.
Expand All @@ -97,6 +99,7 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
checkingItems: checkingItems,
errCnt: errCnt,
warnCnt: warnCnt,
stCfgs: cfgs,
}

for _, cfg := range cfgs {
Expand Down Expand Up @@ -144,6 +147,28 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if egErr := eg.Wait(); egErr != nil {
return egErr
}
// check connections
if _, ok := c.checkingItems[config.ConnNumberChecking]; ok {
if len(c.stCfgs) > 0 {
// only check the first subtask's config
// because the Mode is the same across all the subtasks
// as long as they are derived from the same task config.
switch c.stCfgs[0].Mode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we just check the first item?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we just check the first item?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mode is a global and unique parameter in a task. It's should be the same across all the subtask derived from the same task.

Copy link
Contributor

@lonng lonng Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any code to ensure this behavior? It's better to leave some comments here to make the code clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. will leave a comment here.

FYI:

cfg.Mode = c.TaskMode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case config.ModeAll:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can left a TODO for syncer here, otherwise reviewer may get confused why the two cases are same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO: check the connections for syncer
// TODO: check for incremental mode
c.checkList = append(c.checkList, checker.NewLoaderConnNumberChecker(c.instances[0].targetDB, c.stCfgs))
for i, inst := range c.instances {
c.checkList = append(c.checkList, checker.NewDumperConnNumberChecker(inst.sourceDB, c.stCfgs[i].MydumperConfig.Threads))
}
case config.ModeFull:
c.checkList = append(c.checkList, checker.NewLoaderConnNumberChecker(c.instances[0].targetDB, c.stCfgs))
for i, inst := range c.instances {
c.checkList = append(c.checkList, checker.NewDumperConnNumberChecker(inst.sourceDB, c.stCfgs[i].MydumperConfig.Threads))
}
}
}
}
// targetTableID => source => [tables]
sharding := make(map[string]map[string][]*filter.Table)
shardingCounter := make(map[string]int)
Expand Down
2 changes: 2 additions & 0 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
ShardAutoIncrementIDChecking = "auto_increment_ID"
OnlineDDLChecking = "online_ddl"
BinlogDBChecking = "binlog_db"
ConnNumberChecking = "conn_number"
)

// AllCheckingItems contains all checking items.
Expand All @@ -52,6 +53,7 @@ var AllCheckingItems = map[string]string{
ShardAutoIncrementIDChecking: "conflict auto increment ID of shard tables checking item",
OnlineDDLChecking: "online ddl checking item",
BinlogDBChecking: "binlog db checking item",
ConnNumberChecking: "connection number checking item",
}

// MaxSourceIDLength is the max length for dm-worker source id.
Expand Down
1 change: 0 additions & 1 deletion dm/dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {

newCtx, cancel := context.WithCancel(ctx)
var dumpling *export.Dumper

if dumpling, err = export.NewDumper(newCtx, m.dumpConfig); err == nil {
m.mu.Lock()
m.core = dumpling
Expand Down
7 changes: 6 additions & 1 deletion dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,12 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
errs = append(errs, err)
}
}()

failpoint.Inject("longLoadProcess", func(val failpoint.Value) {
if sec, ok := val.(int); ok {
l.logger.Info("long loader unit", zap.Int("second", sec))
time.Sleep(time.Duration(sec) * time.Second)
}
})
err = l.Restore(newCtx)
close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
cancel() // cancel the goroutines created in `Restore`.
Expand Down
197 changes: 197 additions & 0 deletions dm/pkg/checker/conn_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"context"
"database/sql"

"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
)

type connNumberChecker struct {
toCheckDB *conn.BaseDB
stCfgs []*config.SubTaskConfig

getConfigConn func(stCfgs []*config.SubTaskConfig) int
workerName string
unlimitedConn bool
}

func newConnNumberChecker(toCheckDB *conn.BaseDB, stCfgs []*config.SubTaskConfig, fn func(stCfgs []*config.SubTaskConfig) int, workerName string) connNumberChecker {
return connNumberChecker{
toCheckDB: toCheckDB,
stCfgs: stCfgs,
getConfigConn: fn,
workerName: workerName,
}
}

func (c *connNumberChecker) check(ctx context.Context, checkerName string) *Result {
result := &Result{
Name: checkerName,
Desc: "check if connetion concurrency exceeds database's maximum connection limit",
State: StateFailure,
}
baseConn, err := c.toCheckDB.GetBaseConn(ctx)
if err != nil {
markCheckError(result, err)
return result
}
defer c.toCheckDB.CloseBaseConn(baseConn)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
markCheckError(result, err)
return result
}
var rows, processRows *sql.Rows
rows, err = baseConn.QuerySQL(tcontext.NewContext(ctx, log.L()), "SHOW GLOBAL VARIABLES LIKE 'max_connections'")
if err != nil {
markCheckError(result, err)
return result
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer rows.Close(). Close() can be called for many times

defer rows.Close()
var (
maxConn int
variable string
)
for rows.Next() {
err = rows.Scan(&variable, &maxConn)
if err != nil {
markCheckError(result, err)
return result
}
}
rows.Close()
if maxConn == 0 {
c.unlimitedConn = true
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
result.State = StateSuccess
return result
}
processRows, err = baseConn.QuerySQL(tcontext.NewContext(ctx, log.L()), "SHOW PROCESSLIST")
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
markCheckError(result, err)
return result
}
defer processRows.Close()
usedConn := 0
for processRows.Next() {
usedConn++
}
usedConn -= 1 // exclude the connection used for show processlist
log.L().Debug("connection checker", zap.Int("maxConnections", maxConn), zap.Int("usedConnections", usedConn))
neededConn := c.getConfigConn(c.stCfgs)
if neededConn > maxConn {
// nonzero max_connections and needed connections exceed max_connections
// FYI: https://github.com/pingcap/tidb/pull/35453
// currently, TiDB's max_connections is set to 0 representing unlimited connections,
// while for MySQL, 0 is not a legal value (never retrieve from it).
result.Errors = append(
result.Errors,
NewError(
"checked database's max_connections: %d is less than the number %s needs: %d",
maxConn,
c.workerName,
neededConn,
),
)
result.Instruction = "set larger max_connections or adjust the configuration of dm"
result.State = StateFailure
} else {
// nonzero max_connections and needed connections are less than or equal to max_connections
result.State = StateSuccess
if maxConn-usedConn < neededConn {
result.State = StateWarning
result.Instruction = "set larger max_connections or adjust the configuration of dm"
result.Errors = append(
result.Errors,
NewError(
"database's max_connections: %d, used_connections: %d, available_connections: %d is less than %s needs: %d",
maxConn,
usedConn,
maxConn-usedConn,
c.workerName,
neededConn,
),
)
}
}
return result
}

type LoaderConnNumberChecker struct {
connNumberChecker
}

func NewLoaderConnNumberChecker(targetDB *conn.BaseDB, stCfgs []*config.SubTaskConfig) RealChecker {
return &LoaderConnNumberChecker{
connNumberChecker: newConnNumberChecker(targetDB, stCfgs, func(stCfgs []*config.SubTaskConfig) int {
loaderConn := 0
for _, stCfg := range stCfgs {
// loader's worker and checkpoint (always keeps one db connection)
buchuitoudegou marked this conversation as resolved.
Show resolved Hide resolved
loaderConn += stCfg.LoaderConfig.PoolSize + 1
}
return loaderConn
}, "loader"),
}
}

func (l *LoaderConnNumberChecker) Name() string {
return "loader_conn_number_checker"
}

func (l *LoaderConnNumberChecker) Check(ctx context.Context) *Result {
result := l.check(ctx, l.Name())
if !l.unlimitedConn && result.State == StateFailure {
// if the max_connections is set as a specific number
// and we failed because of the number connecions needed is smaller than max_connections
for _, stCfg := range l.stCfgs {
if stCfg.NeedUseLightning() {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// if we're using lightning, this error should be omitted
// because lightning doesn't need to keep connections while restoring.
result.Errors = append(
result.Errors,
NewWarn("task precheck cannot accurately check the number of connection needed for Lightning, please set a sufficiently large connections for TiDB"),
)
result.State = StateWarning
break
}
}
}
return result
}

func NewDumperConnNumberChecker(sourceDB *conn.BaseDB, dumperThreads int) RealChecker {
return &DumperConnNumberChecker{
connNumberChecker: newConnNumberChecker(sourceDB, nil, func(_ []*config.SubTaskConfig) int {
// one for generating SQL, another for consistency control
return dumperThreads + 2
}, "dumper"),
}
}

type DumperConnNumberChecker struct {
connNumberChecker
}

func (d *DumperConnNumberChecker) Check(ctx context.Context) *Result {
return d.check(ctx, d.Name())
}

func (d *DumperConnNumberChecker) Name() string {
return "dumper_conn_number_checker"
}
68 changes: 68 additions & 0 deletions dm/pkg/checker/conn_checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/stretchr/testify/require"
)

func TestConnNumberChecker(t *testing.T) {
var err error
db, dbMock, err := sqlmock.New()
require.NoError(t, err)
stCfgs := []*config.SubTaskConfig{
{
SyncerConfig: config.SyncerConfig{
WorkerCount: 10,
},
LoaderConfig: config.LoaderConfig{
PoolSize: 16,
},
},
}
baseDB := conn.NewBaseDB(db, func() {})
// test loader: fail
dbMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("max_connections", 16))
dbMock.ExpectQuery("SHOW PROCESSLIST").WillReturnRows(sqlmock.NewRows(
[]string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}).
AddRow(1, "root", "localhost", "test", "Query", 0, "init", ""),
)
loaderChecker := NewLoaderConnNumberChecker(baseDB, stCfgs)
result := loaderChecker.Check(context.Background())
require.Equal(t, 1, len(result.Errors))
require.Equal(t, StateFailure, result.State)
require.Regexp(t, "(.|\n)*is less than the number loader(.|\n)*", result.Errors[0].ShortErr)

// test loader: success
db, dbMock, err = sqlmock.New()
require.NoError(t, err)
baseDB = conn.NewBaseDB(db, func() {})
dbMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("max_connections", 17))
dbMock.ExpectQuery("SHOW PROCESSLIST").WillReturnRows(sqlmock.NewRows(
[]string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}).
AddRow(1, "root", "localhost", "test", "Query", 0, "init", ""),
)
loaderChecker = NewLoaderConnNumberChecker(baseDB, stCfgs)
result = loaderChecker.Check(context.Background())
require.Equal(t, 0, len(result.Errors))
require.Equal(t, StateSuccess, result.State)
}
37 changes: 37 additions & 0 deletions dm/tests/dmctl_command/conf/dm-task3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
name: test
task-mode: full
meta-schema: "dm_meta"
enable-heartbeat: false
shard-mode: "pessimistic"
target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"

block-allow-list:
instance:
do-dbs: ["dmctl_conn"]
do-tables:
- db-name: "dmctl_conn"
tbl-name: "*"

mydumpers:
global:
threads: 2
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 5
import-mode: "loader"
dir: "./dumped_data"
Loading