Skip to content

Commit

Permalink
Cherry-pick pingcap#1185 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
amyangfei authored and ti-srebot committed Dec 11, 2020
1 parent 1cfde41 commit 57b9ae6
Show file tree
Hide file tree
Showing 11 changed files with 593 additions and 20 deletions.
24 changes: 20 additions & 4 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,19 @@ func newProcessor(
sinkEmittedResolvedNotifier := new(notify.Notifier)
localResolvedNotifier := new(notify.Notifier)
localCheckpointTsNotifier := new(notify.Notifier)
sinkEmittedResolvedReceiver, err := sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localResolvedReceiver, err := localResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localCheckpointTsReceiver, err := localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}

p := &processor{
id: uuid.New().String(),
limitter: limitter,
Expand All @@ -216,14 +229,14 @@ func newProcessor(
output: make(chan *model.PolymorphicEvent, defaultOutputChanSize),

sinkEmittedResolvedNotifier: sinkEmittedResolvedNotifier,
sinkEmittedResolvedReceiver: sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond),
sinkEmittedResolvedReceiver: sinkEmittedResolvedReceiver,

localResolvedNotifier: localResolvedNotifier,
localResolvedReceiver: localResolvedNotifier.NewReceiver(50 * time.Millisecond),
localResolvedReceiver: localResolvedReceiver,

checkpointTs: checkpointTs,
localCheckpointTsNotifier: localCheckpointTsNotifier,
localCheckpointTsReceiver: localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond),
localCheckpointTsReceiver: localCheckpointTsReceiver,

tables: make(map[int64]*tableInfo),
markTableIDs: make(map[int64]struct{}),
Expand Down Expand Up @@ -674,9 +687,12 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
lastResolvedTs uint64
watchKey = kv.GetEtcdKeyJob(p.changefeedID)
globalResolvedTsNotifier = new(notify.Notifier)
globalResolvedTsReceiver = globalResolvedTsNotifier.NewReceiver(1 * time.Second)
)
defer globalResolvedTsNotifier.Close()
globalResolvedTsReceiver, err := globalResolvedTsNotifier.NewReceiver(1 * time.Second)
if err != nil {
return err
}

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
Expand Down
5 changes: 4 additions & 1 deletion cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ func (es *EntrySorter) Run(ctx context.Context) error {
}
}
})
receiver := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
receiver, err := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
if err != nil {
return err
}
defer es.resolvedNotifier.Close()
errg.Go(func() error {
var sorted []*model.PolymorphicEvent
Expand Down
219 changes: 219 additions & 0 deletions cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package puller

import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/txnutil"
"github.com/pingcap/ticdc/pkg/util/testleak"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
pd "github.com/tikv/pd/client"
)

// pullerSuite hosts the puller unit tests; it is registered with
// gocheck below so `check.TestingT` picks the tests up.
type pullerSuite struct {
}

var _ = check.Suite(&pullerSuite{})

// mockPdClientForPullerTest embeds pd.Client and overrides only
// GetClusterID, returning a fixed cluster ID for tests.
type mockPdClientForPullerTest struct {
	pd.Client
	clusterID uint64
}

// GetClusterID returns the preset cluster ID; ctx is ignored.
func (mc *mockPdClientForPullerTest) GetClusterID(ctx context.Context) uint64 {
	return mc.clusterID
}

// mockCDCKVClient is a kv.CDCKVClient stub: its EventFeed replays
// whatever events the test pushes through the expectations channel.
type mockCDCKVClient struct {
	expectations chan *model.RegionFeedEvent
}

// mockInjectedPuller bundles a real Puller with the mock KV client
// that was injected into it, so tests can feed events via cli and
// observe the puller's output.
type mockInjectedPuller struct {
	Puller
	cli *mockCDCKVClient
}

// newMockCDCKVClient is a kv.NewCDCKVClient-compatible factory. All
// arguments are ignored; it returns a mock client whose buffered
// expectation queue (capacity 1024) the test fills via Returns.
func newMockCDCKVClient(
	ctx context.Context,
	pd pd.Client,
	kvStorage tikv.Storage,
	credential *security.Credential,
) kv.CDCKVClient {
	cli := new(mockCDCKVClient)
	cli.expectations = make(chan *model.RegionFeedEvent, 1024)
	return cli
}

// EventFeed implements kv.CDCKVClient. It forwards every event queued
// via Returns to eventCh until the context is canceled or a nil event
// is received (a nil arrives when the expectations channel is closed,
// marking end of the feed).
//
// The send side also selects on ctx.Done(): otherwise, once the
// consumer stops draining eventCh after cancellation, the bare
// `eventCh <- ev` would block forever and hang the test's wg.Wait().
func (mc *mockCDCKVClient) EventFeed(
	ctx context.Context,
	span regionspan.ComparableSpan,
	ts uint64,
	enableOldValue bool,
	lockResolver txnutil.LockResolver,
	isPullerInit kv.PullerInitialization,
	eventCh chan<- *model.RegionFeedEvent,
) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-mc.expectations:
			if ev == nil {
				// Channel closed: all expectations consumed.
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case eventCh <- ev:
			}
		}
	}
}

// Close implements kv.CDCKVClient. It closes the expectation queue and,
// if any queued events were never consumed by EventFeed, drains them
// into a descriptive error so the test surfaces the leftovers.
func (mc *mockCDCKVClient) Close() error {
	close(mc.expectations)
	if len(mc.expectations) == 0 {
		return nil
	}
	var buf bytes.Buffer
	buf.WriteString("mockCDCKVClient: not all expectations were satisfied! Still waiting\n")
	// Ranging over the closed channel drains every remaining event.
	for e := range mc.expectations {
		fmt.Fprintf(&buf, "%s", e.GetValue())
	}
	return errors.New(buf.String())
}

// Returns queues ev to be emitted by EventFeed. It blocks if the
// buffered expectation queue (capacity 1024) is full.
func (mc *mockCDCKVClient) Returns(ev *model.RegionFeedEvent) {
	mc.expectations <- ev
}

// newPullerForTest builds a Puller backed by a mock KV client and a
// mock TiKV store, and starts its Run loop in a background goroutine.
//
// It returns the puller wrapper (exposing the mock client so tests can
// inject events), the cancel func that stops the puller, the WaitGroup
// guarding the run goroutine, and the mock storage. Callers must
// close the store, cancel, and wait when done.
//
// Fix over the original: the trailing `c.Assert(err, check.IsNil)`
// re-asserted the same err already checked right after
// mockstore.NewMockStore (the goroutine shadowed its own err), so the
// dead duplicate check is removed.
func (s *pullerSuite) newPullerForTest(
	c *check.C,
	spans []regionspan.Span,
	checkpointTs uint64,
) (*mockInjectedPuller, context.CancelFunc, *sync.WaitGroup, tidbkv.Storage) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	store, err := mockstore.NewMockStore()
	c.Assert(err, check.IsNil)
	enableOldValue := true
	// Swap in the mock KV-client factory for the duration of puller
	// construction. NOTE(review): this assumes NewPuller captures the
	// built client immediately (the kvCli assertion below shows it
	// does at construction time) and never re-reads the global later.
	backupNewCDCKVClient := kv.NewCDCKVClient
	kv.NewCDCKVClient = newMockCDCKVClient
	defer func() {
		kv.NewCDCKVClient = backupNewCDCKVClient
	}()
	pdCli := &mockPdClientForPullerTest{clusterID: uint64(1)}
	plr := NewPuller(ctx, pdCli, nil /* credential */, store, checkpointTs, spans, nil /* limitter */, enableOldValue)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Context cancellation is the only error the tests expect.
		if err := plr.Run(ctx); err != nil {
			c.Assert(errors.Cause(err), check.Equals, context.Canceled)
		}
	}()
	mockPlr := &mockInjectedPuller{
		Puller: plr,
		cli:    plr.(*pullerImpl).kvCli.(*mockCDCKVClient),
	}
	return mockPlr, cancel, &wg, store
}

// TestPullerResolvedForward feeds resolved events for three sub-spans
// covering the subscribed span [t_a, t_e) and checks that the puller
// forwards the minimum resolved ts (1000) across them.
func (s *pullerSuite) TestPullerResolvedForward(c *check.C) {
	defer testleak.AfterTest(c)()
	spans := []regionspan.Span{
		{Start: []byte("t_a"), End: []byte("t_e")},
	}
	checkpointTs := uint64(996)
	plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs)

	// Sub-span resolved timestamps; the minimum (1000) bounds how far
	// the whole span may advance.
	resolvedSpans := []struct {
		start, end string
		ts         uint64
	}{
		{"t_a", "t_c", 1001},
		{"t_c", "t_d", 1002},
		{"t_d", "t_e", 1000},
	}
	for _, rs := range resolvedSpans {
		plr.cli.Returns(&model.RegionFeedEvent{
			Resolved: &model.ResolvedSpan{
				Span:       regionspan.ToComparableSpan(regionspan.Span{Start: []byte(rs.start), End: []byte(rs.end)}),
				ResolvedTs: rs.ts,
			},
		})
	}
	ev := <-plr.Output()
	c.Assert(ev.OpType, check.Equals, model.OpTypeResolved)
	c.Assert(ev.CRTs, check.Equals, uint64(1000))
	c.Assert(plr.IsInitialized(), check.IsTrue)
	err := retry.Run(time.Millisecond*10, 10, func() error {
		if ts := plr.GetResolvedTs(); ts != uint64(1000) {
			return errors.Errorf("resolved ts %d of puller does not forward to 1000", ts)
		}
		return nil
	})
	c.Assert(err, check.IsNil)

	store.Close()
	cancel()
	wg.Wait()
}

// TestPullerRawKV feeds two raw KV put events and checks that the one
// outside the subscribed span [c, e) is dropped while the in-span one
// is forwarded.
func (s *pullerSuite) TestPullerRawKV(c *check.C) {
	defer testleak.AfterTest(c)()
	spans := []regionspan.Span{
		{Start: []byte("c"), End: []byte("e")},
	}
	checkpointTs := uint64(996)
	plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs)

	// "a" lies outside [c, e) and must be ignored; "d" is in-span.
	entries := []*model.RawKVEntry{
		{OpType: model.OpTypePut, Key: []byte("a"), Value: []byte("test-value"), CRTs: uint64(1002)},
		{OpType: model.OpTypePut, Key: []byte("d"), Value: []byte("test-value"), CRTs: uint64(1003)},
	}
	for _, entry := range entries {
		plr.cli.Returns(&model.RegionFeedEvent{Val: entry})
	}
	ev := <-plr.Output()
	c.Assert(ev.OpType, check.Equals, model.OpTypePut)
	c.Assert(ev.Key, check.DeepEquals, []byte("d"))

	store.Close()
	cancel()
	wg.Wait()
}
6 changes: 5 additions & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func newMqSink(
return ret
}

resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
Expand All @@ -137,7 +141,7 @@ func newMqSink(
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),
resolvedNotifier: notifier,
resolvedReceiver: notifier.NewReceiver(50 * time.Millisecond),
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
}
Expand Down
19 changes: 15 additions & 4 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
}

func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) {
receiver := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
receiver, err := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
log.Error("flush row changed events routine starts failed", zap.Error(err))
return
}
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -574,17 +578,23 @@ func newMySQLSink(

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)
sink.createSinkWorkers(ctx)
err = sink.createSinkWorkers(ctx)
if err != nil {
return nil, err
}

go sink.flushRowChangedEvents(ctx)

return sink, nil
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
s.workers = make([]*mysqlSinkWorker, s.params.workerCount)
for i := range s.workers {
receiver := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
receiver, err := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
if err != nil {
return err
}
worker := newMySQLSinkWorker(
s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], receiver, s.execDMLs)
s.workers[i] = worker
Expand All @@ -598,6 +608,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}()
}
return nil
}

func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
Expand Down
Loading

0 comments on commit 57b9ae6

Please sign in to comment.