kv/client: send resolved ts in advance when region is locked (#2038) #2046

Closed
35 changes: 23 additions & 12 deletions cdc/kv/client.go
@@ -928,6 +928,29 @@ func (s *eventFeedSession) dispatchRequest(

log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID()))

// Send a resolved ts to the event channel first, for two reasons:
// 1. Since we have locked the region range and have maintained a correct
// checkpoint ts for the range, it is safe to report the resolved ts to
// the puller at this moment.
// 2. Before the kv client gets the region rpcCtx, sends a request to TiKV,
// and receives the first kv event from TiKV, the region could split or
// merge, which would change the resolved ts distribution in the puller,
// so this resolved ts event is needed.
// After this resolved ts event is sent, there is no need to send another
// resolved ts event when the region starts to work.
resolvedEv := &model.RegionFeedEvent{
RegionID: sri.verID.GetID(),
Resolved: &model.ResolvedSpan{
Span: sri.span,
ResolvedTs: sri.ts,
},
}
select {
case s.eventCh <- resolvedEv:
case <-ctx.Done():
return errors.Trace(ctx.Err())
}

rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID)
if err != nil {
return errors.Trace(err)
@@ -1464,18 +1487,6 @@ func (s *eventFeedSession) singleEventFeed(
return nil
}

select {
case s.eventCh <- &model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: span,
ResolvedTs: startTs,
},
}:
case <-ctx.Done():
err = errors.Trace(ctx.Err())
return
}
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
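Net effect of this file's change: the resolved ts event that singleEventFeed used to send once a region connected is now sent from dispatchRequest, before the RPC context is even fetched. A minimal, self-contained sketch of the pattern (the types here are simplified stand-ins, not the actual ticdc definitions):

package main

import (
	"context"
	"fmt"
)

// Stand-ins for the ticdc types involved (simplified for illustration).
type ResolvedSpan struct {
	Span       string
	ResolvedTs uint64
}

type RegionFeedEvent struct {
	RegionID uint64
	Resolved *ResolvedSpan
}

// sendResolvedFirst mirrors the pattern added to dispatchRequest: report the
// region's current checkpoint ts on the event channel before any RPC is
// issued, aborting if the context is cancelled.
func sendResolvedFirst(ctx context.Context, eventCh chan<- *RegionFeedEvent, regionID uint64, span string, ts uint64) error {
	ev := &RegionFeedEvent{
		RegionID: regionID,
		Resolved: &ResolvedSpan{Span: span, ResolvedTs: ts},
	}
	select {
	case eventCh <- ev:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	eventCh := make(chan *RegionFeedEvent, 1)
	if err := sendResolvedFirst(context.Background(), eventCh, 1, "[a, b)", 100); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	ev := <-eventCh
	fmt.Printf("first event: region %d resolved at ts %d\n", ev.RegionID, ev.Resolved.ResolvedTs)
}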
87 changes: 47 additions & 40 deletions cdc/kv/client_test.go
@@ -360,6 +360,8 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
c.Assert(err, check.IsNil)
ch2 <- makeEvent(ts.Ver)
var event *model.RegionFeedEvent
// consume the first resolved ts event, which is sent before the region starts
<-eventCh
select {
case event = <-eventCh:
case <-time.After(time.Second):
@@ -594,12 +596,6 @@ consumePreResolvedTs:
waitRequestID(c, baseAllocatedID+5)
initialized := mockInitializedEvent(3 /* regionID */, currentRequestID())
ch2 <- initialized
select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("recving message takes too long")
}
c.Assert(event, check.NotNil)

makeEvent := func(ts uint64) *cdcpb.ChangeDataEvent {
return &cdcpb.ChangeDataEvent{
@@ -1232,8 +1228,13 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
baseAllocatedID := currentRequestID()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
eventCh := make(chan *model.RegionFeedEvent, 40)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
@@ -1279,13 +1280,6 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
ch1 <- resolved

expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
},
RegionID: regionID,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
@@ -1302,15 +1296,25 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
},
}

for _, expectedEv := range expected {
events := make([]*model.RegionFeedEvent, 0, 2)
eventLoop:
for {
select {
case event := <-eventCh:
log.Info("receive event", zap.Reflect("event", event), zap.Reflect("expected", expectedEv))
c.Assert(event, check.DeepEquals, expectedEv)
case ev := <-eventCh:
if ev.Resolved.ResolvedTs != uint64(100) {
events = append(events, ev)
}
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
break eventLoop
}
}
c.Assert(events, check.DeepEquals, expected)
cancel()
}

@@ -2850,36 +2854,25 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
}}
ch2 <- resolved

expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
},
RegionID: regionID3,
expected := &model.RegionFeedEvent{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
},
RegionID: regionID3,
}

for _, expectedEv := range expected {
eventLoop:
for {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case ev := <-eventCh:
if ev.Resolved != nil && ev.Resolved.ResolvedTs == uint64(100) {
continue
}
c.Assert(ev, check.DeepEquals, expected)
break eventLoop
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
c.Errorf("expected event %v not received", expected)
}
}

@@ -2957,8 +2950,13 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {
}()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
eventCh := make(chan *model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
@@ -3075,8 +3073,17 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {

for _, expectedEv := range expected {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case <-eventCh:
resolvedCount++
log.Info("receive resolved count", zap.Int("count", resolvedCount))
if resolvedCount == regionNum*2 {
break checkEvent
}
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
}
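The test adaptations above all follow one pattern: tolerate the extra resolved ts events (carrying the initial checkpoint ts, 100 in these tests) that now arrive before a region starts to work, and only assert on the events that follow. A hypothetical helper capturing that pattern; it is not part of this PR and assumes the test file's existing imports (time, model):

// skipInitialResolved drains eventCh, ignoring resolved ts events that
// carry the initial checkpoint ts (these are now emitted before a region
// starts to work), and returns the first other event, or nil on timeout.
// Hypothetical helper, for illustration only.
func skipInitialResolved(eventCh <-chan *model.RegionFeedEvent, initialTs uint64) *model.RegionFeedEvent {
	for {
		select {
		case ev := <-eventCh:
			if ev.Resolved != nil && ev.Resolved.ResolvedTs == initialTs {
				continue // pre-start resolved ts event, skip it
			}
			return ev
		case <-time.After(time.Second):
			return nil
		}
	}
}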
4 changes: 3 additions & 1 deletion cdc/kv/client_v2.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/zap"
@@ -87,6 +86,7 @@ func (s *eventFeedSession) sendRegionChangeEventV2(

state.start()
worker.setRegionState(event.RegionId, state)
// TODO: If a region doesn't receive any event from TiKV, this region
// can't be reconnected since the region state is not initialized.
worker.notifyEvTimeUpdate(event.RegionId, false /* isDelete */)
@@ -103,6 +103,8 @@ func (s *eventFeedSession) sendRegionChangeEventV2(
},
}:
}
=======
>>>>>>> 1c3653e2 (kv/client: send resovled ts in advance when region is locked (#2038))
} else if state.isStopped() {
log.Warn("drop event due to region feed stopped",
zap.Uint64("regionID", event.RegionId),
42 changes: 42 additions & 0 deletions cdc/puller/frontier/frontier_test.go
@@ -163,6 +163,48 @@ func (s *spanFrontierSuite) TestSpanFrontier(c *check.C) {
checkFrontier(c, f)
}

func (s *spanFrontierSuite) TestSpanFrontierFallback(c *check.C) {
defer testleak.AfterTest(c)()
keyA := []byte("a")
keyB := []byte("b")
keyC := []byte("c")
keyD := []byte("d")
keyE := []byte("e")

spAB := regionspan.ComparableSpan{Start: keyA, End: keyB}
spBC := regionspan.ComparableSpan{Start: keyB, End: keyC}
spCD := regionspan.ComparableSpan{Start: keyC, End: keyD}
spDE := regionspan.ComparableSpan{Start: keyD, End: keyE}

f := NewFrontier(20, spAB).(*spanFrontier)
f.Forward(spBC, 20)
f.Forward(spCD, 10)
f.Forward(spDE, 20)

// [A, B) [B, C) [C, D) [D, E)
// 20 20 10 20
c.Assert(f.Frontier(), check.Equals, uint64(10))
c.Assert(f.String(), check.Equals, `[a @ 20] [b @ 20] [c @ 10] [d @ 20] [e @ Max] `)
checkFrontier(c, f)

// [A, B) [B, D) [D, E)
// 20 10 10
// [B, D) does not forward, because it is split into [B, C) and [C, D) immediately

// [A, B) [B, C) [C, D) [D, E)
// 20 10 10 20
// [B, C) does not forward, because it is merged into [A, C) immediately
f.Forward(spCD, 20)
c.Assert(f.Frontier(), check.Equals, uint64(20))
// the frontier stores [A, B) and [B, C) but they are not exactly correct
c.Assert(f.String(), check.Equals, `[a @ 20] [b @ 20] [c @ 20] [d @ 20] [e @ Max] `)
checkFrontier(c, f)

// Here we would meet a resolved ts fall back, where 10 is less than
// f.Frontier(), but there is actually no data loss.
// f.Forward(spAC, 10)
}
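For reference, a sketch of the fall-back step the commented-out line above alludes to; spAC is hypothetical (it is not defined in this test) and would span [A, C):

// Hypothetical continuation: forwarding [A, C) at ts=10 reports a
// resolved ts below f.Frontier(), yet no data is lost, because every
// key in [A, C) has already been resolved to at least 20.
spAC := regionspan.ComparableSpan{Start: keyA, End: keyC}
f.Forward(spAC, 10)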

func (s *spanFrontierSuite) TestMinMax(c *check.C) {
defer testleak.AfterTest(c)()
var keyMin []byte
Expand Down