diff --git a/cdc/processor/sinkmanager/redo_cache.go b/cdc/processor/sinkmanager/redo_cache.go index a7e54eaa967..147d4ef6772 100644 --- a/cdc/processor/sinkmanager/redo_cache.go +++ b/cdc/processor/sinkmanager/redo_cache.go @@ -57,11 +57,20 @@ type popResult struct { events []*model.RowChangedEvent size uint64 // size of events. releaseSize uint64 // size of all released events. - pushCount int - success bool - // If success, boundary is the upperBound of poped events. - // Otherwise, boundary is the lowerBound of cached events. - boundary engine.Position + + // many RowChangedEvent can come from one same PolymorphicEvent. + // pushCount indicates the count of raw PolymorphicEvents. + pushCount int + + // success indicates whether there is a gap between cached events and required events. + success bool + + // If success, upperBoundIfSuccess is the upperBound of poped events. + // The caller should fetch events (upperBoundIfSuccess, upperBound] from engine. + upperBoundIfSuccess engine.Position + // If fail, lowerBoundIfFail is the lowerBound of cached events. + // The caller should fetch events [lowerBound, lowerBoundIfFail) from engine. + lowerBoundIfFail engine.Position } // newRedoEventCache creates a redoEventCache instance. @@ -142,26 +151,28 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu // NOTE: the caller will fetch events [lowerBound, res.boundary) from engine. res.success = false if e.lowerBound.Compare(upperBound.Next()) <= 0 { - res.boundary = e.lowerBound + res.lowerBoundIfFail = e.lowerBound } else { - res.boundary = upperBound.Next() + res.lowerBoundIfFail = upperBound.Next() } return } + if !e.upperBound.Valid() { - // if e.upperBound is invalid, it means there are no resolved transactions - // in the cache. + // It means there are no resolved cached transactions in the required range. // NOTE: the caller will fetch events [lowerBound, res.boundary) from engine. res.success = false - res.boundary = upperBound.Next() + res.lowerBoundIfFail = upperBound.Next() return } res.success = true - if upperBound.Compare(e.upperBound) > 0 { - res.boundary = e.upperBound + if lowerBound.Compare(e.upperBound) > 0 { + res.upperBoundIfSuccess = lowerBound.Prev() + } else if upperBound.Compare(e.upperBound) > 0 { + res.upperBoundIfSuccess = e.upperBound } else { - res.boundary = upperBound + res.upperBoundIfSuccess = upperBound } startIdx := sort.Search(e.readyCount, func(i int) bool { @@ -173,28 +184,23 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu res.releaseSize += e.sizes[i] } - var endIdx int - if startIdx == e.readyCount { - endIdx = startIdx - } else { - endIdx = sort.Search(e.readyCount, func(i int) bool { - pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs} - return pos.Compare(res.boundary) > 0 - }) - res.events = e.events[startIdx:endIdx] - for i := startIdx; i < endIdx; i++ { - res.size += e.sizes[i] - res.pushCount += int(e.pushCounts[i]) - } - res.releaseSize += res.size + endIdx := sort.Search(e.readyCount, func(i int) bool { + pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs} + return pos.Compare(res.upperBoundIfSuccess) > 0 + }) + res.events = e.events[startIdx:endIdx] + for i := startIdx; i < endIdx; i++ { + res.size += e.sizes[i] + res.pushCount += int(e.pushCounts[i]) } + res.releaseSize += res.size e.events = e.events[endIdx:] e.sizes = e.sizes[endIdx:] e.pushCounts = e.pushCounts[endIdx:] e.readyCount -= endIdx // Update boundaries. Set upperBound to invalid if the range has been drained. - e.lowerBound = res.boundary.Next() + e.lowerBound = res.upperBoundIfSuccess.Next() if e.lowerBound.Compare(e.upperBound) > 0 { e.upperBound = engine.Position{} } diff --git a/cdc/processor/sinkmanager/redo_cache_test.go b/cdc/processor/sinkmanager/redo_cache_test.go index 98b60fe9f4e..ef4c1c2558c 100644 --- a/cdc/processor/sinkmanager/redo_cache_test.go +++ b/cdc/processor/sinkmanager/redo_cache_test.go @@ -46,14 +46,14 @@ func TestRedoEventCache(t *testing.T) { // Try to pop [{0,1}, {0,4}], shoud fail. And the returned boundary should be {1,4}. popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 0, CommitTs: 4}) require.False(t, popRes.success) - require.Equal(t, uint64(1), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(1), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) // Try to pop [{0,2}, {0,4}], shoud fail. And the returned boundary should be {3,4}. popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 5, CommitTs: 6}) require.False(t, popRes.success) - require.Equal(t, uint64(3), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(3), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) // Try to pop [{3,4}, {3,4}], should success. popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4}) @@ -61,22 +61,22 @@ func TestRedoEventCache(t *testing.T) { require.Equal(t, 2, len(popRes.events)) require.Equal(t, uint64(300), popRes.size) require.Equal(t, 2, popRes.pushCount) - require.Equal(t, uint64(3), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(3), popRes.upperBoundIfSuccess.StartTs) + require.Equal(t, uint64(4), popRes.upperBoundIfSuccess.CommitTs) // Try to pop [{3,4}, {3,4}] again, shoud fail. And the returned boundary should be {4,4}. popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4}) require.False(t, popRes.success) - require.Equal(t, uint64(4), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) popRes = appender.pop(engine.Position{StartTs: 4, CommitTs: 4}, engine.Position{StartTs: 9, CommitTs: 10}) require.True(t, popRes.success) require.Equal(t, 1, len(popRes.events)) require.Equal(t, uint64(300), popRes.size) require.Equal(t, 1, popRes.pushCount) - require.Equal(t, uint64(5), popRes.boundary.StartTs) - require.Equal(t, uint64(6), popRes.boundary.CommitTs) + require.Equal(t, uint64(5), popRes.upperBoundIfSuccess.StartTs) + require.Equal(t, uint64(6), popRes.upperBoundIfSuccess.CommitTs) require.Equal(t, 0, len(appender.events)) require.True(t, appender.broken) @@ -85,3 +85,64 @@ func TestRedoEventCache(t *testing.T) { require.Equal(t, uint64(0), appender.upperBound.StartTs) require.Equal(t, uint64(0), appender.upperBound.CommitTs) } + +func TestRedoEventCacheAllPopBranches(t *testing.T) { + cache := newRedoEventCache(model.ChangeFeedID{}, 1000) + appender := cache.maybeCreateAppender(1, engine.Position{StartTs: 101, CommitTs: 111}) + var batch []*model.RowChangedEvent + var ok bool + var popRes popResult + + batch = []*model.RowChangedEvent{{StartTs: 1, CommitTs: 11}, {StartTs: 1, CommitTs: 11}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + + batch = []*model.RowChangedEvent{{StartTs: 2, CommitTs: 12}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 3, CommitTs: 4}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.lowerBoundIfFail) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 300, CommitTs: 400}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.lowerBoundIfFail) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 11}, engine.Position{StartTs: 2, CommitTs: 12}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.lowerBoundIfFail) + + batch = []*model.RowChangedEvent{{StartTs: 101, CommitTs: 111}, {StartTs: 101, CommitTs: 111}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 101, CommitTs: 111}) + require.True(t, ok) + + batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + require.Equal(t, 5, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 101, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112}) + require.True(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.upperBoundIfSuccess) + require.Equal(t, 2, len(popRes.events)) + require.Equal(t, 1, popRes.pushCount) + require.Equal(t, uint64(101), popRes.events[1].StartTs) + require.Equal(t, 0, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 102, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.lowerBoundIfFail) + + batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 102, CommitTs: 102}) + require.True(t, ok) + require.Equal(t, 2, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 501, CommitTs: 502}, engine.Position{StartTs: 701, CommitTs: 702}) + require.True(t, popRes.success) + require.Equal(t, 0, len(popRes.events)) + require.Equal(t, engine.Position{StartTs: 500, CommitTs: 502}, popRes.upperBoundIfSuccess) + require.Equal(t, 0, appender.readyCount) + require.Equal(t, 0, len(appender.events)) +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index d90a35ce3ac..0ed6d47c930 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -212,8 +212,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if err != nil { return errors.Trace(err) } + lastPos = lowerBound.Prev() if drained { - performCallback(lowerBound.Prev()) + performCallback(lastPos) return nil } } @@ -405,7 +406,7 @@ func (w *sinkWorker) fetchFromCache( } popRes := cache.pop(*lowerBound, *upperBound) if popRes.success { - newLowerBound = popRes.boundary.Next() + newLowerBound = popRes.upperBoundIfSuccess.Next() if len(popRes.events) > 0 { metrics.OutputEventCount.WithLabelValues( task.tableSink.changefeed.Namespace, @@ -420,9 +421,9 @@ func (w *sinkWorker) fetchFromCache( // Get a resolvedTs so that we can record it into sink memory quota. var resolvedTs model.ResolvedTs - isCommitFence := popRes.boundary.IsCommitFence() + isCommitFence := popRes.upperBoundIfSuccess.IsCommitFence() if w.splitTxn { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs) if !isCommitFence { resolvedTs.Mode = model.BatchResolvedMode resolvedTs.BatchID = *batchID @@ -430,9 +431,9 @@ func (w *sinkWorker) fetchFromCache( } } else { if isCommitFence { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs) } else { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs - 1) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs - 1) } } // Transfer the memory usage from redoMemQuota to sinkMemQuota. @@ -448,7 +449,7 @@ func (w *sinkWorker) fetchFromCache( zap.Any("resolvedTs", resolvedTs), zap.Error(err)) } else { - newUpperBound = popRes.boundary.Prev() + newUpperBound = popRes.lowerBoundIfFail.Prev() } cacheDrained = newLowerBound.Compare(newUpperBound) > 0 log.Debug("fetchFromCache is performed", diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index a2376b8ae9f..4f9891deb9a 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -1231,3 +1231,51 @@ func genUpperBoundGetter(commitTs model.Ts) func(_ model.Ts) engine.Position { } } } + +func (suite *workerSuite) TestHandleTaskWithCache() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(2, 4, 1), + genPolymorphicEvent(2, 4, 1), + genPolymorphicResolvedEvent(4), + } + w, e := createWorker(model.ChangeFeedID{}, 0, true, 1) + w.eventCache = newRedoEventCache(model.ChangeFeedID{}, 1024*1024) + appender := w.eventCache.maybeCreateAppender(1, engine.Position{StartTs: 1, CommitTs: 3}) + appender.pushBatch( + []*model.RowChangedEvent{events[0].Row, events[1].Row}, + uint64(0), engine.Position{StartTs: 2, CommitTs: 4}, + ) + defer w.sinkMemQuota.Close() + addEventsToSortEngine(suite.T(), events, e, 1) + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWrittenPos engine.Position) { + require.Equal(suite.T(), engine.Position{StartTs: 2, CommitTs: 4}, lastWrittenPos) + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + tableID: 1, + lowerBound: engine.Position{StartTs: 1, CommitTs: 3}, + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +} diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index c7be3d1dfc6..892b61aa9e3 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -49,6 +49,8 @@ type agent struct { // 1. The capture receives a SIGTERM signal. // 2. The agent receives a stopping heartbeat. liveness *model.Liveness + + lastCheckpointWarn time.Time } type agentInfo struct { @@ -98,6 +100,8 @@ func newAgent( tableM: newTableManager(changeFeedID, tableExecutor), liveness: liveness, compat: compat.New(map[model.CaptureID]*model.CaptureInfo{}), + + lastCheckpointWarn: time.Now(), } etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second)