Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: mysql sink manage checkpoint per table #3645

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
ba2c2fd
refine Sink interface
sdojjy Nov 25, 2021
26070d9
fix ut
sdojjy Nov 25, 2021
f388718
fix lint
sdojjy Nov 25, 2021
51b754a
fix lint
sdojjy Nov 25, 2021
112b4e0
sink manager manage checkpointTs per table
sdojjy Nov 25, 2021
bf10cc1
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 25, 2021
ce8598f
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
aaa2a9d
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
6bccf2f
Merge remote-tracking branch 'origin/sink-manager-manage-checkpoint-p…
sdojjy Nov 26, 2021
dbd09d3
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
1fd207d
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 26, 2021
e189647
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
fc0d687
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
4008fce
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 26, 2021
e938ba9
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 26, 2021
edee7b1
mysql sink manage checkpoint per table
sdojjy Nov 27, 2021
5f8b18f
fix lint
sdojjy Nov 27, 2021
0ac647d
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Nov 27, 2021
9667dc1
fix lint
sdojjy Nov 27, 2021
3421986
fix lint
sdojjy Nov 29, 2021
594593e
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
78f2628
address comment
sdojjy Nov 30, 2021
d87f6e7
address comment
sdojjy Nov 30, 2021
6e34487
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
8bfacd1
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Nov 30, 2021
d3444c1
fix ut
sdojjy Nov 30, 2021
6cd2eaa
fix ut
sdojjy Nov 30, 2021
2227e75
fix ut
sdojjy Nov 30, 2021
c827eee
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Nov 30, 2021
0683408
fix ut
sdojjy Nov 30, 2021
e75d9d0
Merge remote-tracking branch 'upstream/master' into sink-manager-mana…
sdojjy Nov 30, 2021
fefb6bf
Merge branch 'sink-manager-manage-checkpoint-per-table' into buffer_s…
sdojjy Nov 30, 2021
537456a
fix ut
sdojjy Nov 30, 2021
f21a91f
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Nov 30, 2021
9af09b3
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Nov 30, 2021
8e3d3c9
Merge remote-tracking branch 'upstream/master' into mysql-sink-manage…
sdojjy Nov 30, 2021
237aeea
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Nov 30, 2021
e243794
update buffer sink global checkpointTs
sdojjy Dec 1, 2021
07d0c99
fix sink_hang integration test
sdojjy Dec 2, 2021
853dd30
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Dec 2, 2021
a9f8020
Merge branch 'buffer_sink_manage_checkpointTs_per_table' of github.co…
sdojjy Dec 2, 2021
0356822
Merge branch 'buffer_sink_manage_checkpointTs_per_table' into mysql-s…
sdojjy Dec 2, 2021
0657cd2
fix ut
sdojjy Dec 2, 2021
e23efca
fix ut
sdojjy Dec 2, 2021
cf8efd7
fix checkpoint
sdojjy Dec 2, 2021
1f17ed3
fix lint
sdojjy Dec 2, 2021
275cab5
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 2, 2021
0b4d728
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 3, 2021
746a5c7
flush resolvedTs per table
sdojjy Dec 3, 2021
b554602
address comment
sdojjy Dec 3, 2021
05f29aa
address comment
sdojjy Dec 3, 2021
c3346e2
fix integration test
sdojjy Dec 3, 2021
a418e5a
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 3, 2021
adefd60
add ut test
sdojjy Dec 3, 2021
dd14d12
Merge remote-tracking branch 'origin/mysql-sink-manage-checkpoint-per…
sdojjy Dec 3, 2021
800174b
fix lint
sdojjy Dec 3, 2021
06ad1d8
Merge branch 'master' into buffer_sink_manage_checkpointTs_per_table
sdojjy Dec 4, 2021
0b67675
add ut cases
sdojjy Dec 4, 2021
40a64e0
Merge branch 'buffer_sink_manage_checkpointTs_per_table' into mysql-s…
sdojjy Dec 6, 2021
025a96e
add ut case
sdojjy Dec 6, 2021
5b69e4a
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 6, 2021
46abbe0
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 6, 2021
204a9e2
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 7, 2021
89af4a9
Update cdc/sink/mysql.go
sdojjy Dec 8, 2021
1268e1d
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 8, 2021
b559140
address comment
sdojjy Dec 8, 2021
48b51cc
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
ti-chi-bot Dec 8, 2021
b99a8c1
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
ti-chi-bot Dec 8, 2021
175643f
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
ti-chi-bot Dec 8, 2021
971f3fc
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
ti-chi-bot Dec 8, 2021
1f78944
kafka consumer flush resovedTs with table ID
sdojjy Dec 9, 2021
c4d2ed7
kafka consumer flush resovedTs with table ID
sdojjy Dec 9, 2021
77b9755
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 9, 2021
8e65f06
kafka consumer flush resovedTs with table ID
sdojjy Dec 9, 2021
0368bc7
Merge remote-tracking branch 'origin/mysql-sink-manage-checkpoint-per…
sdojjy Dec 9, 2021
e330b60
kafka consumer flush resovedTs with table ID
sdojjy Dec 9, 2021
c87c972
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
sdojjy Dec 9, 2021
40161f5
kafka consumer flush resovedTs with table ID
sdojjy Dec 9, 2021
a614acf
Merge remote-tracking branch 'origin/mysql-sink-manage-checkpoint-per…
sdojjy Dec 9, 2021
da216bd
Merge branch 'master' into mysql-sink-manage-checkpoint-per-table
ti-chi-bot Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 12 additions & 21 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package common
import (
"sort"
"sync"
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
Expand Down Expand Up @@ -55,7 +54,6 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
type UnresolvedTxnCache struct {
unresolvedTxnsMu sync.Mutex
unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs
checkpointTs uint64
}

// NewUnresolvedTxnCache returns a new UnresolvedTxnCache
Expand Down Expand Up @@ -103,32 +101,27 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha

// Resolved returns resolved txns according to resolvedTs.
// The returned map contains txns grouped by tableID. For each table, the commitTs of the txns in the slice is strictly increasing.
func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn {
if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) {
return nil
}

func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
if len(c.unresolvedTxns) == 0 {
return nil
return nil, nil
}

_, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns)
return resolvedTxnsMap
}

// UpdateCheckpoint updates the checkpoint ts
func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) {
atomic.StoreUint64(&c.checkpointTs, checkpointTs)
return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns)
}

func splitResolvedTxn(
resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
minTs = resolvedTs
flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
for tableID, txns := range unresolvedTxns {
v, ok := resolvedTsMap.Load(tableID)
if !ok {
continue
}
resolvedTs := v.(uint64)
i := sort.Search(len(txns), func(i int) bool {
return txns[i].commitTs > resolvedTs
})
Expand All @@ -154,9 +147,7 @@ func splitResolvedTxn(
}
}
resolvedRowsMap[tableID] = resolvedTxns
if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs {
minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs
}
flushedResolvedTsMap[tableID] = resolvedTs
}
return
}
65 changes: 42 additions & 23 deletions cdc/sink/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@ package common

import (
"sort"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type SinkCommonSuite struct{}
func TestSplitResolvedTxn(test *testing.T) {
defer testleak.AfterTestT(test)()

func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&SinkCommonSuite{})

func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
defer testleak.AfterTest(c)()
testCases := [][]struct {
input []*model.RowChangedEvent
resolvedTs model.Ts
expected map[model.TableID][]*model.SingleTableTxn
input []*model.RowChangedEvent
resolvedTsMap map[model.TableID]uint64
expected map[model.TableID][]*model.SingleTableTxn
}{{{ // Testing basic transaction collocation, no txns with the same committs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -45,7 +41,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -59,7 +58,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}},
},
resolvedTs: 13,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
3: uint64(13),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
Expand All @@ -76,17 +79,24 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
}}},
},
}}, {{ // Testing the short circuit path
input: []*model.RowChangedEvent{},
resolvedTs: 6,
expected: nil,
input: []*model.RowChangedEvent{},
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
3: uint64(13),
},
expected: nil,
}, {
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
expected: map[model.TableID][]*model.SingleTableTxn{},
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{},
}}, {{ // Testing the txns with the same commitTs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -99,7 +109,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -119,7 +132,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}},
},
resolvedTs: 13,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
Expand All @@ -144,7 +160,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
cache := NewUnresolvedTxnCache()
for _, t := range tc {
cache.Append(nil, t.input...)
resolved := cache.Resolved(t.resolvedTs)
resolvedTsMap := sync.Map{}
for tableID, ts := range t.resolvedTsMap {
resolvedTsMap.Store(tableID, ts)
}
_, resolved := cache.Resolved(&resolvedTsMap)
for tableID, txns := range resolved {
sort.Slice(txns, func(i, j int) bool {
if txns[i].CommitTs != txns[j].CommitTs {
Expand All @@ -154,8 +174,7 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
})
resolved[tableID] = txns
}
c.Assert(resolved, check.DeepEquals, t.expected,
check.Commentf("%s", cmp.Diff(resolved, t.expected)))
require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected))
}
}
}
70 changes: 32 additions & 38 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

dmysql "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -58,10 +57,10 @@ type mysqlSink struct {
filter *tifilter.Filter
cyclic *cyclic.Cyclic

txnCache *common.UnresolvedTxnCache
workers []*mysqlSinkWorker
resolvedTs uint64
maxResolvedTs uint64
txnCache *common.UnresolvedTxnCache
workers []*mysqlSinkWorker
tableCheckpointTs sync.Map
tableMaxResolvedTs sync.Map

execWaitNotifier *notify.Notifier
resolvedNotifier *notify.Notifier
Expand Down Expand Up @@ -208,12 +207,10 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
// FlushRowChangedEvents will flush all received events; we don't allow the mysql
// sink to receive events before resolving.
func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
if atomic.LoadUint64(&s.maxResolvedTs) < resolvedTs {
atomic.StoreUint64(&s.maxResolvedTs, resolvedTs)
v, ok := s.tableMaxResolvedTs.Load(tableID)
if !ok || v.(uint64) < resolvedTs {
s.tableMaxResolvedTs.Store(tableID, resolvedTs)
}
// resolvedTs can fall back, for example when a new table is added into this sink
// with a smaller start-ts
atomic.StoreUint64(&s.resolvedTs, resolvedTs)
s.resolvedNotifier.Notify()

// check and throw error
Expand All @@ -223,13 +220,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
default:
}

checkpointTs := resolvedTs
for _, worker := range s.workers {
workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs)
if workerCheckpointTs < checkpointTs {
checkpointTs = workerCheckpointTs
}
}
checkpointTs := s.getTableCheckpointTs(tableID)
s.statistics.PrintStatus(ctx)
return checkpointTs, nil
}
Expand All @@ -246,13 +237,12 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
return
case <-receiver.C:
}
resolvedTs := atomic.LoadUint64(&s.resolvedTs)
resolvedTxnsMap := s.txnCache.Resolved(resolvedTs)
flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs)
if len(resolvedTxnsMap) == 0 {
for _, worker := range s.workers {
atomic.StoreUint64(&worker.checkpointTs, resolvedTs)
}
s.txnCache.UpdateCheckpoint(resolvedTs)
s.tableMaxResolvedTs.Range(func(key, value interface{}) bool {
s.tableCheckpointTs.Store(key, value)
return true
})
continue
}

Expand All @@ -264,10 +254,9 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
}

s.dispatchAndExecTxns(ctx, resolvedTxnsMap)
for _, worker := range s.workers {
atomic.StoreUint64(&worker.checkpointTs, resolvedTs)
for tableID, resolvedTs := range flushedResolvedTsMap {
s.tableCheckpointTs.Store(tableID, resolvedTs)
}
s.txnCache.UpdateCheckpoint(resolvedTs)
}
}

Expand Down Expand Up @@ -482,12 +471,20 @@ func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
maxResolvedTs, ok := s.tableMaxResolvedTs.Load(tableID)
log.Warn("Barrier doesn't return in time, may be stuck",
zap.Uint64("resolved-ts", atomic.LoadUint64(&s.maxResolvedTs)),
zap.Uint64("checkpoint-ts", s.checkpointTs()))
zap.Int64("tableID", tableID),
zap.Bool("has resolvedTs", ok),
zap.Any("resolvedTs", maxResolvedTs),
zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID)))
default:
maxResolvedTs := atomic.LoadUint64(&s.maxResolvedTs)
if s.checkpointTs() >= maxResolvedTs {
v, ok := s.tableMaxResolvedTs.Load(tableID)
if !ok {
log.Info("No table resolvedTs is found", zap.Int64("table-id", tableID))
return nil
}
maxResolvedTs := v.(uint64)
if s.getTableCheckpointTs(tableID) >= maxResolvedTs {
return nil
}
checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs)
Expand All @@ -503,15 +500,12 @@ func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
}
}

func (s *mysqlSink) checkpointTs() uint64 {
checkpointTs := atomic.LoadUint64(&s.resolvedTs)
for _, worker := range s.workers {
workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs)
if workerCheckpointTs < checkpointTs {
checkpointTs = workerCheckpointTs
}
func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 {
v, ok := s.tableCheckpointTs.Load(tableID)
if ok {
return v.(uint64)
}
return checkpointTs
return uint64(0)
}

func logDMLTxnErr(err error) error {
Expand Down
Loading