Skip to content

Commit

Permalink
cli (ticdc): fix query changefeed does not contain config info and ma…
Browse files Browse the repository at this point in the history
…sk sink uri. (#6374)

close #6362
  • Loading branch information
asddongmen authored Jul 21, 2022
1 parent 666952a commit 934824b
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 33 deletions.
7 changes: 6 additions & 1 deletion cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -238,12 +239,16 @@ func (h *OpenAPI) GetChangefeed(c *gin.Context) {
})
}
}
sinkURI, err := util.MaskSinkURI(info.SinkURI)
if err != nil {
log.Error("failed to mask sink URI", zap.Error(err))
}

changefeedDetail := &model.ChangefeedDetail{
UpstreamID: info.UpstreamID,
Namespace: changefeedID.Namespace,
ID: changefeedID.ID,
SinkURI: info.SinkURI,
SinkURI: sinkURI,
CreateTime: model.JSONTime(info.CreateTime),
StartTs: info.StartTs,
TargetTs: info.TargetTs,
Expand Down
21 changes: 16 additions & 5 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -127,7 +128,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
c.JSON(http.StatusCreated, toAPIModel(info))
c.JSON(http.StatusCreated, toAPIModel(info, true))
}

// verifyTable verify table, return ineligibleTables and EligibleTables.
Expand Down Expand Up @@ -270,7 +271,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
_ = c.Error(errors.Trace(err))
return
}
c.JSON(http.StatusOK, toAPIModel(newCfInfo))
c.JSON(http.StatusOK, toAPIModel(newCfInfo, true))
}

// getChangeFeedMetaInfo returns the metaInfo of a changefeed
Expand All @@ -288,7 +289,7 @@ func (h *OpenAPIV2) getChangeFeedMetaInfo(c *gin.Context) {
_ = c.Error(err)
return
}
c.JSON(http.StatusOK, toAPIModel(info))
c.JSON(http.StatusOK, toAPIModel(info, false))
}

// resumeChangefeed handles update changefeed request.
Expand Down Expand Up @@ -375,7 +376,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
c.Status(http.StatusOK)
}

func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo {
func toAPIModel(info *model.ChangeFeedInfo, maskSinkURI bool) *ChangeFeedInfo {
var runningError *RunningError
if info.Error != nil {
runningError = &RunningError{
Expand All @@ -384,11 +385,21 @@ func toAPIModel(info *model.ChangeFeedInfo) *ChangeFeedInfo {
Message: info.Error.Message,
}
}

sinkURI := info.SinkURI
var err error
if maskSinkURI {
sinkURI, err = util.MaskSinkURI(sinkURI)
if err != nil {
log.Error("failed to mask sink URI", zap.Error(err))
}
}

apiInfoModel := &ChangeFeedInfo{
UpstreamID: info.UpstreamID,
Namespace: info.Namespace,
ID: info.ID,
SinkURI: info.SinkURI,
SinkURI: sinkURI,
CreateTime: info.CreateTime,
StartTs: info.StartTs,
TargetTs: info.TargetTs,
Expand Down
3 changes: 3 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -211,6 +212,8 @@ func TestCreateChangefeed(t *testing.T) {
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, cfConfig.ID, resp.ID)
mysqlSink, err = util.MaskSinkURI(mysqlSink)
require.Nil(t, err)
require.Equal(t, mysqlSink, resp.SinkURI)
require.Equal(t, http.StatusCreated, w.Code)
}
Expand Down
15 changes: 5 additions & 10 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -189,18 +190,12 @@ func (info *ChangeFeedInfo) String() (str string) {
log.Error("failed to unmarshal changefeed info", zap.Error(err))
return
}
sinkURIParsed, err := url.Parse(clone.SinkURI)

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return
}
if sinkURIParsed.User != nil && sinkURIParsed.User.String() != "" {
sinkURIParsed.User = url.UserPassword("username", "password")
}
if sinkURIParsed.Host != "" {
sinkURIParsed.Host = "***"
log.Error("failed to marshal changefeed info", zap.Error(err))
}
clone.SinkURI = sinkURIParsed.String()

str, err = clone.Marshal()
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
Expand Down
8 changes: 4 additions & 4 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,28 +634,28 @@ func TestChangefeedInfoStringer(t *testing.T) {
SinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
StartTs: 418881574869139457,
},
`.*kafka://\*\*\*/ticdc-test2.*`,
`.*kafka://.*ticdc-test2.*`,
},
{
&ChangeFeedInfo{
SinkURI: "mysql://root:124567@127.0.0.1:3306/",
StartTs: 418881574869139457,
},
`.*mysql://username:password@\*\*\*/.*`,
`.*mysql://root:xxxx@127.0.0.1:3306.*`,
},
{
&ChangeFeedInfo{
SinkURI: "mysql://root@127.0.0.1:3306/",
StartTs: 418881574869139457,
},
`.*mysql://username:password@\*\*\*/.*`,
`.*mysql://root:xxxx@127.0.0.1:3306.*`,
},
{
&ChangeFeedInfo{
SinkURI: "mysql://root:test%21%23%24%25%5E%26%2A@127.0.0.1:3306/",
StartTs: 418881574869139457,
},
`.*mysql://username:password@\*\*\*/.*`,
`.*mysql://root:xxxx@127.0.0.1:3306/.*`,
},
}

Expand Down
65 changes: 60 additions & 5 deletions pkg/cmd/cli/cli_changefeed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,42 @@ import (
"context"

"github.com/pingcap/errors"
v2 "github.com/pingcap/tiflow/cdc/api/v2"
"github.com/pingcap/tiflow/cdc/model"
apiv1client "github.com/pingcap/tiflow/pkg/api/v1"
apiv2client "github.com/pingcap/tiflow/pkg/api/v2"

"github.com/pingcap/tiflow/pkg/cmd/factory"
"github.com/pingcap/tiflow/pkg/cmd/util"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/spf13/cobra"
)

// cfMeta holds changefeed info and changefeed status.
type cfMeta struct {
UpstreamID uint64 `json:"upstream_id"`
Namespace string `json:"namespace"`
ID string `json:"id"`
SinkURI string `json:"sink_uri"`
Config *v2.ReplicaConfig `json:"config"`
CreateTime model.JSONTime `json:"create_time"`
StartTs uint64 `json:"start_ts"`
ResolvedTs uint64 `json:"resolved_ts"`
TargetTs uint64 `json:"target_ts"`
CheckpointTSO uint64 `json:"checkpoint_tso"`
CheckpointTime model.JSONTime `json:"checkpoint_time"`
Engine model.SortEngine `json:"sort_engine,omitempty"`
FeedState model.FeedState `json:"state"`
RunningError *model.RunningError `json:"error"`
ErrorHis []int64 `json:"error_history"`
CreatorVersion string `json:"creator_version"`
TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"`
}

// queryChangefeedOptions defines flags for the `cli changefeed query` command.
type queryChangefeedOptions struct {
apiClient apiv1client.APIV1Interface

apiClient apiv1client.APIV1Interface
apiClientV2 apiv2client.APIV2Interface
changefeedID string
simplified bool
}
Expand All @@ -47,11 +72,16 @@ func (o *queryChangefeedOptions) addFlags(cmd *cobra.Command) {

// complete adapts from the command line args to the data and client required.
func (o *queryChangefeedOptions) complete(f factory.Factory) error {
client, err := f.APIV1Client()
clientV1, err := f.APIV1Client()
if err != nil {
return err
}
o.apiClient = clientV1
clientV2, err := f.APIV2Client()
if err != nil {
return err
}
o.apiClient = client
o.apiClientV2 = clientV2
return nil
}

Expand All @@ -70,11 +100,36 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error {
}
return cerror.ErrChangeFeedNotExists.GenWithStackByArgs(o.changefeedID)
}

detail, err := o.apiClient.Changefeeds().Get(ctx, o.changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return err
}
return util.JSONPrint(cmd, detail)

info, err := o.apiClientV2.Changefeeds().GetInfo(ctx, o.changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return err
}
meta := &cfMeta{
UpstreamID: detail.UpstreamID,
Namespace: detail.Namespace,
ID: detail.ID,
SinkURI: detail.SinkURI,
Config: info.Config,
CreateTime: detail.CreateTime,
StartTs: detail.StartTs,
ResolvedTs: detail.ResolvedTs,
TargetTs: detail.TargetTs,
CheckpointTSO: detail.CheckpointTSO,
CheckpointTime: detail.CheckpointTime,
Engine: detail.Engine,
FeedState: detail.FeedState,
RunningError: detail.RunningError,
ErrorHis: detail.ErrorHis,
CreatorVersion: detail.CreatorVersion,
TaskStatus: detail.TaskStatus,
}
return util.JSONPrint(cmd, meta)
}

// newCmdQueryChangefeed creates the `cli changefeed query` command.
Expand Down
34 changes: 26 additions & 8 deletions pkg/cmd/cli/cli_changefeed_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,30 @@
package cli

import (
"bytes"
"io/ioutil"
"os"
"testing"

"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
v2 "github.com/pingcap/tiflow/cdc/api/v2"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/api/v1/mock"
mock_v1 "github.com/pingcap/tiflow/pkg/api/v1/mock"
mock_v2 "github.com/pingcap/tiflow/pkg/api/v2/mock"
"github.com/stretchr/testify/require"
)

func TestChangefeedQueryCli(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cf := mock.NewMockChangefeedInterface(ctrl)
f := &mockFactory{changefeeds: cf}
cfV1 := mock_v1.NewMockChangefeedInterface(ctrl)
cfV2 := mock_v2.NewMockChangefeedInterface(ctrl)

f := &mockFactory{changefeeds: cfV1, changefeedsv2: cfV2}

cmd := newCmdQueryChangefeed(f)
cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{
cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{
{
UpstreamID: 1,
Namespace: "default",
Expand All @@ -41,7 +48,7 @@ func TestChangefeedQueryCli(t *testing.T) {
}, nil)
os.Args = []string{"query", "--simple=true", "--changefeed-id=abc"}
require.Nil(t, cmd.Execute())
cf.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{
cfV1.EXPECT().List(gomock.Any(), "all").Return(&[]model.ChangefeedCommonInfo{
{
UpstreamID: 1,
Namespace: "default",
Expand All @@ -53,15 +60,26 @@ func TestChangefeedQueryCli(t *testing.T) {
os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"}
require.NotNil(t, cmd.Execute())

cf.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test"))
cfV1.EXPECT().List(gomock.Any(), "all").Return(nil, errors.New("test"))
os.Args = []string{"query", "--simple=true", "--changefeed-id=abcd"}
require.NotNil(t, cmd.Execute())

cf.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil)
// query success
cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(&model.ChangefeedDetail{}, nil)
cfV2.EXPECT().GetInfo(gomock.Any(), gomock.Any()).Return(&v2.ChangeFeedInfo{
Config: v2.GetDefaultReplicaConfig(),
}, nil)
os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"}
b := bytes.NewBufferString("")
cmd.SetOut(b)
require.Nil(t, cmd.Execute())
out, err := ioutil.ReadAll(b)
require.Nil(t, err)
// make sure config is printed
require.Contains(t, string(out), "config")

cf.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test"))
// query failed
cfV1.EXPECT().Get(gomock.Any(), "bcd").Return(nil, errors.New("test"))
os.Args = []string{"query", "--simple=false", "--changefeed-id=bcd"}
require.NotNil(t, cmd.Execute())
}
18 changes: 18 additions & 0 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ package util

import (
"net"
"net/url"
"strings"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI.
Expand Down Expand Up @@ -61,3 +65,17 @@ func validOptionalPort(port string) bool {
}
return true
}

// MaskSinkURI returns a sink uri that sensitive infos has been masked.
func MaskSinkURI(uri string) (string, error) {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return "", err
}
if uriParsed.User != nil && uriParsed.User.String() != "" {
uriParsed.User = url.UserPassword(uriParsed.User.Username(), "xxxx")
}

return uriParsed.String(), nil
}
7 changes: 7 additions & 0 deletions pkg/util/uri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,10 @@ func TestIsIPv6Address(t *testing.T) {
})
}
}

func TestMaskSinkURI(t *testing.T) {
uri := "mysql://root:123456@127.0.0.1:3306/?time-zone=Asia/Shanghai"
maskedURI, err := MaskSinkURI(uri)
require.NoError(t, err)
require.Equal(t, "mysql://root:xxxx@127.0.0.1:3306/?time-zone=Asia/Shanghai", maskedURI)
}

0 comments on commit 934824b

Please sign in to comment.