diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 8aa071d0c2d..858e0b47559 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -30,6 +30,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -238,12 +239,16 @@ func (h *OpenAPI) GetChangefeed(c *gin.Context) { }) } } + sinkURI, err := util.MaskSinkURI(info.SinkURI) + if err != nil { + log.Error("failed to mask sink URI", zap.Error(err)) + } changefeedDetail := &model.ChangefeedDetail{ UpstreamID: info.UpstreamID, Namespace: changefeedID.Namespace, ID: changefeedID.ID, - SinkURI: info.SinkURI, + SinkURI: sinkURI, CreateTime: model.JSONTime(info.CreateTime), StartTs: info.StartTs, TargetTs: info.TargetTs, diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index ec13d00c402..350cbf83629 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -127,7 +128,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ID), zap.String("changefeed", infoStr)) - c.JSON(http.StatusCreated, toAPIModel(info)) + c.JSON(http.StatusCreated, toAPIModel(info, true)) } // verifyTable verify table, return ineligibleTables and EligibleTables. @@ -270,7 +271,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { _ = c.Error(errors.Trace(err)) return } - c.JSON(http.StatusOK, toAPIModel(newCfInfo)) + c.JSON(http.StatusOK, toAPIModel(newCfInfo, true)) } // getChangeFeedMetaInfo returns the metaInfo of a changefeed @@ -288,7 +289,7 @@ func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) { _ = c.Error(err) return } - c.JSON(http.StatusOK, toAPIModel(info)) + c.JSON(http.StatusOK, toAPIModel(info, false)) } // resumeChangefeed handles update changefeed request. @@ -375,7 +376,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) { c.Status(http.StatusOK) } -func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo { +func toAPIModel(info *model.ChangeFeedInfo, maskSinkURI bool) *ChangeFeedInfo { var runningError *RunningError if info.Error != nil { runningError = &RunningError{ @@ -384,11 +385,21 @@ func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo { Message: info.Error.Message, } } + + sinkURI := info.SinkURI + var err error + if maskSinkURI { + sinkURI, err = util.MaskSinkURI(sinkURI) + if err != nil { + log.Error("failed to mask sink URI", zap.Error(err)) + } + } + apiInfoModel := &ChangeFeedInfo{ UpstreamID: info.UpstreamID, Namespace: info.Namespace, ID: info.ID, - SinkURI: info.SinkURI, + SinkURI: sinkURI, CreateTime: info.CreateTime, StartTs: info.StartTs, TargetTs: info.TargetTs, diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 55119b7d104..9217c91da98 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/etcd" mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -211,6 +212,8 @@ func TestCreateChangefeed(t *testing.T) { err = json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) require.Equal(t, cfConfig.ID, resp.ID) + mysqlSink, err = util.MaskSinkURI(mysqlSink) + require.Nil(t, err) require.Equal(t, mysqlSink, resp.SinkURI) require.Equal(t, http.StatusCreated, w.Code) } diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index f9656d7898e..6463b64f8de 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -189,18 +190,12 @@ func (info *ChangeFeedInfo) String() (str string) { log.Error("failed to unmarshal changefeed info", zap.Error(err)) return } - sinkURIParsed, err := url.Parse(clone.SinkURI) + + clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) - return - } - if sinkURIParsed.User != nil && sinkURIParsed.User.String() != "" { - sinkURIParsed.User = url.UserPassword("username", "password") - } - if sinkURIParsed.Host != "" { - sinkURIParsed.Host = "***" + log.Error("failed to marshal changefeed info", zap.Error(err)) } - clone.SinkURI = sinkURIParsed.String() + str, err = clone.Marshal() if err != nil { log.Error("failed to marshal changefeed info", zap.Error(err)) diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 421c1293ffd..8763fa507c1 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -634,28 +634,28 @@ func TestChangefeedInfoStringer(t *testing.T) { SinkURI: "kafka://127.0.0.1:9092/ticdc-test2", StartTs: 418881574869139457, }, - `.*kafka://\*\*\*/ticdc-test2.*`, + `.*kafka://.*ticdc-test2.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root:124567@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306.*`, }, { &ChangeFeedInfo{ SinkURI: "mysql://root:test%21%23%24%25%5E%26%2A@127.0.0.1:3306/", StartTs: 418881574869139457, }, - `.*mysql://username:password@\*\*\*/.*`, + `.*mysql://root:xxxx@127.0.0.1:3306/.*`, }, } diff --git a/pkg/cmd/cli/cli_changefeed_query.go b/pkg/cmd/cli/cli_changefeed_query.go index d10e8018062..d68bc8a1c33 100644 --- a/pkg/cmd/cli/cli_changefeed_query.go +++ b/pkg/cmd/cli/cli_changefeed_query.go @@ -17,17 +17,42 @@ import ( "context" "github.com/pingcap/errors" + v2 "github.com/pingcap/tiflow/cdc/api/v2" + "github.com/pingcap/tiflow/cdc/model" apiv1client "github.com/pingcap/tiflow/pkg/api/v1" + apiv2client "github.com/pingcap/tiflow/pkg/api/v2" + "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/spf13/cobra" ) +// cfMeta holds changefeed info and changefeed status. +type cfMeta struct { + UpstreamID uint64 `json:"upstream_id"` + Namespace string `json:"namespace"` + ID string `json:"id"` + SinkURI string `json:"sink_uri"` + Config *v2.ReplicaConfig `json:"config"` + CreateTime model.JSONTime `json:"create_time"` + StartTs uint64 `json:"start_ts"` + ResolvedTs uint64 `json:"resolved_ts"` + TargetTs uint64 `json:"target_ts"` + CheckpointTSO uint64 `json:"checkpoint_tso"` + CheckpointTime model.JSONTime `json:"checkpoint_time"` + Engine model.SortEngine `json:"sort_engine,omitempty"` + FeedState model.FeedState `json:"state"` + RunningError *model.RunningError `json:"error"` + ErrorHis []int64 `json:"error_history"` + CreatorVersion string `json:"creator_version"` + TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"` +} + // queryChangefeedOptions defines flags for the `cli changefeed query` command. type queryChangefeedOptions struct { - apiClient apiv1client.APIV1Interface - + apiClient apiv1client.APIV1Interface + apiClientV2 apiv2client.APIV2Interface changefeedID string simplified bool } @@ -47,11 +72,16 @@ func (o *queryChangefeedOptions) addFlags(cmd *cobra.Command) { // complete adapts from the command line args to the data and client required. func (o *queryChangefeedOptions) complete(f factory.Factory) error { - client, err := f.APIV1Client() + clientV1, err := f.APIV1Client() + if err != nil { + return err + } + o.apiClient = clientV1 + clientV2, err := f.APIV2Client() if err != nil { return err } - o.apiClient = client + o.apiClientV2 = clientV2 return nil } @@ -70,11 +100,36 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error { } return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(o.changefeedID) } + detail, err := o.apiClient.Changefeeds().Get(ctx, o.changefeedID) if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { return err } - return util.JSONPrint(cmd, detail) + + info, err := o.apiClientV2.Changefeeds().GetInfo(ctx, o.changefeedID) + if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) { + return err + } + meta := &cfMeta{ + UpstreamID: detail.UpstreamID, + Namespace: detail.Namespace, + ID: detail.ID, + SinkURI: detail.SinkURI, + Config: info.Config, + CreateTime: detail.CreateTime, + StartTs: detail.StartTs, + ResolvedTs: detail.ResolvedTs, + TargetTs: detail.TargetTs, + CheckpointTSO: detail.CheckpointTSO, + CheckpointTime: detail.CheckpointTime, + Engine: detail.Engine, + FeedState: detail.FeedState, + RunningError: detail.RunningError, + ErrorHis: detail.ErrorHis, + CreatorVersion: detail.CreatorVersion, + TaskStatus: detail.TaskStatus, + } + return util.JSONPrint(cmd, meta) } // newCmdQueryChangefeed creates the `cli changefeed query` command. diff --git a/pkg/cmd/cli/cli_changefeed_query_test.go b/pkg/cmd/cli/cli_changefeed_query_test.go index d8f000bbe0a..5a75c1e6e7d 100644 --- a/pkg/cmd/cli/cli_changefeed_query_test.go +++ b/pkg/cmd/cli/cli_changefeed_query_test.go @@ -14,23 +14,30 @@ package cli import ( + "bytes" + "io/ioutil" "os" "testing" "github.com/golang/mock/gomock" "github.com/pingcap/errors" + v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/api/v1/mock" + mock_v1 "github.com/pingcap/tiflow/pkg/api/v1/mock" + mock_v2 "github.com/pingcap/tiflow/pkg/api/v2/mock" "github.com/stretchr/testify/require" ) func TestChangefeedQueryCli(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - cf := mock.NewMockChangefeedInterface(ctrl) - f := &mockFactory{changefeeds: cf} + cfV1 := mock_v1.NewMockChangefeedInterface(ctrl) + cfV2 := mock_v2.NewMockChangefeedInterface(ctrl) + + f := &mockFactory{changefeeds: cfV1, changefeedsv2: cfV2} + cmd := newCmdQueryChangefeed(f) - cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ + cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ { UpstreamID: 1, Namespace: "default", @@ -41,7 +48,7 @@ func TestChangefeedQueryCli(t *testing.T) { }, nil) os.Args = []string{"query", "--simple=true", "--changefeed-id=abc"} require.Nil(t, cmd.Execute()) - cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ + cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{ { UpstreamID: 1, Namespace: "default", @@ -53,15 +60,26 @@ func TestChangefeedQueryCli(t *testing.T) { os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"} require.NotNil(t, cmd.Execute()) - cf.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test")) + cfV1.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test")) os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"} require.NotNil(t, cmd.Execute()) - cf.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil) + // query success + cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil) + cfV2.EXPECT().GetInfo(gomock.Any(), gomock.Any()).Return(&v2.ChangeFeedInfo{ + Config: v2.GetDefaultReplicaConfig(), + }, nil) os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"} + b := bytes.NewBufferString("") + cmd.SetOut(b) require.Nil(t, cmd.Execute()) + out, err := ioutil.ReadAll(b) + require.Nil(t, err) + // make sure config is printed + require.Contains(t, string(out), "config") - cf.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test")) + // query failed + cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test")) os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"} require.NotNil(t, cmd.Execute()) } diff --git a/pkg/util/uri.go b/pkg/util/uri.go index 3b0854a8ca0..94f94c56b40 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -15,7 +15,11 @@ package util import ( "net" + "net/url" "strings" + + "github.com/pingcap/log" + "go.uber.org/zap" ) // IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI. @@ -61,3 +65,17 @@ func validOptionalPort(port string) bool { } return true } + +// MaskSinkURI returns a sink uri that sensitive infos has been masked. +func MaskSinkURI(uri string) (string, error) { + uriParsed, err := url.Parse(uri) + if err != nil { + log.Error("failed to parse sink URI", zap.Error(err)) + return "", err + } + if uriParsed.User != nil && uriParsed.User.String() != "" { + uriParsed.User = url.UserPassword(uriParsed.User.Username(), "xxxx") + } + + return uriParsed.String(), nil +} diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index dc1c8bce983..ad45b685d08 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -66,3 +66,10 @@ func TestIsIPv6Address(t *testing.T) { }) } } + +func TestMaskSinkURI(t *testing.T) { + uri := "mysql://root:123456@127.0.0.1:3306/?time-zone=Asia/Shanghai" + maskedURI, err := MaskSinkURI(uri) + require.NoError(t, err) + require.Equal(t, "mysql://root:xxxx@127.0.0.1:3306/?time-zone=Asia/Shanghai", maskedURI) +}