Skip to content

Commit

Permalink
sink(cdc): avoid sinking redundant events in some rare cases with red…
Browse files Browse the repository at this point in the history
…o enabled (pingcap#10096)

close pingcap#10065
  • Loading branch information
hicqu authored Nov 14, 2023
1 parent c8eed98 commit 8e71949
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 46 deletions.
62 changes: 34 additions & 28 deletions cdc/processor/sinkmanager/redo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,20 @@ type popResult struct {
events []*model.RowChangedEvent
size uint64 // size of events.
releaseSize uint64 // size of all released events.
pushCount int
success bool
// If success, boundary is the upperBound of poped events.
// Otherwise, boundary is the lowerBound of cached events.
boundary engine.Position

// Many RowChangedEvents can come from the same PolymorphicEvent.
// pushCount indicates the count of raw PolymorphicEvents.
pushCount int

// success indicates whether there is no gap between cached events and required events.
success bool

// If success, upperBoundIfSuccess is the upperBound of popped events.
// The caller should fetch events (upperBoundIfSuccess, upperBound] from engine.
upperBoundIfSuccess engine.Position
// If fail, lowerBoundIfFail is the lowerBound of cached events.
// The caller should fetch events [lowerBound, lowerBoundIfFail) from engine.
lowerBoundIfFail engine.Position
}

// newRedoEventCache creates a redoEventCache instance.
Expand Down Expand Up @@ -142,26 +151,28 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
// NOTE: the caller will fetch events [lowerBound, res.lowerBoundIfFail) from engine.
res.success = false
if e.lowerBound.Compare(upperBound.Next()) <= 0 {
res.boundary = e.lowerBound
res.lowerBoundIfFail = e.lowerBound
} else {
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
}
return
}

if !e.upperBound.Valid() {
// if e.upperBound is invalid, it means there are no resolved transactions
// in the cache.
// It means there are no resolved cached transactions in the required range.
// NOTE: the caller will fetch events [lowerBound, res.lowerBoundIfFail) from engine.
res.success = false
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
return
}

res.success = true
if upperBound.Compare(e.upperBound) > 0 {
res.boundary = e.upperBound
if lowerBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = lowerBound.Prev()
} else if upperBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = e.upperBound
} else {
res.boundary = upperBound
res.upperBoundIfSuccess = upperBound
}

startIdx := sort.Search(e.readyCount, func(i int) bool {
Expand All @@ -173,28 +184,23 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
res.releaseSize += e.sizes[i]
}

var endIdx int
if startIdx == e.readyCount {
endIdx = startIdx
} else {
endIdx = sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.boundary) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size
endIdx := sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.upperBoundIfSuccess) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size

e.events = e.events[endIdx:]
e.sizes = e.sizes[endIdx:]
e.pushCounts = e.pushCounts[endIdx:]
e.readyCount -= endIdx
// Update boundaries. Set upperBound to invalid if the range has been drained.
e.lowerBound = res.boundary.Next()
e.lowerBound = res.upperBoundIfSuccess.Next()
if e.lowerBound.Compare(e.upperBound) > 0 {
e.upperBound = engine.Position{}
}
Expand Down
81 changes: 71 additions & 10 deletions cdc/processor/sinkmanager/redo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,37 @@ func TestRedoEventCache(t *testing.T) {
// Try to pop [{0,1}, {0,4}], should fail. And the returned lowerBoundIfFail should be {1,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 0, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(1), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(1), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{0,1}, {5,6}], should fail. And the returned lowerBoundIfFail should be {3,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 5, CommitTs: 6})
require.False(t, popRes.success)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{3,4}, {3,4}], should succeed.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.True(t, popRes.success)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 2, popRes.pushCount)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(4), popRes.upperBoundIfSuccess.CommitTs)

// Try to pop [{3,4}, {3,4}] again, should fail. And the returned lowerBoundIfFail should be {4,4}.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(4), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

popRes = appender.pop(engine.Position{StartTs: 4, CommitTs: 4}, engine.Position{StartTs: 9, CommitTs: 10})
require.True(t, popRes.success)
require.Equal(t, 1, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(5), popRes.boundary.StartTs)
require.Equal(t, uint64(6), popRes.boundary.CommitTs)
require.Equal(t, uint64(5), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(6), popRes.upperBoundIfSuccess.CommitTs)
require.Equal(t, 0, len(appender.events))
require.True(t, appender.broken)

Expand All @@ -85,3 +85,64 @@ func TestRedoEventCache(t *testing.T) {
require.Equal(t, uint64(0), appender.upperBound.StartTs)
require.Equal(t, uint64(0), appender.upperBound.CommitTs)
}

func TestRedoEventCacheAllPopBranches(t *testing.T) {
	// Exercises every branch of eventAppender.pop: failures caused by gaps
	// on both sides of the cached range, a success that drains the ready
	// prefix, and a success over a range entirely above the cache.
	cache := newRedoEventCache(model.ChangeFeedID{}, 1000)
	appender := cache.maybeCreateAppender(1, engine.Position{StartTs: 101, CommitTs: 111})

	// pos is shorthand for building an engine.Position.
	pos := func(startTs, commitTs uint64) engine.Position {
		return engine.Position{StartTs: startTs, CommitTs: commitTs}
	}
	// push appends rows with the given resolved position and asserts success.
	push := func(resolved engine.Position, rows ...*model.RowChangedEvent) {
		ok, _ := appender.pushBatch(rows, 0, resolved)
		require.True(t, ok)
	}

	push(engine.Position{},
		&model.RowChangedEvent{StartTs: 1, CommitTs: 11},
		&model.RowChangedEvent{StartTs: 1, CommitTs: 11})
	push(engine.Position{}, &model.RowChangedEvent{StartTs: 2, CommitTs: 12})

	// The required range sits entirely below the cache: fail, capped at
	// upperBound.Next().
	res := appender.pop(pos(1, 2), pos(3, 4))
	require.False(t, res.success)
	require.Equal(t, pos(4, 4), res.lowerBoundIfFail)

	// The required range overlaps the cache but starts below it: fail,
	// capped at the cache's lower bound.
	res = appender.pop(pos(1, 2), pos(300, 400))
	require.False(t, res.success)
	require.Equal(t, pos(101, 111), res.lowerBoundIfFail)

	// No resolved transactions in the required range yet: fail.
	res = appender.pop(pos(1, 11), pos(2, 12))
	require.False(t, res.success)
	require.Equal(t, pos(3, 12), res.lowerBoundIfFail)

	push(pos(101, 111),
		&model.RowChangedEvent{StartTs: 101, CommitTs: 111},
		&model.RowChangedEvent{StartTs: 101, CommitTs: 111})
	push(engine.Position{}, &model.RowChangedEvent{StartTs: 102, CommitTs: 112})
	require.Equal(t, 5, appender.readyCount)

	// Success: the two rows at {101,111} come from one PolymorphicEvent,
	// hence pushCount == 1.
	res = appender.pop(pos(101, 111), pos(102, 112))
	require.True(t, res.success)
	require.Equal(t, pos(101, 111), res.upperBoundIfSuccess)
	require.Equal(t, 2, len(res.events))
	require.Equal(t, 1, res.pushCount)
	require.Equal(t, uint64(101), res.events[1].StartTs)
	require.Equal(t, 0, appender.readyCount)

	// Nothing resolved in the requested sub-range anymore: fail.
	res = appender.pop(pos(102, 111), pos(102, 112))
	require.False(t, res.success)
	require.Equal(t, pos(103, 112), res.lowerBoundIfFail)

	push(pos(102, 102), &model.RowChangedEvent{StartTs: 102, CommitTs: 112})
	require.Equal(t, 2, appender.readyCount)

	// Requested range lies wholly above the cache: success with zero
	// events, upper bound is lowerBound.Prev().
	res = appender.pop(pos(501, 502), pos(701, 702))
	require.True(t, res.success)
	require.Equal(t, 0, len(res.events))
	require.Equal(t, pos(500, 502), res.upperBoundIfSuccess)
	require.Equal(t, 0, appender.readyCount)
	require.Equal(t, 0, len(appender.events))
}
15 changes: 8 additions & 7 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if err != nil {
return errors.Trace(err)
}
lastPos = lowerBound.Prev()
if drained {
performCallback(lowerBound.Prev())
performCallback(lastPos)
return nil
}
}
Expand Down Expand Up @@ -405,7 +406,7 @@ func (w *sinkWorker) fetchFromCache(
}
popRes := cache.pop(*lowerBound, *upperBound)
if popRes.success {
newLowerBound = popRes.boundary.Next()
newLowerBound = popRes.upperBoundIfSuccess.Next()
if len(popRes.events) > 0 {
metrics.OutputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
Expand All @@ -420,19 +421,19 @@ func (w *sinkWorker) fetchFromCache(

// Get a resolvedTs so that we can record it into sink memory quota.
var resolvedTs model.ResolvedTs
isCommitFence := popRes.boundary.IsCommitFence()
isCommitFence := popRes.upperBoundIfSuccess.IsCommitFence()
if w.splitTxn {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
if !isCommitFence {
resolvedTs.Mode = model.BatchResolvedMode
resolvedTs.BatchID = *batchID
*batchID += 1
}
} else {
if isCommitFence {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
} else {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs - 1)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs - 1)
}
}
// Transfer the memory usage from redoMemQuota to sinkMemQuota.
Expand All @@ -448,7 +449,7 @@ func (w *sinkWorker) fetchFromCache(
zap.Any("resolvedTs", resolvedTs),
zap.Error(err))
} else {
newUpperBound = popRes.boundary.Prev()
newUpperBound = popRes.lowerBoundIfFail.Prev()
}
cacheDrained = newLowerBound.Compare(newUpperBound) > 0
log.Debug("fetchFromCache is performed",
Expand Down
48 changes: 48 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,3 +1231,51 @@ func genUpperBoundGetter(commitTs model.Ts) func(_ model.Ts) engine.Position {
}
}
}

// TestHandleTaskWithCache submits a task whose range is fully covered by
// resolved events in the redo event cache. Even though the task reports
// itself as canceled, the worker is expected to serve the cached events and
// invoke the callback with the cache's resolved upper bound {StartTs: 2,
// CommitTs: 4} as the last written position.
func (suite *workerSuite) TestHandleTaskWithCache() {
	ctx, cancel := context.WithCancel(context.Background())
	// Two row events from one transaction (startTs=2, commitTs=4), followed
	// by a resolved-ts event at 4.
	events := []*model.PolymorphicEvent{
		genPolymorphicEvent(2, 4, 1),
		genPolymorphicEvent(2, 4, 1),
		genPolymorphicResolvedEvent(4),
	}
	// NOTE(review): createWorker(..., 0, true, 1) presumably enables
	// splitTxn with a tiny memory quota — confirm against createWorker.
	w, e := createWorker(model.ChangeFeedID{}, 0, true, 1)
	w.eventCache = newRedoEventCache(model.ChangeFeedID{}, 1024*1024)
	// Pre-populate the cache: both rows are pushed as a batch resolved up
	// to {2,4}, so the whole task range is already available in the cache.
	appender := w.eventCache.maybeCreateAppender(1, engine.Position{StartTs: 1, CommitTs: 3})
	appender.pushBatch(
		[]*model.RowChangedEvent{events[0].Row, events[1].Row},
		uint64(0), engine.Position{StartTs: 2, CommitTs: 4},
	)
	defer w.sinkMemQuota.Close()
	addEventsToSortEngine(suite.T(), events, e, 1)

	// Run the worker loop in the background; after cancel() below it must
	// return context.Canceled.
	taskChan := make(chan *sinkTask)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := w.handleTasks(ctx, taskChan)
		require.Equal(suite.T(), context.Canceled, err)
	}()

	wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1)
	defer sink.Close()

	// The callback must observe exactly the cache's upper bound, proving
	// the events were popped from the cache.
	chShouldBeClosed := make(chan struct{}, 1)
	callback := func(lastWrittenPos engine.Position) {
		require.Equal(suite.T(), engine.Position{StartTs: 2, CommitTs: 4}, lastWrittenPos)
		close(chShouldBeClosed)
	}
	// isCanceled always returns true, so the worker should stop right after
	// handling the cached part of the range instead of fetching from the
	// sort engine.
	taskChan <- &sinkTask{
		tableID:       1,
		lowerBound:    engine.Position{StartTs: 1, CommitTs: 3},
		getUpperBound: genUpperBoundGetter(4),
		tableSink:     wrapper,
		callback:      callback,
		isCanceled:    func() bool { return true },
	}

	// Wait for the callback to fire, then shut the worker down.
	<-chShouldBeClosed
	cancel()
	wg.Wait()
}
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type agent struct {
// 1. The capture receives a SIGTERM signal.
// 2. The agent receives a stopping heartbeat.
liveness *model.Liveness

lastCheckpointWarn time.Time
}

type agentInfo struct {
Expand Down Expand Up @@ -98,6 +100,8 @@ func newAgent(
tableM: newTableManager(changeFeedID, tableExecutor),
liveness: liveness,
compat: compat.New(map[model.CaptureID]*model.CaptureInfo{}),

lastCheckpointWarn: time.Now(),
}

etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down Expand Up @@ -247,11 +251,13 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedule

for tableID, table := range allTables {
status := table.getTableStatus(request.CollectStats)
if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs {
if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs &&
time.Since(a.lastCheckpointWarn) > 30*time.Second {
log.Warn("schedulerv3: CheckpointTs is greater than ResolvedTs",
zap.String("namespace", a.ChangeFeedID.Namespace),
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Int64("tableID", tableID))
a.lastCheckpointWarn = time.Now()
}
if table.task != nil && table.task.IsRemove {
status.State = tablepb.TableStateStopping
Expand Down

0 comments on commit 8e71949

Please sign in to comment.