Skip to content

Commit

Permalink
cdc: ignore redo log meta regress for restarting (#6320)
Browse files Browse the repository at this point in the history
ref #6277
  • Loading branch information
hicqu authored Jul 18, 2022
1 parent a4d2b78 commit e351bac
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
5 changes: 3 additions & 2 deletions cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,20 +508,21 @@ func (l *LogWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er
l.metaLock.Lock()
defer l.metaLock.Unlock()

// NOTE: both checkpoint and resolved can regress if a cdc instance restarts.
hasChange := false
if checkpointTs > l.meta.CheckpointTs {
l.meta.CheckpointTs = checkpointTs
hasChange = true
} else if checkpointTs > 0 && checkpointTs != l.meta.CheckpointTs {
log.Panic("flushLogMeta with a regressed checkpoint ts",
log.Warn("flushLogMeta with a regressed checkpoint ts, ignore",
zap.Uint64("currCheckpointTs", l.meta.CheckpointTs),
zap.Uint64("recvCheckpointTs", checkpointTs))
}
if resolvedTs > l.meta.ResolvedTs {
l.meta.ResolvedTs = resolvedTs
hasChange = true
} else if resolvedTs > 0 && resolvedTs != l.meta.ResolvedTs {
log.Panic("flushLogMeta with a regressed resolved ts",
log.Warn("flushLogMeta with a regressed resolved ts, ignore",
zap.Uint64("currCheckpointTs", l.meta.ResolvedTs),
zap.Uint64("recvCheckpointTs", resolvedTs))
}
Expand Down
17 changes: 17 additions & 0 deletions cdc/redo/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,23 @@ func TestLogWriterFlushLog(t *testing.T) {
}
}

// checkpoint or meta regress should be ignored correctly.
func TestLogWriterRegress(t *testing.T) {
dir := t.TempDir()
writer, err := NewLogWriter(context.Background(), &LogWriterConfig{
Dir: dir,
ChangeFeedID: model.DefaultChangeFeedID("test-log-writer-regress"),
CaptureID: "cp",
S3Storage: false,
})
require.Nil(t, err)
require.Nil(t, writer.FlushLog(context.Background(), 2, 4))
require.Nil(t, writer.FlushLog(context.Background(), 1, 3))
require.Equal(t, uint64(2), writer.meta.CheckpointTs)
require.Equal(t, uint64(4), writer.meta.ResolvedTs)
_ = writer.Close()
}

func TestNewLogWriter(t *testing.T) {
_, err := NewLogWriter(context.Background(), nil)
require.NotNil(t, err)
Expand Down

0 comments on commit e351bac

Please sign in to comment.