Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into diaodu
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed Jun 19, 2020
2 parents 47d908c + 173f966 commit f61ba3a
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 34 deletions.
20 changes: 17 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,18 @@ func (c *CDCClient) getConn(ctx context.Context, addr string) (*grpc.ClientConn,
return ca.Get(), nil
}

func (c *CDCClient) newStream(ctx context.Context, addr string) (stream cdcpb.ChangeData_EventFeedClient, err error) {
func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream cdcpb.ChangeData_EventFeedClient, err error) {
err = retry.Run(50*time.Millisecond, 3, func() error {
conn, err := c.getConn(ctx, addr)
if err != nil {
log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return errors.Trace(err)
}
err = util.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
return errors.Trace(err)
}
client := cdcpb.NewChangeDataClient(conn)
stream, err = client.EventFeed(ctx)
if err != nil {
Expand Down Expand Up @@ -621,15 +626,24 @@ MainLoop:
stream, ok := streams[rpcCtx.Addr]
// Establish the stream if it has not been connected yet.
if !ok {
storeID := rpcCtx.Peer.GetStoreId()
log.Info("creating new stream to store to send request",
zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.String("addr", rpcCtx.Addr))
stream, err = s.client.newStream(ctx, rpcCtx.Addr)
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
zap.String("addr", rpcCtx.Addr))
stream, err = s.client.newStream(ctx, rpcCtx.Addr, storeID)
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
zap.String("error", err.Error()))
if errors.Cause(err) == util.ErrVersionIncompatible {
// It often occurs on rolling update. Sleep 20s to reduce logs.
time.Sleep(20 * time.Second)
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, needReloadRegion(sri.failStoreIDs, rpcCtx), err)
// Delete the pendingRegion info from `pendingRegions` and retry connecting and sending the request.
Expand Down
41 changes: 41 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"google.golang.org/grpc"
Expand Down Expand Up @@ -137,6 +140,22 @@ func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sy
return grpcServer
}

type mockPDClient struct {
pd.Client
version string
}

var _ pd.Client = &mockPDClient{}

func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
s, err := m.Client.GetStore(ctx, storeID)
if err != nil {
return nil, err
}
s.Version = m.version
return s, nil
}

// Use etcdSuite to workaround the race. See comments of `TestConnArray`.
func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
wg := &sync.WaitGroup{}
Expand All @@ -150,6 +169,7 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, version: util.MinTiKVVersion.String()}
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -203,6 +223,27 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
cancel()
}

// TODO enable the test
func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) {
rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, version: "v2.1.0" /* CDC is not compatible with 2.1.0 */}
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

cluster.AddStore(1, "localhost:23375")
cluster.Bootstrap(2, []uint64{1}, []uint64{3}, 3)

cdcClient, err := NewCDCClient(pdClient, kvStorage.(tikv.Storage))
c.Assert(err, check.IsNil)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
eventCh := make(chan *model.RegionFeedEvent, 10)
err = cdcClient.EventFeed(ctx, regionspan.Span{Start: []byte("a"), End: []byte("b")}, 1, eventCh)
_ = err
// TODO find a way to verify the error
}

// Use etcdSuite for some special reasons, the embed etcd uses zap as the only candidate
// logger and in the logger initializtion it also initializes the grpclog/loggerv2, which
// is not a thread-safe operation and it must be called before any gRPC functions
Expand Down
5 changes: 4 additions & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (s *Server) Run(ctx context.Context) error {
}
s.pdClient = pdClient

err = util.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints[0])
// To not block CDC server startup, we need to warn instead of error
// when TiKV is incompatible.
errorTiKVIncompatible := false
err = util.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints[0], errorTiKVIncompatible)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func newCliCommand() *cobra.Command {
return errors.Annotate(err, "fail to open PD client")
}
ctx := defaultContext
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr)
errorTiKVIncompatible := true // Error if TiKV is incompatible.
err = util.CheckClusterVersion(ctx, pdCli, cliPdAddr, errorTiKVIncompatible)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/cyclic"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/cyclic"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -212,6 +212,12 @@ func newCreateChangefeedCommand() *cobra.Command {
State: model.StateNormal,
}

tz, err := util.GetTimezone(timezone)
if err != nil {
return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`")
}

ctx = util.PutTimezoneInCtx(ctx, tz)
ineligibleTables, allTables, err := verifyTables(ctx, cfg, startTs)
if err != nil {
return err
Expand Down Expand Up @@ -277,6 +283,7 @@ func newCreateChangefeedCommand() *cobra.Command {
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "memory", "sort engine used for data sort")
command.PersistentFlags().StringVar(&sortDir, "sort-dir", ".", "directory used for file sort")
command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed")
command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed")
command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed")
Expand Down
2 changes: 1 addition & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
func init() {
rootCmd.AddCommand(serverCmd)

serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use `,` to separate multiple PDs")
serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use ',' to separate multiple PDs")
serverCmd.Flags().StringVar(&address, "addr", "127.0.0.1:8300", "Set the listening address")
serverCmd.Flags().StringVar(&advertiseAddr, "advertise-addr", "", "Set the advertise listening address for client communication")
serverCmd.Flags().StringVar(&timezone, "tz", "System", "Specify time zone of TiCDC cluster")
Expand Down
68 changes: 50 additions & 18 deletions pkg/util/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,42 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"go.uber.org/zap"
)

// minPDVersion is the version of the minimal compatible PD.
var minPDVersion *semver.Version = semver.New("4.0.0-rc.1")
var minTiKVVersion *semver.Version = semver.New("4.0.0-rc.1")

// MinTiKVVersion is the version of the minimal compatible TiKV.
var MinTiKVVersion *semver.Version = semver.New("4.0.0-rc.1")

var versionHash = regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}")

// ErrVersionIncompatible is an error for running CDC on an incompatible Cluster.
var ErrVersionIncompatible = errors.NewNoStackError("version is incompatible")

func removeVAndHash(v string) string {
if v == "" {
return v
}
hash := regexp.MustCompile("-[0-9]+-g[0-9a-f]{8,}")
if hash.Match([]byte(v)) {
v = hash.ReplaceAllLiteralString(v, "")
}
v = versionHash.ReplaceAllLiteralString(v, "")
v = strings.TrimSuffix(v, "-dirty")
return strings.TrimPrefix(v, "v")
}

// CheckClusterVersion check TiKV and PD version.
func CheckClusterVersion(ctx context.Context, client pd.Client, pdHTTP string) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
func CheckClusterVersion(
ctx context.Context, client pd.Client, pdHTTP string, errorTiKVIncompat bool,
) error {
err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */)
if err != nil {
return err
}
for _, s := range stores {
ver, err := semver.NewVersion(removeVAndHash(s.Version))
if err != nil {
if errorTiKVIncompat {
return err
}
ord := ver.Compare(*minTiKVVersion)
if ord < 0 {
return errors.NotSupportedf("TiKV %s is not supported, require minimal version %s",
removeVAndHash(s.Version), minTiKVVersion)
}
log.Warn("check TiKV version failed", zap.Error(err))
}
// See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go
pdVer := struct {
Expand Down Expand Up @@ -92,12 +95,41 @@ func CheckClusterVersion(ctx context.Context, client pd.Client, pdHTTP string) e
}
ord := ver.Compare(*minPDVersion)
if ord < 0 {
return errors.NotSupportedf("PD %s is not supported, require minimal version %s",
return errors.Annotatef(ErrVersionIncompatible, "PD %s is not supported, require minimal version %s",
removeVAndHash(pdVer.Version), minPDVersion)
}
return nil
}

// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
// If storeID is 0, it checks all TiKV.
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
var stores []*metapb.Store
var err error
if storeID == 0 {
stores, err = client.GetAllStores(ctx, pd.WithExcludeTombstone())
} else {
stores = make([]*metapb.Store, 1)
stores[0], err = client.GetStore(ctx, storeID)
}
if err != nil {
return err
}

for _, s := range stores {
ver, err := semver.NewVersion(removeVAndHash(s.Version))
if err != nil {
return err
}
ord := ver.Compare(*MinTiKVVersion)
if ord < 0 {
return errors.Annotatef(ErrVersionIncompatible, "TiKV %s is not supported, require minimal version %s",
removeVAndHash(s.Version), MinTiKVVersion)
}
}
return nil
}

// IsValidUUIDv4 returns true if the uuid is a valid uuid
func IsValidUUIDv4(uuid string) bool {
if len(uuid) != 36 {
Expand Down
23 changes: 18 additions & 5 deletions pkg/util/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, true)
c.Assert(err, check.IsNil)
}

Expand All @@ -89,9 +89,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
return `v1.0.0-alpha-271-g824ae7fd`
}
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: minTiKVVersion.String()}}
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, true)
c.Assert(err, check.ErrorMatches, "PD .* is not supported.*")
}

Expand All @@ -103,8 +103,10 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, true)
c.Assert(err, check.ErrorMatches, "TiKV .* is not supported.*")
err = CheckClusterVersion(context.Background(), &mock, pdHTTP, false)
c.Assert(err, check.IsNil)
}
}

Expand All @@ -113,6 +115,17 @@ func (s *checkSuite) TestCompareVersion(c *check.C) {
c.Assert(semver.New("4.0.0-rc.1").Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
c.Assert(semver.New(removeVAndHash("4.0.0-rc-35-g31dae220")).Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
c.Assert(semver.New(removeVAndHash("4.0.0-9-g30f0b014")).Compare(*semver.New("4.0.0-rc.1")), check.Equals, 1)

c.Assert(semver.New(removeVAndHash("4.0.0-rc-35-g31dae220")).Compare(*semver.New("4.0.0-rc.2")), check.Equals, -1)
c.Assert(semver.New(removeVAndHash("4.0.0-9-g30f0b014")).Compare(*semver.New("4.0.0-rc.1")), check.Equals, 1)
c.Assert(semver.New(removeVAndHash("v3.0.0-beta-211-g09beefbe0-dirty")).
Compare(*semver.New("3.0.0-beta")), check.Equals, 0)
c.Assert(semver.New(removeVAndHash("v3.0.5-dirty")).
Compare(*semver.New("3.0.5")), check.Equals, 0)
c.Assert(semver.New(removeVAndHash("v3.0.5-beta.12-dirty")).
Compare(*semver.New("3.0.5-beta.12")), check.Equals, 0)
c.Assert(semver.New(removeVAndHash("v2.1.0-rc.1-7-g38c939f-dirty")).
Compare(*semver.New("2.1.0-rc.1")), check.Equals, 0)
}

func (s *checkSuite) TestIsValidUUIDv4(c *check.C) {
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function run() {
mysql) ;&
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
Expand Down

0 comments on commit f61ba3a

Please sign in to comment.