-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
checker(dm): check the number of connections before starting #5185
Changes from all commits
1e6a05f
415bb5f
b42a9a4
0136483
f3dfd15
6686b68
be668b6
bfb0028
eb401cc
ce0814c
d9e7448
077bf4c
6a8061c
8edaedc
f6927f4
5d4a196
bd23f69
ce394dc
20edb4b
c569133
c6e1a74
5827f28
c742122
a855984
89e2d9d
f27d5fd
348f7f9
b3d3d16
c235fb0
c48b5cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -88,6 +88,8 @@ type Checker struct { | |||
warnCnt int64 | ||||
|
||||
onlineDDL onlineddl.OnlinePlugin | ||||
|
||||
stCfgs []*config.SubTaskConfig | ||||
} | ||||
|
||||
// NewChecker returns a checker. | ||||
|
@@ -97,6 +99,7 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e | |||
checkingItems: checkingItems, | ||||
errCnt: errCnt, | ||||
warnCnt: warnCnt, | ||||
stCfgs: cfgs, | ||||
} | ||||
|
||||
for _, cfg := range cfgs { | ||||
|
@@ -144,6 +147,28 @@ func (c *Checker) Init(ctx context.Context) (err error) { | |||
if egErr := eg.Wait(); egErr != nil { | ||||
return egErr | ||||
} | ||||
// check connections | ||||
if _, ok := c.checkingItems[config.ConnNumberChecking]; ok { | ||||
if len(c.stCfgs) > 0 { | ||||
// only check the first subtask's config | ||||
// because the Mode is the same across all the subtasks | ||||
// as long as they are derived from the same task config. | ||||
switch c.stCfgs[0].Mode { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we just check the first item? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any code to ensure this behavior? It's better to leave some comments here to make the code clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. will leave a comment here. FYI: tiflow/dm/config/task_converters.go Line 49 in bd0cc36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||
case config.ModeAll: | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can left a TODO for syncer here, otherwise reviewer may get confused why the two cases are same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||
// TODO: check the connections for syncer | ||||
// TODO: check for incremental mode | ||||
c.checkList = append(c.checkList, checker.NewLoaderConnNumberChecker(c.instances[0].targetDB, c.stCfgs)) | ||||
for i, inst := range c.instances { | ||||
c.checkList = append(c.checkList, checker.NewDumperConnNumberChecker(inst.sourceDB, c.stCfgs[i].MydumperConfig.Threads)) | ||||
} | ||||
case config.ModeFull: | ||||
c.checkList = append(c.checkList, checker.NewLoaderConnNumberChecker(c.instances[0].targetDB, c.stCfgs)) | ||||
for i, inst := range c.instances { | ||||
c.checkList = append(c.checkList, checker.NewDumperConnNumberChecker(inst.sourceDB, c.stCfgs[i].MydumperConfig.Threads)) | ||||
} | ||||
} | ||||
} | ||||
} | ||||
// targetTableID => source => [tables] | ||||
sharding := make(map[string]map[string][]*filter.Table) | ||||
shardingCounter := make(map[string]int) | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package checker | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
|
||
"github.com/pingcap/tiflow/dm/config" | ||
"github.com/pingcap/tiflow/dm/pkg/conn" | ||
tcontext "github.com/pingcap/tiflow/dm/pkg/context" | ||
"github.com/pingcap/tiflow/dm/pkg/log" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type connNumberChecker struct { | ||
toCheckDB *conn.BaseDB | ||
stCfgs []*config.SubTaskConfig | ||
|
||
getConfigConn func(stCfgs []*config.SubTaskConfig) int | ||
workerName string | ||
unlimitedConn bool | ||
} | ||
|
||
func newConnNumberChecker(toCheckDB *conn.BaseDB, stCfgs []*config.SubTaskConfig, fn func(stCfgs []*config.SubTaskConfig) int, workerName string) connNumberChecker { | ||
return connNumberChecker{ | ||
toCheckDB: toCheckDB, | ||
stCfgs: stCfgs, | ||
getConfigConn: fn, | ||
workerName: workerName, | ||
} | ||
} | ||
|
||
func (c *connNumberChecker) check(ctx context.Context, checkerName string) *Result { | ||
result := &Result{ | ||
Name: checkerName, | ||
Desc: "check if connetion concurrency exceeds database's maximum connection limit", | ||
State: StateFailure, | ||
} | ||
baseConn, err := c.toCheckDB.GetBaseConn(ctx) | ||
if err != nil { | ||
markCheckError(result, err) | ||
return result | ||
} | ||
//nolint:errcheck | ||
defer c.toCheckDB.CloseBaseConn(baseConn) | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
markCheckError(result, err) | ||
return result | ||
} | ||
var rows, processRows *sql.Rows | ||
rows, err = baseConn.QuerySQL(tcontext.NewContext(ctx, log.L()), "SHOW GLOBAL VARIABLES LIKE 'max_connections'") | ||
if err != nil { | ||
markCheckError(result, err) | ||
return result | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. defer rows.Close(). Close() can be called for many times |
||
defer rows.Close() | ||
var ( | ||
maxConn int | ||
variable string | ||
) | ||
for rows.Next() { | ||
err = rows.Scan(&variable, &maxConn) | ||
if err != nil { | ||
markCheckError(result, err) | ||
return result | ||
} | ||
} | ||
rows.Close() | ||
if maxConn == 0 { | ||
c.unlimitedConn = true | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result.State = StateSuccess | ||
return result | ||
} | ||
processRows, err = baseConn.QuerySQL(tcontext.NewContext(ctx, log.L()), "SHOW PROCESSLIST") | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
markCheckError(result, err) | ||
return result | ||
} | ||
defer processRows.Close() | ||
usedConn := 0 | ||
for processRows.Next() { | ||
usedConn++ | ||
} | ||
usedConn -= 1 // exclude the connection used for show processlist | ||
log.L().Debug("connection checker", zap.Int("maxConnections", maxConn), zap.Int("usedConnections", usedConn)) | ||
neededConn := c.getConfigConn(c.stCfgs) | ||
if neededConn > maxConn { | ||
// nonzero max_connections and needed connections exceed max_connections | ||
// FYI: https://github.com/pingcap/tidb/pull/35453 | ||
// currently, TiDB's max_connections is set to 0 representing unlimited connections, | ||
// while for MySQL, 0 is not a legal value (never retrieve from it). | ||
result.Errors = append( | ||
result.Errors, | ||
NewError( | ||
"checked database's max_connections: %d is less than the number %s needs: %d", | ||
maxConn, | ||
c.workerName, | ||
neededConn, | ||
), | ||
) | ||
result.Instruction = "set larger max_connections or adjust the configuration of dm" | ||
result.State = StateFailure | ||
} else { | ||
// nonzero max_connections and needed connections are less than or equal to max_connections | ||
result.State = StateSuccess | ||
if maxConn-usedConn < neededConn { | ||
result.State = StateWarning | ||
result.Instruction = "set larger max_connections or adjust the configuration of dm" | ||
result.Errors = append( | ||
result.Errors, | ||
NewError( | ||
"database's max_connections: %d, used_connections: %d, available_connections: %d is less than %s needs: %d", | ||
maxConn, | ||
usedConn, | ||
maxConn-usedConn, | ||
c.workerName, | ||
neededConn, | ||
), | ||
) | ||
} | ||
} | ||
return result | ||
} | ||
|
||
type LoaderConnNumberChecker struct { | ||
connNumberChecker | ||
} | ||
|
||
func NewLoaderConnNumberChecker(targetDB *conn.BaseDB, stCfgs []*config.SubTaskConfig) RealChecker { | ||
return &LoaderConnNumberChecker{ | ||
connNumberChecker: newConnNumberChecker(targetDB, stCfgs, func(stCfgs []*config.SubTaskConfig) int { | ||
loaderConn := 0 | ||
for _, stCfg := range stCfgs { | ||
// loader's worker and checkpoint (always keeps one db connection) | ||
buchuitoudegou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
loaderConn += stCfg.LoaderConfig.PoolSize + 1 | ||
} | ||
return loaderConn | ||
}, "loader"), | ||
} | ||
} | ||
|
||
func (l *LoaderConnNumberChecker) Name() string { | ||
return "loader_conn_number_checker" | ||
} | ||
|
||
func (l *LoaderConnNumberChecker) Check(ctx context.Context) *Result { | ||
result := l.check(ctx, l.Name()) | ||
if !l.unlimitedConn && result.State == StateFailure { | ||
// if the max_connections is set as a specific number | ||
// and we failed because of the number connecions needed is smaller than max_connections | ||
for _, stCfg := range l.stCfgs { | ||
if stCfg.NeedUseLightning() { | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// if we're using lightning, this error should be omitted | ||
// because lightning doesn't need to keep connections while restoring. | ||
result.Errors = append( | ||
result.Errors, | ||
NewWarn("task precheck cannot accurately check the number of connection needed for Lightning, please set a sufficiently large connections for TiDB"), | ||
) | ||
result.State = StateWarning | ||
break | ||
} | ||
} | ||
} | ||
return result | ||
} | ||
|
||
func NewDumperConnNumberChecker(sourceDB *conn.BaseDB, dumperThreads int) RealChecker { | ||
return &DumperConnNumberChecker{ | ||
connNumberChecker: newConnNumberChecker(sourceDB, nil, func(_ []*config.SubTaskConfig) int { | ||
// one for generating SQL, another for consistency control | ||
return dumperThreads + 2 | ||
}, "dumper"), | ||
} | ||
} | ||
|
||
type DumperConnNumberChecker struct { | ||
connNumberChecker | ||
} | ||
|
||
func (d *DumperConnNumberChecker) Check(ctx context.Context) *Result { | ||
return d.check(ctx, d.Name()) | ||
} | ||
|
||
func (d *DumperConnNumberChecker) Name() string { | ||
return "dumper_conn_number_checker" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package checker | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/DATA-DOG/go-sqlmock" | ||
"github.com/pingcap/tidb/dumpling/context" | ||
"github.com/pingcap/tiflow/dm/config" | ||
"github.com/pingcap/tiflow/dm/pkg/conn" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestConnNumberChecker(t *testing.T) { | ||
var err error | ||
db, dbMock, err := sqlmock.New() | ||
require.NoError(t, err) | ||
stCfgs := []*config.SubTaskConfig{ | ||
{ | ||
SyncerConfig: config.SyncerConfig{ | ||
WorkerCount: 10, | ||
}, | ||
LoaderConfig: config.LoaderConfig{ | ||
PoolSize: 16, | ||
}, | ||
}, | ||
} | ||
baseDB := conn.NewBaseDB(db, func() {}) | ||
// test loader: fail | ||
dbMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). | ||
AddRow("max_connections", 16)) | ||
dbMock.ExpectQuery("SHOW PROCESSLIST").WillReturnRows(sqlmock.NewRows( | ||
[]string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}). | ||
AddRow(1, "root", "localhost", "test", "Query", 0, "init", ""), | ||
) | ||
loaderChecker := NewLoaderConnNumberChecker(baseDB, stCfgs) | ||
result := loaderChecker.Check(context.Background()) | ||
require.Equal(t, 1, len(result.Errors)) | ||
require.Equal(t, StateFailure, result.State) | ||
require.Regexp(t, "(.|\n)*is less than the number loader(.|\n)*", result.Errors[0].ShortErr) | ||
|
||
// test loader: success | ||
db, dbMock, err = sqlmock.New() | ||
require.NoError(t, err) | ||
baseDB = conn.NewBaseDB(db, func() {}) | ||
dbMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). | ||
AddRow("max_connections", 17)) | ||
dbMock.ExpectQuery("SHOW PROCESSLIST").WillReturnRows(sqlmock.NewRows( | ||
[]string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}). | ||
AddRow(1, "root", "localhost", "test", "Query", 0, "init", ""), | ||
) | ||
loaderChecker = NewLoaderConnNumberChecker(baseDB, stCfgs) | ||
result = loaderChecker.Check(context.Background()) | ||
require.Equal(t, 0, len(result.Errors)) | ||
require.Equal(t, StateSuccess, result.State) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
--- | ||
name: test | ||
task-mode: full | ||
meta-schema: "dm_meta" | ||
enable-heartbeat: false | ||
shard-mode: "pessimistic" | ||
target-database: | ||
host: "127.0.0.1" | ||
port: 4000 | ||
user: "root" | ||
password: "" | ||
|
||
mysql-instances: | ||
- source-id: "mysql-replica-01" | ||
block-allow-list: "instance" | ||
mydumper-config-name: "global" | ||
loader-config-name: "global" | ||
|
||
block-allow-list: | ||
instance: | ||
do-dbs: ["dmctl_conn"] | ||
do-tables: | ||
- db-name: "dmctl_conn" | ||
tbl-name: "*" | ||
|
||
mydumpers: | ||
global: | ||
threads: 2 | ||
chunk-filesize: 64 | ||
skip-tz-utc: true | ||
extra-args: "" | ||
|
||
loaders: | ||
global: | ||
pool-size: 5 | ||
import-mode: "loader" | ||
dir: "./dumped_data" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we just check the first item?