cdc/sink: Sink manager manage checkpoint per table (pingcap#3621)
sdojjy authored and okJiang committed Dec 8, 2021
1 parent 3dbcbc0 commit f37b88c
Showing 5 changed files with 62 additions and 33 deletions.
2 changes: 2 additions & 0 deletions cdc/processor/processor.go
@@ -307,6 +307,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
if err := p.lazyInit(ctx); err != nil {
return nil, errors.Trace(err)
}
// the sink manager returns this checkpointTs to the sink node if the sink node fails to flush its resolvedTs
p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
if err := p.handleTableOperation(ctx); err != nil {
return nil, errors.Trace(err)
}
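For context, the ordering above matters: the changefeed-level checkpoint is refreshed before any table operation in the same tick, so a table sink that has not yet completed a flush still has a safe value to fall back to. Below is a minimal sketch of that pattern; sinkManager, tickState, and handleTables are simplified stand-ins for illustration, not the real processor types.

// Simplified stand-ins for illustration only; not the real processor or sink.Manager API.
type sinkManager interface {
	UpdateChangeFeedCheckpointTs(ts uint64)
}

type tickState struct {
	checkpointTs uint64
}

// tick mirrors the ordering used above: seed the fallback checkpoint first,
// then run the table operations that may rely on it when a flush fails.
func tick(m sinkManager, state tickState, handleTables func() error) error {
	m.UpdateChangeFeedCheckpointTs(state.checkpointTs)
	return handleTables()
}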
2 changes: 2 additions & 0 deletions cdc/processor/processor_test.go
@@ -29,6 +29,7 @@ import (
tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
"github.com/pingcap/ticdc/cdc/redo"
"github.com/pingcap/ticdc/cdc/scheduler"
"github.com/pingcap/ticdc/cdc/sink"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
@@ -52,6 +53,7 @@ func newProcessor4Test(
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.sinkManager = &sink.Manager{}
p.redoManager = redo.NewDisabledManager()
p.createTablePipeline = createTablePipeline
p.schemaStorage = &mockSchemaStorage{c: c, resolvedTs: math.MaxUint64}
48 changes: 31 additions & 17 deletions cdc/sink/manager.go
@@ -23,7 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
redo "github.com/pingcap/ticdc/cdc/redo"
"github.com/pingcap/ticdc/cdc/redo"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -34,10 +34,11 @@ const (

// Manager manages table sinks, maintains the relationship between table sinks and backendSink.
type Manager struct {
backendSink Sink
checkpointTs model.Ts
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
backendSink Sink
tableCheckpointTsMap sync.Map
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
changeFeedCheckpointTs uint64

flushMu sync.Mutex
flushing int64
@@ -57,7 +58,7 @@ func NewManager(
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
changeFeedCheckpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
captureAddr: captureAddr,
@@ -87,18 +88,21 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts,
// Close closes the Sink manager and backend Sink, this method can be reentrantly called
func (m *Manager) Close(ctx context.Context) error {
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
return m.backendSink.Close(ctx)
if m.backendSink != nil {
return m.backendSink.Close(ctx)
}
return nil
}

func (m *Manager) getMinEmittedTs() model.Ts {
func (m *Manager) getMinEmittedTs(tableID model.TableID) model.Ts {
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if len(m.tableSinks) == 0 {
return m.getCheckpointTs()
return m.getCheckpointTs(tableID)
}
minTs := model.Ts(math.MaxUint64)
for _, tableSink := range m.tableSinks {
resolvedTs := tableSink.getResolvedTs()
for _, tblSink := range m.tableSinks {
resolvedTs := tblSink.getResolvedTs()
if minTs > resolvedTs {
minTs = resolvedTs
}
@@ -111,19 +115,19 @@ func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (
// which will cause a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to improve the lock competition problem.
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
return m.getCheckpointTs(), nil
return m.getCheckpointTs(tableID), nil
}
m.flushMu.Lock()
defer func() {
m.flushMu.Unlock()
atomic.StoreInt64(&m.flushing, 0)
}()
minEmittedTs := m.getMinEmittedTs()
minEmittedTs := m.getMinEmittedTs(tableID)
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs)
if err != nil {
return m.getCheckpointTs(), errors.Trace(err)
return m.getCheckpointTs(tableID), errors.Trace(err)
}
atomic.StoreUint64(&m.checkpointTs, checkpointTs)
m.tableCheckpointTsMap.Store(tableID, checkpointTs)
return checkpointTs, nil
}

@@ -145,8 +149,18 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
return m.backendSink.Barrier(ctx, tableID)
}

func (m *Manager) getCheckpointTs() uint64 {
return atomic.LoadUint64(&m.checkpointTs)
func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := m.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
// no table-level checkpointTs is recorded yet because no table-level resolvedTs flush has completed successfully,
// e.g. the first resolvedTs flush could not acquire the flush lock; returning the changefeed-level checkpointTs is safe
return atomic.LoadUint64(&m.changeFeedCheckpointTs)
}

func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
}

type drawbackMsg struct {
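For reference, here is a minimal standalone sketch of the bookkeeping this file introduces: table-level checkpoints live in a sync.Map keyed by table ID, and an atomically updated changefeed-level checkpoint serves as the fallback for tables that have not completed any resolvedTs flush yet. The type and method names below are simplified stand-ins for illustration, not the actual sink.Manager API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// checkpointStore is a simplified stand-in for the per-table bookkeeping
// added to sink.Manager in this commit.
type checkpointStore struct {
	changeFeedCheckpointTs uint64   // atomically updated fallback
	tableCheckpointTs      sync.Map // table ID (int64) -> checkpoint (uint64)
}

// updateTable records a successfully flushed table-level checkpoint.
func (s *checkpointStore) updateTable(tableID int64, ts uint64) {
	s.tableCheckpointTs.Store(tableID, ts)
}

// updateChangeFeed refreshes the changefeed-level fallback, as the processor
// does at the start of every tick.
func (s *checkpointStore) updateChangeFeed(ts uint64) {
	atomic.StoreUint64(&s.changeFeedCheckpointTs, ts)
}

// get returns the table-level checkpoint if one exists, otherwise the
// changefeed-level checkpoint, which is always a safe (possibly stale) value.
func (s *checkpointStore) get(tableID int64) uint64 {
	if ts, ok := s.tableCheckpointTs.Load(tableID); ok {
		return ts.(uint64)
	}
	return atomic.LoadUint64(&s.changeFeedCheckpointTs)
}

func main() {
	var s checkpointStore
	s.updateChangeFeed(100)
	fmt.Println(s.get(1)) // 100: no table-level flush yet, fall back to the changefeed value
	s.updateTable(1, 120)
	fmt.Println(s.get(1)) // 120: the table-level checkpoint now takes precedence
	fmt.Println(s.get(2)) // 100: other tables still fall back
}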
39 changes: 25 additions & 14 deletions cdc/sink/manager_test.go
@@ -35,9 +35,17 @@ var _ = check.Suite(&managerSuite{})

type checkSink struct {
*check.C
rows []*model.RowChangedEvent
rows map[model.TableID][]*model.RowChangedEvent
rowsMu sync.Mutex
lastResolvedTs uint64
lastResolvedTs map[model.TableID]uint64
}

func newCheckSink(c *check.C) *checkSink {
return &checkSink{
C: c,
rows: make(map[model.TableID][]*model.RowChangedEvent),
lastResolvedTs: make(map[model.TableID]uint64),
}
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
@@ -47,7 +55,9 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
c.rows = append(c.rows, rows...)
for _, row := range rows {
c.rows[row.Table.TableID] = append(c.rows[row.Table.TableID], row)
}
return nil
}

@@ -59,20 +69,21 @@ func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
var newRows []*model.RowChangedEvent
for _, row := range c.rows {
if row.CommitTs <= c.lastResolvedTs {
return c.lastResolvedTs, errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs)
rows := c.rows[tableID]
for _, row := range rows {
if row.CommitTs <= c.lastResolvedTs[tableID] {
return c.lastResolvedTs[tableID], errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs[tableID])
}
if row.CommitTs > resolvedTs {
newRows = append(newRows, row)
}
}

c.Assert(c.lastResolvedTs, check.LessEqual, resolvedTs)
c.lastResolvedTs = resolvedTs
c.rows = newRows
c.Assert(c.lastResolvedTs[tableID], check.LessEqual, resolvedTs)
c.lastResolvedTs[tableID] = resolvedTs
c.rows[tableID] = newRows

return c.lastResolvedTs, nil
return c.lastResolvedTs[tableID], nil
}

func (c *checkSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
@@ -92,7 +103,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 10
rowNum := 100
@@ -147,7 +158,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 200
var wg sync.WaitGroup
@@ -234,7 +245,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer cancel()

errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)

tableID := int64(49)
@@ -255,7 +266,7 @@ func BenchmarkManagerFlushing(b *testing.B) {
func BenchmarkManagerFlushing(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(nil), errCh, 0, "", "")

// Init table sinks.
goroutineNum := 2000
4 changes: 2 additions & 2 deletions cdc/sink/table_sink.go
@@ -71,7 +71,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab

err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...)
if err != nil {
return t.manager.getCheckpointTs(), errors.Trace(err)
return t.manager.getCheckpointTs(tableID), errors.Trace(err)
}
atomic.StoreUint64(&t.emittedTs, resolvedTs)
ckpt, err := t.flushRedoLogs(ctx, resolvedTs)
@@ -85,7 +85,7 @@ func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint6
if t.redoManager.Enabled() {
err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
if err != nil {
return t.manager.getCheckpointTs(), err
return t.manager.getCheckpointTs(t.tableID), err
}
}
return 0, nil
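The two changes above follow one contract: whenever a flush step fails, the table sink reports the manager's last known safe checkpoint for that table instead of advancing. A minimal sketch of that error-path pattern; emit and lastSafeCheckpointTs are hypothetical helpers, not part of the real tableSink API.

// flushOrFallback illustrates the error-path contract used by tableSink:
// on failure, return a previously known-safe checkpoint rather than the
// requested resolvedTs, so the caller never advances past unflushed data.
func flushOrFallback(
	emit func() error,
	lastSafeCheckpointTs func() uint64,
	resolvedTs uint64,
) (uint64, error) {
	if err := emit(); err != nil {
		return lastSafeCheckpointTs(), err
	}
	return resolvedTs, nil
}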
