diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 500a6c9f065..6d18ff2b893 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -508,12 +508,13 @@ func (l *LogWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er l.metaLock.Lock() defer l.metaLock.Unlock() + // NOTE: both checkpoint and resolved can regress if a cdc instance restarts. hasChange := false if checkpointTs > l.meta.CheckpointTs { l.meta.CheckpointTs = checkpointTs hasChange = true } else if checkpointTs > 0 && checkpointTs != l.meta.CheckpointTs { - log.Panic("flushLogMeta with a regressed checkpoint ts", + log.Warn("flushLogMeta with a regressed checkpoint ts, ignore", zap.Uint64("currCheckpointTs", l.meta.CheckpointTs), zap.Uint64("recvCheckpointTs", checkpointTs)) } @@ -521,7 +522,7 @@ func (l *LogWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er l.meta.ResolvedTs = resolvedTs hasChange = true } else if resolvedTs > 0 && resolvedTs != l.meta.ResolvedTs { - log.Panic("flushLogMeta with a regressed resolved ts", + log.Warn("flushLogMeta with a regressed resolved ts, ignore", zap.Uint64("currCheckpointTs", l.meta.ResolvedTs), zap.Uint64("recvCheckpointTs", resolvedTs)) } diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index 65acad9b67d..fb2b7b8e972 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -346,6 +346,23 @@ func TestLogWriterFlushLog(t *testing.T) { } } +// checkpoint or meta regress should be ignored correctly. +func TestLogWriterRegress(t *testing.T) { + dir := t.TempDir() + writer, err := NewLogWriter(context.Background(), &LogWriterConfig{ + Dir: dir, + ChangeFeedID: model.DefaultChangeFeedID("test-log-writer-regress"), + CaptureID: "cp", + S3Storage: false, + }) + require.Nil(t, err) + require.Nil(t, writer.FlushLog(context.Background(), 2, 4)) + require.Nil(t, writer.FlushLog(context.Background(), 1, 3)) + require.Equal(t, uint64(2), writer.meta.CheckpointTs) + require.Equal(t, uint64(4), writer.meta.ResolvedTs) + _ = writer.Close() +} + func TestNewLogWriter(t *testing.T) { _, err := NewLogWriter(context.Background(), nil) require.NotNil(t, err)