From ff90ad07edca2451d53deee8bcdcfd80c4b8bd67 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 20 Jul 2022 19:02:58 +0800 Subject: [PATCH 1/8] cli (ticdc): fix query changefeed no config info --- cdc/model/http_model.go | 1 + pkg/cmd/cli/cli_changefeed_query.go | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/cdc/model/http_model.go b/cdc/model/http_model.go index 7b10279f6e8..c72e126a142 100644 --- a/cdc/model/http_model.go +++ b/cdc/model/http_model.go @@ -140,6 +140,7 @@ type ChangefeedDetail struct { CheckpointTime JSONTime `json:"checkpoint_time"` Engine SortEngine `json:"sort_engine,omitempty"` FeedState FeedState `json:"state"` + Config any `json:"config,omitempty"` RunningError *RunningError `json:"error"` ErrorHis []int64 `json:"error_history"` CreatorVersion string `json:"creator_version"` diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index d10e8018062..485b0af8deb 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -18,6 +18,8 @@ import ( "github.com/pingcap/errors" apiv1client "github.com/pingcap/tiflow/pkg/api/v1" + apiv2client "github.com/pingcap/tiflow/pkg/api/v2" + "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -26,8 +28,8 @@ import ( // queryChangefeedOptions defines flags for the `cli changefeed query` command. type queryChangefeedOptions struct { - apiClient apiv1client.APIV1Interface - + apiClient apiv1client.APIV1Interface + apiClientV2 apiv2client.APIV2Interface changefeedID string simplified bool } @@ -47,11 +49,16 @@ func (o *queryChangefeedOptions) addFlags(cmd *cobra.Command) { // complete adapts from the command line args to the data and client required. func (o *queryChangefeedOptions) complete(f factory.Factory) error { - client, err := f.APIV1Client() + clientV1, err := f.APIV1Client() + if err != nil { + return err + } + o.apiClient = clientV1 + clientV2, err := f.APIV2Client() if err != nil { return err } - o.apiClient = client + o.apiClientV2 = clientV2 return nil } @@ -74,6 +81,13 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err } + + info, err := o.apiClientV2.Changefeeds().GetInfo(ctx, o.changefeedID) + if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { + return err + } + detail.Config = info.Config + return util.JSONPrint(cmd, detail) } From f7efdecdc3ac523542b8d26e37934ea138391258 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 20 Jul 2022 19:02:58 +0800 Subject: [PATCH 2/8] fix ut error --- pkg/cmd/cli/cli_changefeed_query_test.go | 34 ++++++++++++++++++------ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/pkg/cmd/cli/cli_changefeed_query_test.go b/pkg/cmd/cli/cli_changefeed_query_test.go index d8f000bbe0a..f371f9395d0 100644 --- a/pkg/cmd/cli/cli_changefeed_query_test.go +++ b/pkg/cmd/cli/cli_changefeed_query_test.go @@ -14,23 +14,30 @@ package cli import ( + "bytes" + "io/ioutil" "os" "testing" "github.com/golang/mock/gomock" "github.com/pingcap/errors" + v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/api/v1/mock" + mock_v1 "github.com/pingcap/tiflow/pkg/api/v1/mock" + mock_v2 "github.com/pingcap/tiflow/pkg/api/v2/mock" "github.com/stretchr/testify/require" ) func TestChangefeedQueryCli(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - cf := mock.NewMockChangefeedInterface(ctrl) - f := &mockFactory{changefeeds: cf} + cfV1 := mock_v1.NewMockChangefeedInterface(ctrl) + cfV2 := mock_v2.NewMockChangefeedInterface(ctrl) + + f := &mockFactory{changefeeds: cfV1, changefeedsv2: cfV2} + cmd := newCmdQueryChangefeed(f) - cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ + cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ { UpstreamID: 1, Namespace: "default", @@ -41,7 +48,7 @@ func TestChangefeedQueryCli(t *testing.T) { }, nil) os.Args = []string{"query", "--simple=true", "--changefeed-id=abc"} require.Nil(t, cmd.Execute()) - cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ + cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ { UpstreamID: 1, Namespace: "default", @@ -53,15 +60,26 @@ func TestChangefeedQueryCli(t *testing.T) { os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"} require.NotNil(t, cmd.Execute()) - cf.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test")) + cfV1.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test")) os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"} require.NotNil(t, cmd.Execute()) - cf.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil) + // query success + cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil) + cfV2.EXPECT().GetInfo(gomock.Any(), gomock.Any()).Return(&v2.ChangeFeedInfo{ + Config: v2.GetDefaultReplicaConfig(), + }, nil) os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"} + b := bytes.NewBufferString("") + cmd.SetOut(b) require.Nil(t, cmd.Execute()) + out, err := ioutil.ReadAll(b) + require.Nil(t, err) + // make suer config is printed + require.Contains(t, string(out), "config") - cf.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test")) + // query failed + cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test")) os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"} require.NotNil(t, cmd.Execute()) } From 6a3887582216adac5478dc12f32f0301fe2c2f79 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 20 Jul 2022 23:26:25 +0800 Subject: [PATCH 3/8] fix lint check error --- cdc/model/http_model.go | 2 +- docs/swagger/docs.go | 1 + docs/swagger/swagger.json | 1 + docs/swagger/swagger.yaml | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cdc/model/http_model.go b/cdc/model/http_model.go index c72e126a142..b1f7184a998 100644 --- a/cdc/model/http_model.go +++ b/cdc/model/http_model.go @@ -140,7 +140,7 @@ type ChangefeedDetail struct { CheckpointTime JSONTime `json:"checkpoint_time"` Engine SortEngine `json:"sort_engine,omitempty"` FeedState FeedState `json:"state"` - Config any `json:"config,omitempty"` + Config interface{} `json:"config,omitempty"` RunningError *RunningError `json:"error"` ErrorHis []int64 `json:"error_history"` CreatorVersion string `json:"creator_version"` diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 5334fc03edf..820c60809fd 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -931,6 +931,7 @@ var doc = `{ "checkpoint_tso": { "type": "integer" }, + "config": {}, "create_time": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 049589e6ae7..7b4a6fbffe2 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -912,6 +912,7 @@ "checkpoint_tso": { "type": "integer" }, + "config": {}, "create_time": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 32ff952e8bb..01fb941b42f 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -127,6 +127,7 @@ definitions: type: string checkpoint_tso: type: integer + config: {} create_time: type: string creator_version: From 5fd010c15cf9cb1426a8c79f97f4977c6c564d7d Mon Sep 17 00:00:00 2001 From: sdojjy Date: Thu, 21 Jul 2022 13:53:33 +0800 Subject: [PATCH 4/8] add filter --- cdc/model/http_model.go | 1 - pkg/cmd/cli/cli_changefeed_query.go | 43 +++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/cdc/model/http_model.go b/cdc/model/http_model.go index b1f7184a998..7b10279f6e8 100644 --- a/cdc/model/http_model.go +++ b/cdc/model/http_model.go @@ -140,7 +140,6 @@ type ChangefeedDetail struct { CheckpointTime JSONTime `json:"checkpoint_time"` Engine SortEngine `json:"sort_engine,omitempty"` FeedState FeedState `json:"state"` - Config interface{} `json:"config,omitempty"` RunningError *RunningError `json:"error"` ErrorHis []int64 `json:"error_history"` CreatorVersion string `json:"creator_version"` diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index 485b0af8deb..4ed65cbcd6c 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/model" apiv1client "github.com/pingcap/tiflow/pkg/api/v1" apiv2client "github.com/pingcap/tiflow/pkg/api/v2" @@ -26,6 +27,12 @@ import ( "github.com/spf13/cobra" ) +// cfMeta holds changefeed info and changefeed status. +type cfMeta struct { + Info *model.ChangeFeedInfo `json:"info"` + Status *model.ChangeFeedStatus `json:"status"` +} + // queryChangefeedOptions defines flags for the `cli changefeed query` command. type queryChangefeedOptions struct { apiClient apiv1client.APIV1Interface @@ -86,9 +93,39 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err } - detail.Config = info.Config - - return util.JSONPrint(cmd, detail) + var runningError *model.RunningError + if info.Error != nil { + runningError = &model.RunningError{ + Addr: info.Error.Addr, + Code: info.Error.Code, + Message: info.Error.Message, + } + } + meta := &cfMeta{ + Info: &model.ChangeFeedInfo{ + UpstreamID: info.UpstreamID, + Namespace: info.Namespace, + ID: info.ID, + SinkURI: info.SinkURI, + CreateTime: info.CreateTime, + StartTs: info.StartTs, + TargetTs: info.TargetTs, + AdminJobType: info.AdminJobType, + Engine: info.Engine, + Config: info.Config.ToInternalReplicaConfig(), + State: info.State, + Error: runningError, + SyncPointEnabled: info.SyncPointEnabled, + SyncPointInterval: info.SyncPointInterval, + CreatorVersion: info.CreatorVersion, + }, + Status: &model.ChangeFeedStatus{ + ResolvedTs: detail.ResolvedTs, + CheckpointTs: detail.CheckpointTSO, + AdminJobType: info.AdminJobType, + }, + } + return util.JSONPrint(cmd, meta) } // newCmdQueryChangefeed creates the `cli changefeed query` command. From f6ec723356de4c32e71d353445d10ea8630a8a59 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 21 Jul 2022 15:37:23 +0800 Subject: [PATCH 5/8] mask sink uri --- cdc/model/changefeed.go | 15 +++++---------- docs/swagger/docs.go | 1 - docs/swagger/swagger.json | 1 - docs/swagger/swagger.yaml | 1 - pkg/cmd/cli/cli_changefeed_query.go | 9 ++++++++- pkg/util/uri.go | 20 ++++++++++++++++++++ pkg/util/uri_test.go | 7 +++++++ 7 files changed, 40 insertions(+), 14 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index f9656d7898e..6463b64f8de 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -189,18 +190,12 @@ func (info *ChangeFeedInfo) String() (str string) { log.Error("failed to unmarshal changefeed info", zap.Error(err)) return } - sinkURIParsed, err := url.Parse(clone.SinkURI) + + clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) - return - } - if sinkURIParsed.User != nil && sinkURIParsed.User.String() != "" { - sinkURIParsed.User = url.UserPassword("username", "password") - } - if sinkURIParsed.Host != "" { - sinkURIParsed.Host = "***" + log.Error("failed to marshal changefeed info", zap.Error(err)) } - clone.SinkURI = sinkURIParsed.String() + str, err = clone.Marshal() if err != nil { log.Error("failed to marshal changefeed info", zap.Error(err)) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 820c60809fd..5334fc03edf 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -931,7 +931,6 @@ var doc = `{ "checkpoint_tso": { "type": "integer" }, - "config": {}, "create_time": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 7b4a6fbffe2..049589e6ae7 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -912,7 +912,6 @@ "checkpoint_tso": { "type": "integer" }, - "config": {}, "create_time": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 01fb941b42f..32ff952e8bb 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -127,7 +127,6 @@ definitions: type: string checkpoint_tso: type: integer - config: {} create_time: type: string creator_version: diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index 4ed65cbcd6c..ee3f8282485 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" cerror "github.com/pingcap/tiflow/pkg/errors" + cutil "github.com/pingcap/tiflow/pkg/util" "github.com/spf13/cobra" ) @@ -101,12 +102,18 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { Message: info.Error.Message, } } + + sinkURI, err := cutil.MaskSinkURI(info.SinkURI) + if err != nil { + return err + } + meta := &cfMeta{ Info: &model.ChangeFeedInfo{ UpstreamID: info.UpstreamID, Namespace: info.Namespace, ID: info.ID, - SinkURI: info.SinkURI, + SinkURI: sinkURI, CreateTime: info.CreateTime, StartTs: info.StartTs, TargetTs: info.TargetTs, diff --git a/pkg/util/uri.go b/pkg/util/uri.go index 3b0854a8ca0..b827ff5db3d 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -15,7 +15,11 @@ package util import ( "net" + "net/url" "strings" + + "github.com/ngaut/log" + "go.uber.org/zap" ) // IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI. @@ -61,3 +65,19 @@ func validOptionalPort(port string) bool { } return true } + +// MaskSinkURI returns a sink uri that sensitive infos has been masked. +func MaskSinkURI(uri string) (string, error) { + uriParsed, err := url.Parse(uri) + if err != nil { + log.Error("failed to parse sink URI", zap.Error(err)) + return "", err + } + if uriParsed.User != nil && uriParsed.User.String() != "" { + uriParsed.User = url.UserPassword("username", "password") + } + if uriParsed.Host != "" { + uriParsed.Host = "***" + } + return uriParsed.String(), nil +} diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index dc1c8bce983..261b412394c 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -66,3 +66,10 @@ func TestIsIPv6Address(t *testing.T) { }) } } + +func TestMaskSinkURI(t *testing.T) { + uri := "mysql://root:123456@127.0.0.1:3306/?time-zone=Asia/Shanghai" + maskedURI, err := MaskSinkURI(uri) + require.NoError(t, err) + require.Equal(t, "mysql://username:password@***/?time-zone=Asia/Shanghai", maskedURI) +} From c1a3539b109fb64f0b55c73666ab974b3ca07bc5 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 21 Jul 2022 17:26:34 +0800 Subject: [PATCH 6/8] fix error --- cdc/api/v1/api.go | 7 ++- cdc/api/v2/changefeed.go | 21 ++++++-- pkg/cmd/cli/cli_changefeed_query.go | 75 ++++++++++++++--------------- 3 files changed, 58 insertions(+), 45 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 8aa071d0c2d..858e0b47559 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -30,6 +30,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -238,12 +239,16 @@ func (h *OpenAPI) GetChangefeed(c *gin.Context) { }) } } + sinkURI, err := util.MaskSinkURI(info.SinkURI) + if err != nil { + log.Error("failed to mask sink URI", zap.Error(err)) + } changefeedDetail := &model.ChangefeedDetail{ UpstreamID: info.UpstreamID, Namespace: changefeedID.Namespace, ID: changefeedID.ID, - SinkURI: info.SinkURI, + SinkURI: sinkURI, CreateTime: model.JSONTime(info.CreateTime), StartTs: info.StartTs, TargetTs: info.TargetTs, diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index ec13d00c402..350cbf83629 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -127,7 +128,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ID), zap.String("changefeed", infoStr)) - c.JSON(http.StatusCreated, toAPIModel(info)) + c.JSON(http.StatusCreated, toAPIModel(info, true)) } // verifyTable verify table, return ineligibleTables and EligibleTables. @@ -270,7 +271,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { _ = c.Error(errors.Trace(err)) return } - c.JSON(http.StatusOK, toAPIModel(newCfInfo)) + c.JSON(http.StatusOK, toAPIModel(newCfInfo, true)) } // getChangeFeedMetaInfo returns the metaInfo of a changefeed @@ -288,7 +289,7 @@ func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) { _ = c.Error(err) return } - c.JSON(http.StatusOK, toAPIModel(info)) + c.JSON(http.StatusOK, toAPIModel(info, false)) } // resumeChangefeed handles update changefeed request. @@ -375,7 +376,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { c.Status(http.StatusOK) } -func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo { +func toAPIModel(info *model.ChangeFeedInfo, maskSinkURI bool) *ChangeFeedInfo { var runningError *RunningError if info.Error != nil { runningError = &RunningError{ @@ -384,11 +385,21 @@ func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo { Message: info.Error.Message, } } + + sinkURI := info.SinkURI + var err error + if maskSinkURI { + sinkURI, err = util.MaskSinkURI(sinkURI) + if err != nil { + log.Error("failed to mask sink URI", zap.Error(err)) + } + } + apiInfoModel := &ChangeFeedInfo{ UpstreamID: info.UpstreamID, Namespace: info.Namespace, ID: info.ID, - SinkURI: info.SinkURI, + SinkURI: sinkURI, CreateTime: info.CreateTime, StartTs: info.StartTs, TargetTs: info.TargetTs, diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index ee3f8282485..d68bc8a1c33 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/errors" + v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/cdc/model" apiv1client "github.com/pingcap/tiflow/pkg/api/v1" apiv2client "github.com/pingcap/tiflow/pkg/api/v2" @@ -24,14 +25,28 @@ import ( "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" cerror "github.com/pingcap/tiflow/pkg/errors" - cutil "github.com/pingcap/tiflow/pkg/util" "github.com/spf13/cobra" ) // cfMeta holds changefeed info and changefeed status. type cfMeta struct { - Info *model.ChangeFeedInfo `json:"info"` - Status *model.ChangeFeedStatus `json:"status"` + UpstreamID uint64 `json:"upstream_id"` + Namespace string `json:"namespace"` + ID string `json:"id"` + SinkURI string `json:"sink_uri"` + Config *v2.ReplicaConfig `json:"config"` + CreateTime model.JSONTime `json:"create_time"` + StartTs uint64 `json:"start_ts"` + ResolvedTs uint64 `json:"resolved_ts"` + TargetTs uint64 `json:"target_ts"` + CheckpointTSO uint64 `json:"checkpoint_tso"` + CheckpointTime model.JSONTime `json:"checkpoint_time"` + Engine model.SortEngine `json:"sort_engine,omitempty"` + FeedState model.FeedState `json:"state"` + RunningError *model.RunningError `json:"error"` + ErrorHis []int64 `json:"error_history"` + CreatorVersion string `json:"creator_version"` + TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"` } // queryChangefeedOptions defines flags for the `cli changefeed query` command. @@ -85,6 +100,7 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { } return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(o.changefeedID) } + detail, err := o.apiClient.Changefeeds().Get(ctx, o.changefeedID) if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err @@ -94,43 +110,24 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err } - var runningError *model.RunningError - if info.Error != nil { - runningError = &model.RunningError{ - Addr: info.Error.Addr, - Code: info.Error.Code, - Message: info.Error.Message, - } - } - - sinkURI, err := cutil.MaskSinkURI(info.SinkURI) - if err != nil { - return err - } - meta := &cfMeta{ - Info: &model.ChangeFeedInfo{ - UpstreamID: info.UpstreamID, - Namespace: info.Namespace, - ID: info.ID, - SinkURI: sinkURI, - CreateTime: info.CreateTime, - StartTs: info.StartTs, - TargetTs: info.TargetTs, - AdminJobType: info.AdminJobType, - Engine: info.Engine, - Config: info.Config.ToInternalReplicaConfig(), - State: info.State, - Error: runningError, - SyncPointEnabled: info.SyncPointEnabled, - SyncPointInterval: info.SyncPointInterval, - CreatorVersion: info.CreatorVersion, - }, - Status: &model.ChangeFeedStatus{ - ResolvedTs: detail.ResolvedTs, - CheckpointTs: detail.CheckpointTSO, - AdminJobType: info.AdminJobType, - }, + UpstreamID: detail.UpstreamID, + Namespace: detail.Namespace, + ID: detail.ID, + SinkURI: detail.SinkURI, + Config: info.Config, + CreateTime: detail.CreateTime, + StartTs: detail.StartTs, + ResolvedTs: detail.ResolvedTs, + TargetTs: detail.TargetTs, + CheckpointTSO: detail.CheckpointTSO, + CheckpointTime: detail.CheckpointTime, + Engine: detail.Engine, + FeedState: detail.FeedState, + RunningError: detail.RunningError, + ErrorHis: detail.ErrorHis, + CreatorVersion: detail.CreatorVersion, + TaskStatus: detail.TaskStatus, } return util.JSONPrint(cmd, meta) } From 0a6f4c8af3119148e4c171790de35a879bce0b30 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 21 Jul 2022 18:18:51 +0800 Subject: [PATCH 7/8] address comment --- pkg/util/uri.go | 6 ++---- pkg/util/uri_test.go | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/util/uri.go b/pkg/util/uri.go index b827ff5db3d..f982aa19ef1 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -74,10 +74,8 @@ func MaskSinkURI(uri string) (string, error) { return "", err } if uriParsed.User != nil && uriParsed.User.String() != "" { - uriParsed.User = url.UserPassword("username", "password") - } - if uriParsed.Host != "" { - uriParsed.Host = "***" + uriParsed.User = url.UserPassword(uriParsed.User.Username(), "xxxx") } + return uriParsed.String(), nil } diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index 261b412394c..ad45b685d08 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -71,5 +71,5 @@ func TestMaskSinkURI(t *testing.T) { uri := "mysql://root:123456@127.0.0.1:3306/?time-zone=Asia/Shanghai" maskedURI, err := MaskSinkURI(uri) require.NoError(t, err) - require.Equal(t, "mysql://username:password@***/?time-zone=Asia/Shanghai", maskedURI) + require.Equal(t, "mysql://root:xxxx@127.0.0.1:3306/?time-zone=Asia/Shanghai", maskedURI) } From 4f768ba4bde877c821ddb0ee5fa0feb2ae3ca91b Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 21 Jul 2022 18:30:29 +0800 Subject: [PATCH 8/8] fix error --- cdc/api/v2/changefeed_test.go | 3 +++ cdc/model/changefeed_test.go | 8 ++++---- pkg/cmd/cli/cli_changefeed_query_test.go | 2 +- pkg/util/uri.go | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 55119b7d104..9217c91da98 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/etcd" mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -211,6 +212,8 @@ func TestCreateChangefeed(t *testing.T) { err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, cfConfig.ID, resp.ID) + mysqlSink, err = util.MaskSinkURI(mysqlSink) + require.Nil(t, err) require.Equal(t, mysqlSink, resp.SinkURI) require.Equal(t, http.StatusCreated, w.Code) } diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 421c1293ffd..8763fa507c1 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -634,28 +634,28 @@ func TestChangefeedInfoStringer(t *testing.T) { SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", StartTs: 418881574869139457, }, - `.*kafka://\*\*\*/ticdc-test2.*`, + `.*kafka://.*ticdc-test2.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root:124567@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root:test%21%23%24%25%5E%26%2A@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306/.*`, }, } diff --git a/pkg/cmd/cli/cli_changefeed_query_test.go b/pkg/cmd/cli/cli_changefeed_query_test.go index f371f9395d0..5a75c1e6e7d 100644 --- a/pkg/cmd/cli/cli_changefeed_query_test.go +++ b/pkg/cmd/cli/cli_changefeed_query_test.go @@ -75,7 +75,7 @@ func TestChangefeedQueryCli(t *testing.T) { require.Nil(t, cmd.Execute()) out, err := ioutil.ReadAll(b) require.Nil(t, err) - // make suer config is printed + // make sure config is printed require.Contains(t, string(out), "config") // query failed diff --git a/pkg/util/uri.go b/pkg/util/uri.go index f982aa19ef1..94f94c56b40 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -18,7 +18,7 @@ import ( "net/url" "strings" - "github.com/ngaut/log" + "github.com/pingcap/log" "go.uber.org/zap" )