Skip to content

Commit

Permalink
Merge pull request #4533 from ti-chi-bot/cherry-pick-4474-to-release-5.4
Browse files Browse the repository at this point in the history
etcd/client(ticdc): add retry operation for etcd transaction api (#4248) (#4474)
  • Loading branch information
CharlesCheung96 authored Apr 28, 2022
2 parents 650557d + 6a620c6 commit 8e5b95e
Show file tree
Hide file tree
Showing 32 changed files with 1,442 additions and 1,208 deletions.
18 changes: 10 additions & 8 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,17 @@ func NewCapture4Test() *Capture {
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -128,13 +136,7 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

Expand Down
62 changes: 62 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/pkg/etcd"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
)

func TestReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

// init etcd mocker
clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
require.Nil(t, err)
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientURL.String()},
Context: ctx,
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
defer client.Close()

cp := NewCapture4Test()
cp.etcdClient = &client

// simulate network isolation scenarios
etcdServer.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = cp.reset(ctx)
require.Regexp(t, ".*context canceled.*", err)
wg.Done()
}()
time.Sleep(100 * time.Millisecond)
info := cp.Info()
require.NotNil(t, info)
cancel()
wg.Wait()
}
14 changes: 2 additions & 12 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -41,8 +40,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -751,16 +748,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
}
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
owner, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
24 changes: 13 additions & 11 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,43 @@
package entry

import (
"github.com/pingcap/check"
"testing"

ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
timeta "github.com/pingcap/tidb/meta"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

// SchemaTestHelper is a test helper for schema which creates an internal tidb instance to generate DDL jobs with meta information
type SchemaTestHelper struct {
c *check.C
t *testing.T
tk *testkit.TestKit
storage kv.Storage
domain *domain.Domain
}

// NewSchemaTestHelper creates a SchemaTestHelper
func NewSchemaTestHelper(c *check.C) *SchemaTestHelper {
func NewSchemaTestHelper(t *testing.T) *SchemaTestHelper {
store, err := mockstore.NewMockStore()
c.Assert(err, check.IsNil)
require.Nil(t, err)
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
conf.AlterPrimaryKey = true
})
session.SetSchemaLease(0)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
c.Assert(err, check.IsNil)
require.Nil(t, err)
domain.SetStatsUpdating(true)
tk := testkit.NewTestKit(c, store)
tk := testkit.NewTestKit(t, store)
return &SchemaTestHelper{
c: c,
t: t,
tk: tk,
storage: store,
domain: domain,
Expand All @@ -59,8 +61,8 @@ func NewSchemaTestHelper(c *check.C) *SchemaTestHelper {
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(1)
s.c.Assert(err, check.IsNil)
s.c.Assert(jobs, check.HasLen, 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
return jobs[0]
}

Expand All @@ -72,7 +74,7 @@ func (s *SchemaTestHelper) Storage() kv.Storage {
// GetCurrentMeta return the current meta snapshot
func (s *SchemaTestHelper) GetCurrentMeta() *timeta.Meta {
ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
s.c.Assert(err, check.IsNil)
require.Nil(s.t, err)
return timeta.NewSnapshotMeta(s.storage.GetSnapshot(ver))
}

Expand Down
39 changes: 15 additions & 24 deletions cdc/owner/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,41 @@ import (
"math/rand"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&barrierSuite{})

type barrierSuite struct{}

func (s *barrierSuite) TestBarrier(c *check.C) {
defer testleak.AfterTest(c)()
func TestBarrier(t *testing.T) {
b := newBarriers()
b.Update(ddlJobBarrier, 2)
b.Update(syncPointBarrier, 3)
b.Update(finishBarrier, 1)
tp, ts := b.Min()
c.Assert(tp, check.Equals, finishBarrier)
c.Assert(ts, check.Equals, uint64(1))
require.Equal(t, tp, finishBarrier)
require.Equal(t, ts, uint64(1))

b.Update(finishBarrier, 4)
tp, ts = b.Min()
c.Assert(tp, check.Equals, ddlJobBarrier)
c.Assert(ts, check.Equals, uint64(2))
require.Equal(t, tp, ddlJobBarrier)
require.Equal(t, ts, uint64(2))

b.Remove(ddlJobBarrier)
tp, ts = b.Min()
c.Assert(tp, check.Equals, syncPointBarrier)
c.Assert(ts, check.Equals, uint64(3))
require.Equal(t, tp, syncPointBarrier)
require.Equal(t, ts, uint64(3))

b.Update(finishBarrier, 1)
tp, ts = b.Min()
c.Assert(tp, check.Equals, finishBarrier)
c.Assert(ts, check.Equals, uint64(1))
require.Equal(t, tp, finishBarrier)
require.Equal(t, ts, uint64(1))

b.Update(ddlJobBarrier, 5)
tp, ts = b.Min()
c.Assert(tp, check.Equals, finishBarrier)
c.Assert(ts, check.Equals, uint64(1))
require.Equal(t, tp, finishBarrier)
require.Equal(t, ts, uint64(1))
}

func (s *barrierSuite) TestBarrierRandom(c *check.C) {
defer testleak.AfterTest(c)()
func TestBarrierRandom(t *testing.T) {
maxBarrierType := 50
maxBarrierTs := 1000000
b := newBarriers()
Expand Down Expand Up @@ -90,7 +81,7 @@ func (s *barrierSuite) TestBarrierRandom(c *check.C) {
}
}
tp, ts := b.Min()
c.Assert(ts, check.Equals, expectedMinTs)
c.Assert(expectedBarriers[tp], check.Equals, expectedMinTs)
require.Equal(t, ts, expectedMinTs)
require.Equal(t, expectedBarriers[tp], expectedMinTs)
}
}
Loading

0 comments on commit 8e5b95e

Please sign in to comment.