Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd,sink(ticdc): validate changefeed params and revise error message #4482

Merged
merged 11 commits into from
Jan 26, 2022
1 change: 1 addition & 0 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func newMySQLSink(
}

// create test db used for parameter detection
// Refer https://github.com/go-sql-driver/mysql#parameters
if dsn.Params == nil {
dsn.Params = make(map[string]string, 1)
}
Expand Down
44 changes: 42 additions & 2 deletions cdc/sink/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
// expose these two variables for redo log applier
DefaultWorkerCount = 16
DefaultMaxTxnRow = 256
// The upper limit of max worker counts.
maxWorkerCount = 1024
// The upper limit of max txn rows.
maxMaxTxnRow = 2048

defaultDMLMaxRetryTime = 8
defaultDDLMaxRetryTime = 20
Expand Down Expand Up @@ -113,16 +117,32 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c > 0 {
params.workerCount = c
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid worker-count %d, which must be greater than 0", c))
}
if c > maxWorkerCount {
log.Warn("worker-count too large",
zap.Int("original", c), zap.Int("override", maxWorkerCount))
c = maxWorkerCount
}
params.workerCount = c
}
s = sinkURI.Query().Get("max-txn-row")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid max-txn-row %d, which must be greater than 0", c))
}
if c > maxMaxTxnRow {
log.Warn("max-txn-row too large",
zap.Int("original", c), zap.Int("override", maxMaxTxnRow))
c = maxMaxTxnRow
}
params.maxTxnRow = c
}
s = sinkURI.Query().Get("tidb-txn-mode")
Expand Down Expand Up @@ -182,6 +202,14 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
if s == "" {
params.timezone = ""
} else {
value, err := url.QueryUnescape(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
_, err = time.LoadLocation(value)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.timezone = fmt.Sprintf(`"%s"`, s)
}
} else {
Expand All @@ -195,14 +223,26 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
// To keep the same style with other sink parameters, we use dash as word separator.
s = sinkURI.Query().Get("read-timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.readTimeout = s
}
s = sinkURI.Query().Get("write-timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.writeTimeout = s
}
s = sinkURI.Query().Get("timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.dialTimeout = s
}

Expand Down
48 changes: 47 additions & 1 deletion cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,63 @@ func TestParseSinkURITimezone(t *testing.T) {
}
}

func TestParseSinkURIOverride(t *testing.T) {
defer testleak.AfterTestT(t)()
cases := []struct {
uri string
checker func(*sinkParams)
}{{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}}
ctx := context.TODO()
opts := map[string]string{OptChangefeedID: "changefeed-01"}
var uri *url.URL
var err error
for _, cs := range cases {
if cs.uri != "" {
uri, err = url.Parse(cs.uri)
require.Nil(t, err)
} else {
uri = nil
}
p, err := parseSinkURIToParams(ctx, uri, opts)
require.Nil(t, err)
cs.checker(p)
}
}

func TestParseSinkURIBadQueryString(t *testing.T) {
defer testleak.AfterTestT(t)()
uris := []string{
"",
"postgre://127.0.0.1:3306",
"mysql://127.0.0.1:3306/?worker-count=not-number",
"mysql://127.0.0.1:3306/?worker-count=-1",
"mysql://127.0.0.1:3306/?worker-count=0",
"mysql://127.0.0.1:3306/?max-txn-row=not-number",
"mysql://127.0.0.1:3306/?max-txn-row=-1",
"mysql://127.0.0.1:3306/?max-txn-row=0",
"mysql://127.0.0.1:3306/?ssl-ca=only-ca-exists",
"mysql://127.0.0.1:3306/?batch-replace-enable=not-bool",
"mysql://127.0.0.1:3306/?batch-replace-enable=true&batch-replace-size=not-number",
"mysql://127.0.0.1:3306/?safe-mode=not-bool",
"mysql://127.0.0.1:3306/?time-zone=badtz",
"mysql://127.0.0.1:3306/?write-timeout=badduration",
"mysql://127.0.0.1:3306/?read-timeout=badduration",
"mysql://127.0.0.1:3306/?timeout=badduration",
}
ctx := context.TODO()
opts := map[string]string{OptChangefeedID: "changefeed-01"}
Expand All @@ -196,7 +242,7 @@ func TestParseSinkURIBadQueryString(t *testing.T) {
uri = nil
}
_, err = parseSinkURIToParams(ctx, uri, opts)
require.NotNil(t, err)
require.Error(t, err)
}
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,18 @@ func (o *createChangefeedOptions) complete(ctx context.Context, f factory.Factor
}
o.startTs = oracle.ComposeTS(ts, logical)
}

return o.completeCfg(ctx, cmd)
}

// completeCfg complete the replica config from file and cmd flags.
func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Command) error {
_, captureInfos, err := o.etcdClient.GetCaptures(ctx)
if err != nil {
return err
}

return o.completeCfg(cmd, captureInfos)
}

// completeCfg complete the replica config from file and cmd flags.
func (o *createChangefeedOptions) completeCfg(
cmd *cobra.Command, captureInfos []*model.CaptureInfo,
) error {
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -227,6 +228,16 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co
}
}

switch o.commonChangefeedOptions.sortEngine {
case model.SortInMemory:
case model.SortInFile:
case model.SortUnified:
default:
log.Warn("invalid sort-engine, use Unified Sorter by default",
zap.String("invalidSortEngine", o.commonChangefeedOptions.sortEngine))
o.commonChangefeedOptions.sortEngine = model.SortUnified
}

if o.commonChangefeedOptions.sortEngine == model.SortUnified && !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() {
o.commonChangefeedOptions.sortEngine = model.SortInMemory
log.Warn("The TiCDC cluster is built from an older version, disabling Unified Sorter by default",
Expand Down
34 changes: 34 additions & 0 deletions pkg/cmd/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/pingcap/tiflow/pkg/version"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -65,3 +67,35 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*")
}

func (s *changefeedSuite) TestInvalidSortEngine(c *check.C) {
defer testleak.AfterTest(c)()

cases := []struct {
input string
expect model.SortEngine
}{{
input: "invalid",
expect: model.SortUnified,
}, {
input: "memory",
expect: model.SortInMemory,
}, {
input: "file",
expect: model.SortInFile,
}, {
input: "unified",
expect: model.SortUnified,
}}
for _, cs := range cases {
cmd := new(cobra.Command)
o := newChangefeedCommonOptions()
o.addFlags(cmd)
c.Assert(cmd.ParseFlags([]string{"--sort-engine=" + cs.input}), check.IsNil)
opt := newCreateChangefeedOptions(o)
err := opt.completeCfg(cmd,
[]*model.CaptureInfo{{Version: version.MinTiCDCVersion.String()}})
c.Assert(err, check.IsNil)
c.Assert(opt.commonChangefeedOptions.sortEngine, check.Equals, cs.expect)
}
}
9 changes: 6 additions & 3 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
}
logConfig.Level = logLevel

pdEndpoints := strings.Split(f.GetPdAddr(), ",")
pdAddr := f.GetPdAddr()
pdEndpoints := strings.Split(pdAddr, ",")

etcdClient, err := clientv3.New(clientv3.Config{
Context: ctx,
Expand All @@ -118,7 +119,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
},
})
if err != nil {
return nil, err
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

client := etcd.NewCDCEtcdClient(ctx, etcdClient)
Expand Down Expand Up @@ -156,7 +158,8 @@ func (f factoryImpl) PdClient() (pd.Client, error) {
}),
))
if err != nil {
return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr)
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true)
Expand Down
11 changes: 4 additions & 7 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,21 @@ import (

var (
// minPDVersion is the version of the minimal compatible PD.
// TODO bump 5.2.0-alpha once PD releases.
minPDVersion *semver.Version = semver.New("5.1.0-alpha")
// maxPDVersion is the version of the maximum compatible PD.
// Compatible versions are in [minPDVersion, maxPDVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxPDVersion *semver.Version = semver.New("9999.0.0")

// MinTiKVVersion is the version of the minimal compatible TiKV.
// TODO bump 5.2.0-alpha once TiKV releases.
MinTiKVVersion *semver.Version = semver.New("5.1.0-alpha")
// maxTiKVVersion is the version of the maximum compatible TiKV.
// Compatible versions are in [MinTiKVVersion, maxTiKVVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiKVVersion *semver.Version = semver.New("9999.0.0")

// minTiCDCVersion is the version of the minimal compatible TiCDC.
// TODO bump 5.2.0-alpha once TiCDC releases.
minTiCDCVersion *semver.Version = semver.New("5.1.0-alpha")
// MinTiCDCVersion is the version of the minimal compatible TiCDC.
MinTiCDCVersion *semver.Version = semver.New("5.1.0-alpha")
// Compatible versions are in [MinTiCDCVersion, MaxTiCDCVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiCDCVersion *semver.Version = semver.New("9999.0.0")
Expand Down Expand Up @@ -266,11 +263,11 @@ func CheckTiCDCClusterVersion(cdcClusterVer TiCDCClusterVersion) (unknown bool,
return true, nil
}
ver := cdcClusterVer.Version
minOrd := ver.Compare(*minTiCDCVersion)
minOrd := ver.Compare(*MinTiCDCVersion)
if minOrd < 0 {
arg := fmt.Sprintf("TiCDC %s is not supported, the minimal compatible version is %s"+
"try tiup ctl:%s cdc [COMMAND]",
ver, minTiCDCVersion, ver)
ver, MinTiCDCVersion, ver)
return false, cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
maxOrd := ver.Compare(*maxTiCDCVersion)
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestCheckTiCDCClusterVersion(t *testing.T) {
expectedUnknown: true,
},
{
cdcClusterVersion: TiCDCClusterVersion{Version: minTiCDCVersion},
cdcClusterVersion: TiCDCClusterVersion{Version: MinTiCDCVersion},
expectedErr: "",
expectedUnknown: false,
},
Expand Down