From f37b88cc9a6a5718d9189c0a96b76eb39749ef0a Mon Sep 17 00:00:00 2001
From: sdojjy
Date: Tue, 30 Nov 2021 22:29:53 +0800
Subject: [PATCH] cdc/sink: Sink manager manage checkpoint per table (#3621)

---
 cdc/processor/processor.go      |  2 ++
 cdc/processor/processor_test.go |  2 ++
 cdc/sink/manager.go             | 48 +++++++++++++++++++++------------
 cdc/sink/manager_test.go        | 39 +++++++++++++++++----------
 cdc/sink/table_sink.go          |  4 +--
 5 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 6229d57b25c..68acc2acacf 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -307,6 +307,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
 	if err := p.lazyInit(ctx); err != nil {
 		return nil, errors.Trace(err)
 	}
+	// the sink manager returns this checkpointTs to the sink node if the sink node fails to flush its resolvedTs
+	p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
 	if err := p.handleTableOperation(ctx); err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index cbf234e1aa4..bd7b3e76204 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -29,6 +29,7 @@ import (
 	tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
 	"github.com/pingcap/ticdc/cdc/redo"
 	"github.com/pingcap/ticdc/cdc/scheduler"
+	"github.com/pingcap/ticdc/cdc/sink"
 	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/etcd"
@@ -52,6 +53,7 @@ func newProcessor4Test(
 ) *processor {
 	p := newProcessor(ctx)
 	p.lazyInit = func(ctx cdcContext.Context) error { return nil }
+	p.sinkManager = &sink.Manager{}
 	p.redoManager = redo.NewDisabledManager()
 	p.createTablePipeline = createTablePipeline
 	p.schemaStorage = &mockSchemaStorage{c: c, resolvedTs: math.MaxUint64}
diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go
index ed80e41703d..ce71608b4ae 100644
--- a/cdc/sink/manager.go
+++ b/cdc/sink/manager.go
@@ -23,7 +23,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/cdc/model"
-	redo "github.com/pingcap/ticdc/cdc/redo"
+	"github.com/pingcap/ticdc/cdc/redo"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
@@ -34,10 +34,11 @@ const (
 // Manager manages table sinks, maintains the relationship between table sinks and backendSink.
 type Manager struct {
-	backendSink  Sink
-	checkpointTs model.Ts
-	tableSinks   map[model.TableID]*tableSink
-	tableSinksMu sync.Mutex
+	backendSink            Sink
+	tableCheckpointTsMap   sync.Map
+	tableSinks             map[model.TableID]*tableSink
+	tableSinksMu           sync.Mutex
+	changeFeedCheckpointTs uint64
 
 	flushMu  sync.Mutex
 	flushing int64
@@ -57,7 +58,7 @@ func NewManager(
 	drawbackChan := make(chan drawbackMsg, 16)
 	return &Manager{
 		backendSink:  newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
-		checkpointTs: checkpointTs,
+		changeFeedCheckpointTs: checkpointTs,
 		tableSinks:   make(map[model.TableID]*tableSink),
 		drawbackChan: drawbackChan,
 		captureAddr:  captureAddr,
@@ -87,18 +88,21 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts,
 // Close closes the Sink manager and backend Sink, this method can be reentrantly called
 func (m *Manager) Close(ctx context.Context) error {
 	tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
-	return m.backendSink.Close(ctx)
+	if m.backendSink != nil {
+		return m.backendSink.Close(ctx)
+	}
+	return nil
 }
 
-func (m *Manager) getMinEmittedTs() model.Ts {
+func (m *Manager) getMinEmittedTs(tableID model.TableID) model.Ts {
 	m.tableSinksMu.Lock()
 	defer m.tableSinksMu.Unlock()
 	if len(m.tableSinks) == 0 {
-		return m.getCheckpointTs()
+		return m.getCheckpointTs(tableID)
 	}
 	minTs := model.Ts(math.MaxUint64)
-	for _, tableSink := range m.tableSinks {
-		resolvedTs := tableSink.getResolvedTs()
+	for _, tblSink := range m.tableSinks {
+		resolvedTs := tblSink.getResolvedTs()
 		if minTs > resolvedTs {
 			minTs = resolvedTs
 		}
@@ -111,19 +115,19 @@ func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (
 	// which will cause a lot of lock contention and blocking in high concurrency cases.
 	// So here we use flushing as a lightweight lock to improve the lock competition problem.
 	if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
-		return m.getCheckpointTs(), nil
+		return m.getCheckpointTs(tableID), nil
 	}
 	m.flushMu.Lock()
 	defer func() {
 		m.flushMu.Unlock()
 		atomic.StoreInt64(&m.flushing, 0)
 	}()
-	minEmittedTs := m.getMinEmittedTs()
+	minEmittedTs := m.getMinEmittedTs(tableID)
 	checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs)
 	if err != nil {
-		return m.getCheckpointTs(), errors.Trace(err)
+		return m.getCheckpointTs(tableID), errors.Trace(err)
 	}
-	atomic.StoreUint64(&m.checkpointTs, checkpointTs)
+	m.tableCheckpointTsMap.Store(tableID, checkpointTs)
 	return checkpointTs, nil
 }
@@ -145,8 +149,18 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
 	return m.backendSink.Barrier(ctx, tableID)
 }
-func (m *Manager) getCheckpointTs() uint64 {
-	return atomic.LoadUint64(&m.checkpointTs)
+func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
+	checkPoints, ok := m.tableCheckpointTsMap.Load(tableID)
+	if ok {
+		return checkPoints.(uint64)
+	}
+	// no table-level checkpointTs is found because no table-level resolvedTs flush task has finished successfully yet,
+	// e.g. the first resolvedTs flush could not acquire the flush lock; returning the changefeed-level checkpointTs is safe
+	return atomic.LoadUint64(&m.changeFeedCheckpointTs)
+}
+
+func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
+	atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
+}
 
 type drawbackMsg struct {
diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go
index a485d9e7a75..1e49d1ab01d 100644
--- a/cdc/sink/manager_test.go
+++ b/cdc/sink/manager_test.go
@@ -35,9 +35,17 @@ var _ = check.Suite(&managerSuite{})
 type checkSink struct {
 	*check.C
-	rows           []*model.RowChangedEvent
+	rows           map[model.TableID][]*model.RowChangedEvent
 	rowsMu         sync.Mutex
-	lastResolvedTs uint64
+	lastResolvedTs map[model.TableID]uint64
+}
+
+func newCheckSink(c *check.C) *checkSink {
+	return &checkSink{
+		C:              c,
+		rows:           make(map[model.TableID][]*model.RowChangedEvent),
+		lastResolvedTs: make(map[model.TableID]uint64),
+	}
 }
 
 func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
@@ -47,7 +55,9 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
 func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
 	c.rowsMu.Lock()
 	defer c.rowsMu.Unlock()
-	c.rows = append(c.rows, rows...)
+	for _, row := range rows {
+		c.rows[row.Table.TableID] = append(c.rows[row.Table.TableID], row)
+	}
 	return nil
 }
@@ -59,20 +69,21 @@ func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
 	c.rowsMu.Lock()
 	defer c.rowsMu.Unlock()
 	var newRows []*model.RowChangedEvent
-	for _, row := range c.rows {
-		if row.CommitTs <= c.lastResolvedTs {
-			return c.lastResolvedTs, errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs)
+	rows := c.rows[tableID]
+	for _, row := range rows {
+		if row.CommitTs <= c.lastResolvedTs[tableID] {
+			return c.lastResolvedTs[tableID], errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs[tableID])
 		}
 		if row.CommitTs > resolvedTs {
 			newRows = append(newRows, row)
 		}
 	}
-	c.Assert(c.lastResolvedTs, check.LessEqual, resolvedTs)
-	c.lastResolvedTs = resolvedTs
-	c.rows = newRows
+	c.Assert(c.lastResolvedTs[tableID], check.LessEqual, resolvedTs)
+	c.lastResolvedTs[tableID] = resolvedTs
+	c.rows[tableID] = newRows
 
-	return c.lastResolvedTs, nil
+	return c.lastResolvedTs[tableID], nil
 }
 
 func (c *checkSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
@@ -92,7 +103,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	errCh := make(chan error, 16)
-	manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
+	manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
 	defer manager.Close(ctx)
 	goroutineNum := 10
 	rowNum := 100
@@ -147,7 +158,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	errCh := make(chan error, 16)
-	manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
+	manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
 	defer manager.Close(ctx)
 	goroutineNum := 200
 	var wg sync.WaitGroup
@@ -234,7 +245,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
 	defer cancel()
 
 	errCh := make(chan error, 16)
-	manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
+	manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
 	defer manager.Close(ctx)
 
 	tableID := int64(49)
@@ -255,7 +266,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
 func BenchmarkManagerFlushing(b *testing.B) {
 	ctx, cancel := context.WithCancel(context.Background())
 	errCh := make(chan error, 16)
-	manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "")
+	manager := NewManager(ctx, newCheckSink(nil), errCh, 0, "", "")
 
 	// Init table sinks.
 	goroutineNum := 2000
diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go
index 2cb30150ae9..4a442266f30 100644
--- a/cdc/sink/table_sink.go
+++ b/cdc/sink/table_sink.go
@@ -71,7 +71,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
 
 	err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...)
 	if err != nil {
-		return t.manager.getCheckpointTs(), errors.Trace(err)
+		return t.manager.getCheckpointTs(tableID), errors.Trace(err)
 	}
 	atomic.StoreUint64(&t.emittedTs, resolvedTs)
 	ckpt, err := t.flushRedoLogs(ctx, resolvedTs)
@@ -85,7 +85,7 @@ func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint6
 	if t.redoManager.Enabled() {
 		err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
 		if err != nil {
-			return t.manager.getCheckpointTs(), err
+			return t.manager.getCheckpointTs(t.tableID), err
 		}
 	}
 	return 0, nil
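
For reference, here is a minimal, self-contained Go sketch of the checkpoint fallback behavior this patch introduces. It is not part of the patch and the standalone manager type below is illustrative only (it is not the real cdc/sink.Manager); it just mirrors how tableCheckpointTsMap and changeFeedCheckpointTs interact in getCheckpointTs above.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// manager mirrors the two checkpoint sources added by the patch:
// a per-table map filled by successful flushes, and a changefeed-level
// fallback updated from the processor tick.
type manager struct {
	tableCheckpointTsMap   sync.Map // tableID -> checkpointTs (model.TableID -> uint64 in the real code)
	changeFeedCheckpointTs uint64
}

func (m *manager) updateChangeFeedCheckpointTs(ts uint64) {
	atomic.StoreUint64(&m.changeFeedCheckpointTs, ts)
}

// getCheckpointTs prefers the table-level checkpoint; before the first
// successful per-table flush it falls back to the changefeed-level value.
func (m *manager) getCheckpointTs(tableID int64) uint64 {
	if ts, ok := m.tableCheckpointTsMap.Load(tableID); ok {
		return ts.(uint64)
	}
	return atomic.LoadUint64(&m.changeFeedCheckpointTs)
}

func main() {
	m := &manager{}
	m.updateChangeFeedCheckpointTs(100)                 // the processor tick pushes the changefeed checkpoint
	fmt.Println(m.getCheckpointTs(1))                   // 100: no table-level flush yet, fallback is used
	m.tableCheckpointTsMap.Store(int64(1), uint64(120)) // a per-table flush succeeded
	fmt.Println(m.getCheckpointTs(1))                   // 120: the table-level value wins from now on
}

Because a table's entry only appears in the map after its first successful flush, callers always get a safe timestamp: the changefeed-level checkpoint before that point, and the table-level one afterwards.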