diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index d2946f85197..1b2dc9f5602 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -116,6 +116,7 @@ func newMySQLSink( } // create test db used for parameter detection + // Refer https://github.com/go-sql-driver/mysql#parameters if dsn.Params == nil { dsn.Params = make(map[string]string, 1) } diff --git a/cdc/sink/mysql_params.go b/cdc/sink/mysql_params.go index a374dbb2538..9f0fec92a41 100644 --- a/cdc/sink/mysql_params.go +++ b/cdc/sink/mysql_params.go @@ -35,6 +35,10 @@ const ( // expose these two variables for redo log applier DefaultWorkerCount = 16 DefaultMaxTxnRow = 256 + // The upper limit of max worker counts. + maxWorkerCount = 1024 + // The upper limit of max txn rows. + maxMaxTxnRow = 2048 defaultDMLMaxRetryTime = 8 defaultDDLMaxRetryTime = 20 @@ -113,9 +117,16 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string if err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) } - if c > 0 { - params.workerCount = c + if c <= 0 { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, + fmt.Errorf("invalid worker-count %d, which must be greater than 0", c)) } + if c > maxWorkerCount { + log.Warn("worker-count too large", + zap.Int("original", c), zap.Int("override", maxWorkerCount)) + c = maxWorkerCount + } + params.workerCount = c } s = sinkURI.Query().Get("max-txn-row") if s != "" { @@ -123,6 +134,15 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string if err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) } + if c <= 0 { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, + fmt.Errorf("invalid max-txn-row %d, which must be greater than 0", c)) + } + if c > maxMaxTxnRow { + log.Warn("max-txn-row too large", + zap.Int("original", c), zap.Int("override", maxMaxTxnRow)) + c = maxMaxTxnRow + } params.maxTxnRow = c } s = sinkURI.Query().Get("tidb-txn-mode") @@ -182,6 +202,14 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string if s == "" { params.timezone = "" } else { + value, err := url.QueryUnescape(s) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + _, err = time.LoadLocation(value) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } params.timezone = fmt.Sprintf(`"%s"`, s) } } else { @@ -195,14 +223,26 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string // To keep the same style with other sink parameters, we use dash as word separator. s = sinkURI.Query().Get("read-timeout") if s != "" { + _, err := time.ParseDuration(s) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } params.readTimeout = s } s = sinkURI.Query().Get("write-timeout") if s != "" { + _, err := time.ParseDuration(s) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } params.writeTimeout = s } s = sinkURI.Query().Get("timeout") if s != "" { + _, err := time.ParseDuration(s) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } params.dialTimeout = s } diff --git a/cdc/sink/mysql_params_test.go b/cdc/sink/mysql_params_test.go index 91f8a0e6d00..075e5d41d26 100644 --- a/cdc/sink/mysql_params_test.go +++ b/cdc/sink/mysql_params_test.go @@ -172,17 +172,63 @@ func TestParseSinkURITimezone(t *testing.T) { } } +func TestParseSinkURIOverride(t *testing.T) { + defer testleak.AfterTestT(t)() + cases := []struct { + uri string + checker func(*sinkParams) + }{{ + uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max + checker: func(sp *sinkParams) { + require.EqualValues(t, sp.workerCount, maxWorkerCount) + }, + }, { + uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max + checker: func(sp *sinkParams) { + require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow) + }, + }, { + uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode", + checker: func(sp *sinkParams) { + require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode) + }, + }} + ctx := context.TODO() + opts := map[string]string{OptChangefeedID: "changefeed-01"} + var uri *url.URL + var err error + for _, cs := range cases { + if cs.uri != "" { + uri, err = url.Parse(cs.uri) + require.Nil(t, err) + } else { + uri = nil + } + p, err := parseSinkURIToParams(ctx, uri, opts) + require.Nil(t, err) + cs.checker(p) + } +} + func TestParseSinkURIBadQueryString(t *testing.T) { defer testleak.AfterTestT(t)() uris := []string{ "", "postgre://127.0.0.1:3306", "mysql://127.0.0.1:3306/?worker-count=not-number", + "mysql://127.0.0.1:3306/?worker-count=-1", + "mysql://127.0.0.1:3306/?worker-count=0", "mysql://127.0.0.1:3306/?max-txn-row=not-number", + "mysql://127.0.0.1:3306/?max-txn-row=-1", + "mysql://127.0.0.1:3306/?max-txn-row=0", "mysql://127.0.0.1:3306/?ssl-ca=only-ca-exists", "mysql://127.0.0.1:3306/?batch-replace-enable=not-bool", "mysql://127.0.0.1:3306/?batch-replace-enable=true&batch-replace-size=not-number", "mysql://127.0.0.1:3306/?safe-mode=not-bool", + "mysql://127.0.0.1:3306/?time-zone=badtz", + "mysql://127.0.0.1:3306/?write-timeout=badduration", + "mysql://127.0.0.1:3306/?read-timeout=badduration", + "mysql://127.0.0.1:3306/?timeout=badduration", } ctx := context.TODO() opts := map[string]string{OptChangefeedID: "changefeed-01"} @@ -196,7 +242,7 @@ func TestParseSinkURIBadQueryString(t *testing.T) { uri = nil } _, err = parseSinkURIToParams(ctx, uri, opts) - require.NotNil(t, err) + require.Error(t, err) } } diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 9f8667026f0..1f819a5ced8 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -163,17 +163,18 @@ func (o *createChangefeedOptions) complete(ctx context.Context, f factory.Factor } o.startTs = oracle.ComposeTS(ts, logical) } - - return o.completeCfg(ctx, cmd) -} - -// completeCfg complete the replica config from file and cmd flags. -func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Command) error { _, captureInfos, err := o.etcdClient.GetCaptures(ctx) if err != nil { return err } + return o.completeCfg(cmd, captureInfos) +} + +// completeCfg complete the replica config from file and cmd flags. +func (o *createChangefeedOptions) completeCfg( + cmd *cobra.Command, captureInfos []*model.CaptureInfo, +) error { cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return errors.Trace(err) @@ -227,6 +228,16 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co } } + switch o.commonChangefeedOptions.sortEngine { + case model.SortInMemory: + case model.SortInFile: + case model.SortUnified: + default: + log.Warn("invalid sort-engine, use Unified Sorter by default", + zap.String("invalidSortEngine", o.commonChangefeedOptions.sortEngine)) + o.commonChangefeedOptions.sortEngine = model.SortUnified + } + if o.commonChangefeedOptions.sortEngine == model.SortUnified && !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() { o.commonChangefeedOptions.sortEngine = model.SortInMemory log.Warn("The TiCDC cluster is built from an older version, disabling Unified Sorter by default", diff --git a/pkg/cmd/cli/cli_changefeed_create_test.go b/pkg/cmd/cli/cli_changefeed_create_test.go index 658346d38e6..6e877f617a9 100644 --- a/pkg/cmd/cli/cli_changefeed_create_test.go +++ b/pkg/cmd/cli/cli_changefeed_create_test.go @@ -20,8 +20,10 @@ import ( "testing" "github.com/pingcap/check" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util/testleak" + "github.com/pingcap/tiflow/pkg/version" "github.com/spf13/cobra" ) @@ -65,3 +67,35 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) { c.Assert(err, check.NotNil) c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*") } + +func (s *changefeedSuite) TestInvalidSortEngine(c *check.C) { + defer testleak.AfterTest(c)() + + cases := []struct { + input string + expect model.SortEngine + }{{ + input: "invalid", + expect: model.SortUnified, + }, { + input: "memory", + expect: model.SortInMemory, + }, { + input: "file", + expect: model.SortInFile, + }, { + input: "unified", + expect: model.SortUnified, + }} + for _, cs := range cases { + cmd := new(cobra.Command) + o := newChangefeedCommonOptions() + o.addFlags(cmd) + c.Assert(cmd.ParseFlags([]string{"--sort-engine=" + cs.input}), check.IsNil) + opt := newCreateChangefeedOptions(o) + err := opt.completeCfg(cmd, + []*model.CaptureInfo{{Version: version.MinTiCDCVersion.String()}}) + c.Assert(err, check.IsNil) + c.Assert(opt.commonChangefeedOptions.sortEngine, check.Equals, cs.expect) + } +} diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index fc4e9c77518..737e5e8f3ed 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -93,7 +93,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) { } logConfig.Level = logLevel - pdEndpoints := strings.Split(f.GetPdAddr(), ",") + pdAddr := f.GetPdAddr() + pdEndpoints := strings.Split(pdAddr, ",") etcdClient, err := clientv3.New(clientv3.Config{ Context: ctx, @@ -118,7 +119,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) { }, }) if err != nil { - return nil, err + return nil, errors.Annotatef(err, + "fail to open PD client, please check pd address \"%s\"", pdAddr) } client := etcd.NewCDCEtcdClient(ctx, etcdClient) @@ -156,7 +158,8 @@ func (f factoryImpl) PdClient() (pd.Client, error) { }), )) if err != nil { - return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr) + return nil, errors.Annotatef(err, + "fail to open PD client, please check pd address \"%s\"", pdAddr) } err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true) diff --git a/pkg/version/check.go b/pkg/version/check.go index 436e6306927..6647bc10ec0 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -35,7 +35,6 @@ import ( var ( // minPDVersion is the version of the minimal compatible PD. - // TODO bump 5.2.0-alpha once PD releases. minPDVersion *semver.Version = semver.New("5.1.0-alpha") // maxPDVersion is the version of the maximum compatible PD. // Compatible versions are in [minPDVersion, maxPDVersion) @@ -43,16 +42,14 @@ var ( maxPDVersion *semver.Version = semver.New("9999.0.0") // MinTiKVVersion is the version of the minimal compatible TiKV. - // TODO bump 5.2.0-alpha once TiKV releases. MinTiKVVersion *semver.Version = semver.New("5.1.0-alpha") // maxTiKVVersion is the version of the maximum compatible TiKV. // Compatible versions are in [MinTiKVVersion, maxTiKVVersion) // 9999.0.0 disables the check effectively in the master branch. maxTiKVVersion *semver.Version = semver.New("9999.0.0") - // minTiCDCVersion is the version of the minimal compatible TiCDC. - // TODO bump 5.2.0-alpha once TiCDC releases. - minTiCDCVersion *semver.Version = semver.New("5.1.0-alpha") + // MinTiCDCVersion is the version of the minimal compatible TiCDC. + MinTiCDCVersion *semver.Version = semver.New("5.1.0-alpha") // Compatible versions are in [MinTiCDCVersion, MaxTiCDCVersion) // 9999.0.0 disables the check effectively in the master branch. maxTiCDCVersion *semver.Version = semver.New("9999.0.0") @@ -266,11 +263,11 @@ func CheckTiCDCClusterVersion(cdcClusterVer TiCDCClusterVersion) (unknown bool, return true, nil } ver := cdcClusterVer.Version - minOrd := ver.Compare(*minTiCDCVersion) + minOrd := ver.Compare(*MinTiCDCVersion) if minOrd < 0 { arg := fmt.Sprintf("TiCDC %s is not supported, the minimal compatible version is %s"+ "try tiup ctl:%s cdc [COMMAND]", - ver, minTiCDCVersion, ver) + ver, MinTiCDCVersion, ver) return false, cerror.ErrVersionIncompatible.GenWithStackByArgs(arg) } maxOrd := ver.Compare(*maxTiCDCVersion) diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 99878761e32..feaae574ca3 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -338,7 +338,7 @@ func TestCheckTiCDCClusterVersion(t *testing.T) { expectedUnknown: true, }, { - cdcClusterVersion: TiCDCClusterVersion{Version: minTiCDCVersion}, + cdcClusterVersion: TiCDCClusterVersion{Version: MinTiCDCVersion}, expectedErr: "", expectedUnknown: false, },