-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync-diff-inspector: add session configuration in toml #847
Changes from 10 commits
d0b5e2a
2989448
d41892c
faa428c
985a916
7a4ce3c
5e19c19
a12b6b6
3ea75fe
a419cfe
07629ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -121,7 +121,8 @@ type DataSource struct { | |||||
Router *router.Table | ||||||
RouteTargetSet map[string]struct{} `json:"-"` | ||||||
|
||||||
Conn *sql.DB | ||||||
Conn *sql.DB | ||||||
SessionConfig SessionConfig `toml:"session" json:"session"` | ||||||
} | ||||||
|
||||||
// IsAutoSnapshot returns true if the tidb_snapshot is expected to automatically | ||||||
|
@@ -192,6 +193,14 @@ func (d *DataSource) ToDriverConfig() *mysql.Config { | |||||
cfg.TLSConfig = d.Security.TLSName | ||||||
} | ||||||
|
||||||
for param, value := range d.SessionConfig { | ||||||
switch v := value.(type) { | ||||||
case string: | ||||||
cfg.Params[param] = "\"" + v + "\"" | ||||||
default: | ||||||
cfg.Params[param] = fmt.Sprintf("%v", v) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} | ||||||
return cfg | ||||||
} | ||||||
|
||||||
|
@@ -357,6 +366,9 @@ func (t *TaskConfig) ComputeConfigHash() (string, error) { | |||||
return fmt.Sprintf("%x", sha256.Sum256(hash)), nil | ||||||
} | ||||||
|
||||||
// SessionConfig the the session level configuration for data source. | ||||||
type SessionConfig map[string]any | ||||||
|
||||||
// Config is the configuration. | ||||||
type Config struct { | ||||||
*flag.FlagSet `json:"-"` | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,11 +16,15 @@ package common | |||||
import ( | ||||||
"database/sql" | ||||||
"encoding/base64" | ||||||
"fmt" | ||||||
|
||||||
"github.com/go-sql-driver/mysql" | ||||||
"github.com/pingcap/errors" | ||||||
"github.com/pingcap/failpoint" | ||||||
"github.com/pingcap/log" | ||||||
"github.com/pingcap/tidb-tools/sync_diff_inspector/config" | ||||||
tmysql "github.com/pingcap/tidb/pkg/parser/mysql" | ||||||
"go.uber.org/zap" | ||||||
) | ||||||
|
||||||
func tryConnectMySQL(cfg *mysql.Config) (*sql.DB, error) { | ||||||
|
@@ -43,8 +47,39 @@ func tryConnectMySQL(cfg *mysql.Config) (*sql.DB, error) { | |||||
return db, nil | ||||||
} | ||||||
|
||||||
func verifyParams(db *sql.DB, sessionCfg *config.SessionConfig) error { | ||||||
if sessionCfg == nil { | ||||||
return nil | ||||||
} | ||||||
for param, value := range *sessionCfg { | ||||||
res, err := db.Query(fmt.Sprintf("show session variables like '%s'", param)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
//nolint: errcheck | ||||||
defer res.Close() | ||||||
if res.Next() { | ||||||
var paramName, actual string | ||||||
if err := res.Scan(¶mName, &actual); err != nil { | ||||||
return err | ||||||
} | ||||||
expected := fmt.Sprintf("%v", value) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if actual != expected { | ||||||
log.Warn("The session variable was set, but the database returned a different value", | ||||||
zap.String("variable", param), | ||||||
zap.String("configured_value", expected), | ||||||
zap.String("effective_value", actual), | ||||||
) | ||||||
} | ||||||
} else { | ||||||
return fmt.Errorf("parameter %s not found", param) | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
// ConnectMySQL creates sql.DB used for select data | ||||||
func ConnectMySQL(cfg *mysql.Config, num int) (db *sql.DB, err error) { | ||||||
func ConnectMySQL(sessionCfg *config.SessionConfig, cfg *mysql.Config, num int) (db *sql.DB, err error) { | ||||||
defer func() { | ||||||
if err == nil && db != nil { | ||||||
// SetMaxOpenConns and SetMaxIdleConns for connection to avoid error like | ||||||
|
@@ -55,7 +90,7 @@ func ConnectMySQL(cfg *mysql.Config, num int) (db *sql.DB, err error) { | |||||
}() | ||||||
// Try plain password first. | ||||||
db, firstErr := tryConnectMySQL(cfg) | ||||||
if firstErr == nil { | ||||||
if firstErr == nil && verifyParams(db, sessionCfg) == nil { | ||||||
return db, nil | ||||||
} | ||||||
// If access is denied and password is encoded by base64, try the decoded string as well. | ||||||
|
@@ -64,7 +99,7 @@ func ConnectMySQL(cfg *mysql.Config, num int) (db *sql.DB, err error) { | |||||
if password, decodeErr := base64.StdEncoding.DecodeString(cfg.Passwd); decodeErr == nil && string(password) != cfg.Passwd { | ||||||
cfg.Passwd = string(password) | ||||||
db2, err := tryConnectMySQL(cfg) | ||||||
if err == nil { | ||||||
if err == nil && verifyParams(db2, sessionCfg) == nil { | ||||||
return db2, nil | ||||||
} | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -16,6 +16,7 @@ package source | |||||||||
import ( | ||||||||||
"context" | ||||||||||
"database/sql" | ||||||||||
"fmt" | ||||||||||
"sort" | ||||||||||
"strings" | ||||||||||
"time" | ||||||||||
|
@@ -237,7 +238,7 @@ func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, con | |||||||||
} | ||||||||||
|
||||||||||
func getAutoSnapshotPosition(cfg *mysql.Config) (string, string, error) { | ||||||||||
tmpConn, err := common.ConnectMySQL(cfg, 2) | ||||||||||
tmpConn, err := common.ConnectMySQL(nil, cfg, 2) | ||||||||||
if err != nil { | ||||||||||
return "", "", errors.Annotatef(err, "connecting to auto-position tidb_snapshot failed") | ||||||||||
} | ||||||||||
|
@@ -269,22 +270,36 @@ func initDBConn(ctx context.Context, cfg *config.Config) error { | |||||||||
} | ||||||||||
// we had `cfg.SplitThreadCount` producers and `cfg.CheckThreadCount` consumer to use db connections maybe and `cfg.CheckThreadCount` splitter to split buckets. | ||||||||||
// so the connection count need to be cfg.SplitThreadCount + cfg.CheckThreadCount + cfg.CheckThreadCount. | ||||||||||
targetConn, err := common.ConnectMySQL(cfg.Task.TargetInstance.ToDriverConfig(), cfg.SplitThreadCount+2*cfg.CheckThreadCount) | ||||||||||
targetConn, err := common.ConnectMySQL( | ||||||||||
&cfg.Task.TargetInstance.SessionConfig, | ||||||||||
cfg.Task.TargetInstance.ToDriverConfig(), | ||||||||||
cfg.SplitThreadCount+2*cfg.CheckThreadCount, | ||||||||||
) | ||||||||||
if err != nil { | ||||||||||
log.Error(fmt.Sprintf("failed to configure session for data source '%s'", cfg.Task.Target), | ||||||||||
zap.String("error", err.Error()), | ||||||||||
) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
return errors.Trace(err) | ||||||||||
} | ||||||||||
|
||||||||||
cfg.Task.TargetInstance.Conn = targetConn | ||||||||||
|
||||||||||
for _, source := range cfg.Task.SourceInstances { | ||||||||||
for sourceIdx, source := range cfg.Task.SourceInstances { | ||||||||||
// If it is still set to AUTO it means it was not set on the target. | ||||||||||
// We require it to be set to AUTO on both. | ||||||||||
if source.IsAutoSnapshot() { | ||||||||||
return errors.Errorf("'auto' snapshot should be set on both target and source") | ||||||||||
} | ||||||||||
// connect source db with target db time_zone | ||||||||||
conn, err := common.ConnectMySQL(source.ToDriverConfig(), cfg.SplitThreadCount+2*cfg.CheckThreadCount) | ||||||||||
conn, err := common.ConnectMySQL( | ||||||||||
&source.SessionConfig, | ||||||||||
source.ToDriverConfig(), | ||||||||||
cfg.SplitThreadCount+2*cfg.CheckThreadCount, | ||||||||||
) | ||||||||||
if err != nil { | ||||||||||
log.Error(fmt.Sprintf("failed to configure session for data source '%s'", cfg.Task.Source[sourceIdx]), | ||||||||||
zap.String("error", err.Error()), | ||||||||||
) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
return errors.Trace(err) | ||||||||||
} | ||||||||||
source.Conn = conn | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.