diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 80a32391a07..22a10efdbae 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -317,6 +317,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* default: } + if c.redoMetaMgr.Enabled() { + if !c.redoMetaMgr.Running() { + return nil + } + } + // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { @@ -551,32 +557,21 @@ LOOP2: ctx.Throw(c.ddlPuller.Run(cancelCtx)) }() - stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id) - c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs) - failpoint.Inject("ChangefeedNewRedoManagerError", func() { - err = errors.New("changefeed new redo manager injected error") - }) - if err != nil { - return err - } + c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { defer c.wg.Done() - ctx.Throw(c.redoDDLMgr.Run(stdCtx)) + ctx.Throw(c.redoDDLMgr.Run(cancelCtx)) }() } - c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx, - c.state.Info.Config.Consistent, checkpointTs) - if err != nil { - return err - } + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { defer c.wg.Done() - ctx.Throw(c.redoMetaMgr.Run(stdCtx)) + ctx.Throw(c.redoMetaMgr.Run(cancelCtx)) }() } log.Info("owner creates redo manager", @@ -733,15 +728,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { - redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent) - if err != nil { - log.Info("owner creates redo manager for clean fail", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err)) - return - } - c.redoMetaMgr = redoMetaMgr + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0) } err := c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index b763c808929..9c0fcf51c09 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -101,7 +101,8 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, model.Ts(5), table.ResolvedTs()) ctx, cancel := context.WithCancel(context.Background()) eg, egCtx := errgroup.WithContext(ctx) - table.redoDMLMgr, _ = redo.NewDMLManager(ctx, &config.ConsistentConfig{ + changefeedID := model.DefaultChangeFeedID("test") + table.redoDMLMgr = redo.NewDMLManager(changefeedID, &config.ConsistentConfig{ Level: string(redoCfg.ConsistentLevelEventual), FlushIntervalInMs: redoCfg.MinFlushIntervalInMs, Storage: fmt.Sprintf("file://tmp/%s", t.TempDir()), diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 56f377ad8d0..f574ffaeb70 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -805,10 +805,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { conf := config.GetGlobalServerConfig() p.pullBasedSinking = conf.Debug.EnablePullBasedSink - p.redoDMLMgr, err = redo.NewDMLManager(stdCtx, p.changefeed.Info.Config.Consistent) - if err != nil { - return err - } + p.redoDMLMgr = redo.NewDMLManager(p.changefeedID, p.changefeed.Info.Config.Consistent) if p.redoDMLMgr.Enabled() { p.wg.Add(1) go func() { diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 53b3f81c608..44c07f29f72 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -19,9 +19,8 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" @@ -62,19 +61,17 @@ func NewDisabledDDLManager() *ddlManager { // NewDDLManager creates a new ddl Manager. func NewDDLManager( - ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts, -) (*ddlManager, error) { - logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType) - if err != nil { - return nil, err - } + changefeedID model.ChangeFeedID, + cfg *config.ConsistentConfig, ddlStartTs model.Ts, +) *ddlManager { + m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType) tableID := int64(0) - logManager.AddTable(tableID, ddlStartTs) + m.AddTable(tableID, ddlStartTs) return &ddlManager{ - logManager: logManager, + logManager: m, // The current fakeTableID is meaningless, find a meaningful id in the future. fakeTableID: tableID, - }, nil + } } type ddlManager struct { @@ -111,12 +108,12 @@ type DMLManager interface { } // NewDMLManager creates a new dml Manager. -func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) { - logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType) - if err != nil { - return nil, err +func NewDMLManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, +) *dmlManager { + return &dmlManager{ + logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType), } - return &dmlManager{logManager: logManager}, nil } // NewDisabledDMLManager creates a disabled dml Manager. @@ -222,28 +219,21 @@ type logManager struct { } func newLogManager( - ctx context.Context, cfg *config.ConsistentConfig, logType string, -) (*logManager, error) { + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string, +) *logManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &logManager{enabled: false}, nil + return &logManager{enabled: false} } - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - changefeedID := contextutil.ChangefeedIDFromCtx(ctx) - m := &logManager{ + return &logManager{ enabled: true, cfg: &writer.LogWriterConfig{ - ConsistentConfig: *cfg, - LogType: logType, - CaptureID: contextutil.CaptureAddrFromCtx(ctx), - ChangeFeedID: changefeedID, - URI: *uri, - UseExternalStorage: redo.IsExternalStorage(uri.Scheme), - MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, + ConsistentConfig: *cfg, + LogType: logType, + CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, + ChangeFeedID: changefeedID, + MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, }, logBuffer: chann.NewDrainableChann[cacheEvents](), metricWriteLogDuration: common.RedoWriteLogDurationHistogram. @@ -255,15 +245,17 @@ func newLogManager( metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio. WithLabelValues(changefeedID.Namespace, changefeedID.ID), } - - m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg) - if err != nil { - return nil, err - } - return m, nil } +// Run implements pkg/util.Runnable. func (m *logManager) Run(ctx context.Context) error { + failpoint.Inject("ChangefeedNewRedoManagerError", func() { + failpoint.Return(errors.New("changefeed new redo manager injected error")) + }) + if !m.Enabled() { + return nil + } + defer m.close() start := time.Now() w, err := factory.NewRedoLogWriter(ctx, m.cfg) @@ -549,11 +541,13 @@ func (m *logManager) close() { atomic.StoreInt32(&m.closed, 1) m.logBuffer.CloseAndDrain() - if err := m.writer.Close(); err != nil { - log.Error("redo manager fails to close writer", - zap.String("namespace", m.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", m.cfg.ChangeFeedID.ID), - zap.Error(err)) + if m.writer != nil { + if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { + log.Error("redo manager fails to close writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Error(err)) + } } log.Info("redo manager closed", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index 9d29fe3d74d..e30eae138f4 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -54,9 +54,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } - dmlMgr, err := NewDMLManager(ctx, cfg) - require.Nil(b, err) - eg := errgroup.Group{} + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group eg.Go(func() error { return dmlMgr.Run(ctx) }) @@ -124,6 +123,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { time.Sleep(time.Millisecond * 500) } cancel() - err = eg.Wait() - require.ErrorIs(b, err, context.Canceled) + + require.ErrorIs(b, eg.Wait(), context.Canceled) } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 86ce58313eb..1a5f340c40a 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/pkg/config" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -47,6 +47,9 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error + + // Running return true if the meta manager is running or not. + Running() bool } type metaManager struct { @@ -54,6 +57,9 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + // running means the meta manager now running normally. + running atomic.Bool + metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -63,8 +69,11 @@ type metaManager struct { uuidGenerator uuid.Generator preMetaFile string - lastFlushTime time.Time + startTs model.Ts + flushIntervalInMs int64 + lastFlushTime time.Time + cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer } @@ -75,51 +84,22 @@ func NewDisabledMetaManager() *metaManager { } } -// NewMetaManagerWithInit creates a new Manager and initializes the meta. -func NewMetaManagerWithInit( - ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts, -) (*metaManager, error) { - m, err := NewMetaManager(ctx, cfg) - if err != nil { - return nil, err - } - - // There is no need to perform initialize operation if metaMgr is disabled - // or the scheme is blackhole. - if m.extStorage != nil { - m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. - WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err = m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - if err = m.initMeta(ctx, startTs); err != nil { - log.Warn("init redo meta fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - } - - return m, nil -} - // NewMetaManager creates a new meta Manager. -func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) { +func NewMetaManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, +) *metaManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &metaManager{enabled: false}, nil + return &metaManager{enabled: false} } m := &metaManager{ - captureID: contextutil.CaptureAddrFromCtx(ctx), - changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), + changeFeedID: changefeedID, + captureID: config.GetGlobalServerConfig().AdvertiseAddr, uuidGenerator: uuid.NewGenerator(), enabled: true, + cfg: cfg, + startTs: checkpoint, flushIntervalInMs: cfg.MetaFlushIntervalInMs, } @@ -128,44 +108,70 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan zap.Int64("interval", m.flushIntervalInMs)) m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } + return m +} - uri, err := storage.ParseRawURL(cfg.Storage) +// Enabled returns whether this log manager is enabled +func (m *metaManager) Enabled() bool { + return m.enabled +} + +// Running return whether the meta manager is initialized, +// which means the external storage is accessible to the meta manager. +func (m *metaManager) Running() bool { + return m.running.Load() +} + +func (m *metaManager) preStart(ctx context.Context) error { + uri, err := storage.ParseRawURL(m.cfg.Storage) if err != nil { - return nil, err - } - if redo.IsBlackholeStorage(uri.Scheme) { - return m, nil + return err } - // "nfs" and "local" scheme are converted to "file" scheme redo.FixLocalScheme(uri) + extStorage, err := redo.InitExternalStorage(ctx, *uri) if err != nil { - return nil, err + return err } m.extStorage = extStorage - return m, nil -} -// Enabled returns whether this log manager is enabled -func (m *metaManager) Enabled() bool { - return m.enabled + m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + + err = m.preCleanupExtStorage(ctx) + if err != nil { + log.Warn("redo: pre clean redo logs fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + err = m.initMeta(ctx) + if err != nil { + log.Warn("redo: init redo meta fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + return nil } // Run runs bgFlushMeta and bgGC. func (m *metaManager) Run(ctx context.Context) error { - if m.extStorage == nil { - log.Warn("extStorage of redo meta manager is nil, skip running") - return nil + if err := m.preStart(ctx); err != nil { + return err } - eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { - return m.bgFlushMeta(egCtx, m.flushIntervalInMs) + return m.bgFlushMeta(egCtx) }) eg.Go(func() error { return m.bgGC(egCtx) }) + + m.running.Store(true) return eg.Wait() } @@ -194,9 +200,9 @@ func (m *metaManager) GetFlushedMeta() common.LogMeta { return common.LogMeta{CheckpointTs: checkpointTs, ResolvedTs: resolvedTs} } -// initMeta will read the meta file from external storage and initialize the meta -// field of metaManager. -func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { +// initMeta will read the meta file from external storage and +// use it to initialize the meta field of the metaManager. +func (m *metaManager) initMeta(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -204,10 +210,14 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { } metas := []*common.LogMeta{ - {CheckpointTs: startTs, ResolvedTs: startTs}, + {CheckpointTs: m.startTs, ResolvedTs: m.startTs}, } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + log.Info("redo: meta manager walk dir", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Int64("size", size)) // TODO: use prefix to accelerate traverse operation if !strings.HasSuffix(path, redo.MetaEXT) { return nil @@ -215,22 +225,30 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { toRemoveMetaFiles = append(toRemoveMetaFiles, path) data, err := m.extStorage.ReadFile(ctx, path) - if err != nil && !util.IsNotExistInExtStorage(err) { - return err - } - if len(data) != 0 { - var meta common.LogMeta - _, err = meta.UnmarshalMsg(data) - if err != nil { + if err != nil { + log.Warn("redo: read meta file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) + if !util.IsNotExistInExtStorage(err) { return err } - metas = append(metas, &meta) + return nil + } + var meta common.LogMeta + _, err = meta.UnmarshalMsg(data) + if err != nil { + log.Error("redo: unmarshal meta data failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err), zap.ByteString("data", data)) + return err } + metas = append(metas, &meta) return nil }) if err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "read meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } var checkpointTs, resolvedTs uint64 @@ -243,9 +261,15 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { m.metaResolvedTs.unflushed = resolvedTs m.metaCheckpointTs.unflushed = checkpointTs if err := m.maybeFlushMeta(ctx); err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "flush meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } + + flushedMeta := m.GetFlushedMeta() + log.Info("redo: meta manager flush init meta success", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Uint64("checkpointTs", flushedMeta.CheckpointTs), + zap.Uint64("resolvedTs", flushedMeta.ResolvedTs)) return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles) } @@ -305,8 +329,19 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { // deleteAllLogs delete all redo logs and leave a deleted mark. func (m *metaManager) deleteAllLogs(ctx context.Context) error { + // when one changefeed with redo enabled gets deleted, it's extStorage should always be set to not nil + // otherwise it should have already meet panic during changefeed running time. + // the extStorage may be nil in the unit test, so just set the external storage to make unit test happy. if m.extStorage == nil { - return nil + uri, err := storage.ParseRawURL(m.cfg.Storage) + redo.FixLocalScheme(uri) + if err != nil { + return err + } + m.extStorage, err = redo.InitExternalStorage(ctx, *uri) + if err != nil { + return err + } } // Write deleted mark before clean any files. deleteMarker := getDeletedChangefeedMarker(m.changeFeedID) @@ -382,6 +417,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } metaFile := getMetafileName(m.captureID, m.changeFeedID, m.uuidGenerator) if err := m.extStorage.WriteFile(ctx, metaFile, data); err != nil { + log.Error("redo: meta manager flush meta write file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } @@ -392,6 +431,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } err := m.extStorage.DeleteFile(ctx, m.preMetaFile) if err != nil && !util.IsNotExistInExtStorage(err) { + log.Error("redo: meta manager flush meta delete file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } } @@ -418,8 +461,8 @@ func (m *metaManager) Cleanup(ctx context.Context) error { return m.deleteAllLogs(ctx) } -func (m *metaManager) bgFlushMeta(egCtx context.Context, flushIntervalInMs int64) (err error) { - ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) +func (m *metaManager) bgFlushMeta(egCtx context.Context) (err error) { + ticker := time.NewTicker(time.Duration(m.flushIntervalInMs) * time.Millisecond) defer func() { ticker.Stop() log.Info("redo metaManager bgFlushMeta exits", diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index e630d19f249..51ddb37e9fd 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -51,17 +51,18 @@ func TestInitAndWriteMeta(t *testing.T) { {CheckpointTs: 8, ResolvedTs: 9}, {CheckpointTs: 9, ResolvedTs: 11}, } - toReomoveFiles := []string{} + + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) metaName := getMetafileName(captureID, changefeedID, uuid.NewGenerator()) err = extStorage.WriteFile(ctx, metaName, data) require.NoError(t, err) - toReomoveFiles = append(toReomoveFiles, metaName) + toRemoveFiles = append(toRemoveFiles, metaName) } - // err = extStorage.WriteFile(ctx, getDeletedChangefeedMarker(changefeedID), []byte{}) - notRemoveFiles := []string{} + + var notRemoveFiles []string require.NoError(t, err) for i := 0; i < 10; i++ { fileName := "dummy" + getChangefeedMatcher(changefeedID) + strconv.Itoa(i) @@ -78,11 +79,22 @@ func TestInitAndWriteMeta(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) - for _, fileName := range toReomoveFiles { + m := NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return uint64(11) == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) @@ -93,7 +105,10 @@ func TestInitAndWriteMeta(t *testing.T) { require.True(t, ret, "file %s should not be removed", fileName) } - testWriteMeta(t, m) + testWriteMeta(ctx, t, m) + + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestPreCleanupAndWriteMeta(t *testing.T) { @@ -116,7 +131,7 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { {CheckpointTs: 9, ResolvedTs: 11}, {CheckpointTs: 11, ResolvedTs: 12}, } - toRemoveFiles := []string{} + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) @@ -143,24 +158,36 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) + m := NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) } - testWriteMeta(t, m) -} + testWriteMeta(ctx, t, m) -func testWriteMeta(t *testing.T, m *metaManager) { - ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) +} - checkMeta := func(targetCkpts, targetRts uint64) { +func testWriteMeta(ctx context.Context, t *testing.T, m *metaManager) { + checkMeta := func(targetCheckpointTs, targetResolvedTs uint64) { var checkpointTs, resolvedTs uint64 - metas := []*common.LogMeta{} + var metas []*common.LogMeta cnt := 0 m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { if !strings.HasSuffix(path, redo.MetaEXT) { @@ -177,14 +204,10 @@ func testWriteMeta(t *testing.T, m *metaManager) { }) require.Equal(t, 1, cnt) common.ParseMeta(metas, &checkpointTs, &resolvedTs) - require.Equal(t, targetCkpts, checkpointTs) - require.Equal(t, targetRts, resolvedTs) + require.Equal(t, targetCheckpointTs, checkpointTs) + require.Equal(t, targetResolvedTs, resolvedTs) } - eg := errgroup.Group{} - eg.Go(func() error { - return m.Run(ctx) - }) // test both regressed meta := m.GetFlushedMeta() m.UpdateMeta(1, 2) @@ -210,9 +233,6 @@ func testWriteMeta(t *testing.T, m *metaManager) { return m.metaCheckpointTs.getFlushed() == 16 }, time.Second, 50*time.Millisecond) checkMeta(16, 21) - - cancel() - require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestGCAndCleanup(t *testing.T) { @@ -275,15 +295,21 @@ func TestGCAndCleanup(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) + m := NewMetaManager(changefeedID, cfg, startTs) - eg := errgroup.Group{} + var eg errgroup.Group eg.Go(func() error { return m.Run(ctx) }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + checkGC(startTs) for i := startTs; i <= uint64(maxCommitTs); i++ { diff --git a/cdc/redo/writer/blackhole/blackhole_log_writer.go b/cdc/redo/writer/blackhole/blackhole_log_writer.go index 24cc03bd3df..9a877f262f3 100644 --- a/cdc/redo/writer/blackhole/blackhole_log_writer.go +++ b/cdc/redo/writer/blackhole/blackhole_log_writer.go @@ -26,14 +26,21 @@ var _ writer.RedoLogWriter = (*blackHoleWriter)(nil) // blackHoleSink defines a blackHole storage, it receives events and persists // without any latency -type blackHoleWriter struct{} +type blackHoleWriter struct { + invalid bool +} // NewLogWriter creates a blackHole writer -func NewLogWriter() *blackHoleWriter { - return &blackHoleWriter{} +func NewLogWriter(invalid bool) *blackHoleWriter { + return &blackHoleWriter{ + invalid: invalid, + } } func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoEvent) (err error) { + if bs.invalid { + return errors.New("[WriteLog] invalid black hole writer") + } if len(events) == 0 { return nil } @@ -45,30 +52,12 @@ func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoE } func (bs *blackHoleWriter) FlushLog(_ context.Context) error { + if bs.invalid { + return errors.New("[FlushLog] invalid black hole writer") + } return nil } func (bs *blackHoleWriter) Close() error { return nil } - -type invalidBlackHoleWriter struct { - *blackHoleWriter -} - -// NewInvalidLogWriter creates a invalid blackHole writer -func NewInvalidLogWriter(rl writer.RedoLogWriter) *invalidBlackHoleWriter { - return &invalidBlackHoleWriter{ - blackHoleWriter: rl.(*blackHoleWriter), - } -} - -func (ibs *invalidBlackHoleWriter) WriteEvents( - _ context.Context, _ ...writer.RedoEvent, -) (err error) { - return errors.New("[WriteLog] invalid black hole writer") -} - -func (ibs *invalidBlackHoleWriter) FlushLog(_ context.Context) error { - return errors.New("[FlushLog] invalid black hole writer") -} diff --git a/cdc/redo/writer/factory/factory.go b/cdc/redo/writer/factory/factory.go index 7684b3f06f0..796d803ae45 100644 --- a/cdc/redo/writer/factory/factory.go +++ b/cdc/redo/writer/factory/factory.go @@ -15,7 +15,9 @@ package factory import ( "context" + "strings" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/cdc/redo/writer/file" @@ -28,14 +30,23 @@ import ( func NewRedoLogWriter( ctx context.Context, lwCfg *writer.LogWriterConfig, ) (writer.RedoLogWriter, error) { - scheme := lwCfg.URI.Scheme - if !redo.IsValidConsistentStorage(scheme) { - return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme) + uri, err := storage.ParseRawURL(lwCfg.Storage) + if err != nil { + return nil, err } - if redo.IsBlackholeStorage(scheme) { - return blackhole.NewLogWriter(), nil + if !redo.IsValidConsistentStorage(uri.Scheme) { + return nil, errors.ErrConsistentStorage.GenWithStackByArgs(uri.Scheme) } + + lwCfg.URI = uri + lwCfg.UseExternalStorage = redo.IsExternalStorage(uri.Scheme) + + if redo.IsBlackholeStorage(uri.Scheme) { + invalid := strings.HasSuffix(uri.Scheme, "invalid") + return blackhole.NewLogWriter(invalid), nil + } + if lwCfg.UseFileBackend { return file.NewLogWriter(ctx, lwCfg) } diff --git a/cdc/redo/writer/file/file.go b/cdc/redo/writer/file/file.go index e895bffe8c6..e12a5df59de 100644 --- a/cdc/redo/writer/file/file.go +++ b/cdc/redo/writer/file/file.go @@ -86,7 +86,7 @@ func NewFileWriter( var extStorage storage.ExternalStorage if cfg.UseExternalStorage { var err error - extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err = redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func NewFileWriter( // if we use S3 as the remote storage, a file allocator can be leveraged to // pre-allocate files for us. // TODO: test whether this improvement can also be applied to NFS. - if cfg.UseExternalStorage { + if w.cfg.UseExternalStorage { w.allocator = fsutil.NewFileAllocator(cfg.Dir, cfg.LogType, cfg.MaxLogSizeInBytes) } diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index fa59c6dfe94..f2dfa102590 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -123,7 +123,7 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { close(e.closed) - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo fileWorkerGroup closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 771888a418d..45b366117c0 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -138,7 +138,7 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo file workers closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index a47a83e855b..0460fdbf08b 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -54,11 +54,11 @@ func NewLogWriter( } // "nfs" and "local" scheme are converted to "file" scheme if !cfg.UseExternalStorage { - redo.FixLocalScheme(&cfg.URI) + redo.FixLocalScheme(cfg.URI) cfg.UseExternalStorage = redo.IsExternalStorage(cfg.URI.Scheme) } - extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err := redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 8d1a03a7107..53943e52a29 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -68,7 +68,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { LogType: redo.RedoDDLLogFileType, CaptureID: "test-capture", ChangeFeedID: model.DefaultChangeFeedID("test-changefeed"), - URI: *uri, + URI: uri, UseExternalStorage: true, MaxLogSizeInBytes: 10 * redo.Megabyte, } diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 18779997b30..42ac64d4ff6 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -52,7 +52,7 @@ type LogWriterConfig struct { CaptureID model.CaptureID ChangeFeedID model.ChangeFeedID - URI url.URL + URI *url.URL UseExternalStorage bool Dir string MaxLogSizeInBytes int64 diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index c5e7aad2c93..cc52f042bf9 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -589,6 +589,10 @@ func (m *mockRedoMetaManager) Run(ctx context.Context) error { return nil } +func (m *mockRedoMetaManager) Running() bool { + return true +} + func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { t.Parallel() diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 48582905789..bee9095f566 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -686,10 +686,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) { defer c.ddlListMu.Unlock() // DDL CommitTs fallback, just crash it to indicate the bug. if c.ddlWithMaxCommitTs != nil && ddl.CommitTs < c.ddlWithMaxCommitTs.CommitTs { - log.Panic("DDL CommitTs < maxCommitTsDDL.CommitTs", + log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.Uint64("maxCommitTs", c.ddlWithMaxCommitTs.CommitTs), zap.Any("DDL", ddl)) + return } // A rename tables DDL job contains multiple DDL events with same CommitTs. diff --git a/pkg/redo/config.go b/pkg/redo/config.go index d0562b83be5..d6b66de45ed 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -174,7 +174,7 @@ func FixLocalScheme(uri *url.URL) { // IsBlackholeStorage returns whether a blackhole storage is used. func IsBlackholeStorage(scheme string) bool { - return ConsistentStorage(scheme) == consistentStorageBlackhole + return strings.HasPrefix(scheme, string(consistentStorageBlackhole)) } // InitExternalStorage init an external storage. @@ -222,6 +222,18 @@ func ValidateStorage(uri *url.URL) error { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, fmt.Sprintf("can't make dir for new redo log: %+v", uri))) } + + file := filepath.Join(uri.Path, "file.test") + if err := os.WriteFile(file, []byte(""), DefaultFileMode); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't write file for new redo log: %+v", uri))) + } + + if _, err := os.ReadFile(file); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't read file for new redo log: %+v", uri))) + } + _ = os.Remove(file) return nil } diff --git a/tests/integration_tests/changefeed_error/conf/enable_redo.toml b/tests/integration_tests/changefeed_error/conf/enable_redo.toml new file mode 100644 index 00000000000..4f1ab09b13a --- /dev/null +++ b/tests/integration_tests/changefeed_error/conf/enable_redo.toml @@ -0,0 +1,3 @@ +[consistent] +level = "eventual" +storage = "file:///tmp/tidb_cdc_test/changefeed_error/redo" diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index abaf3d6b556..67ef5e5492e 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -104,11 +104,11 @@ function run() { cleanup_process $CDC_BINARY # make sure initialize changefeed error will not stuck the owner - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)' + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_3="changefeed-initialize-error" - run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 + run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 --config=$CUR/conf/enable_redo.toml ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" run_cdc_cli changefeed pause -c $changefeedid_3 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" ""