Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): use meta flush interval in redo ddl manager (#9999) #10006

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,26 @@ func newLogManager(

func (m *logManager) Run(ctx context.Context) error {
defer m.close()
return m.bgUpdateLog(ctx)
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
return m.bgUpdateLog(ctx, m.getFlushDuration())
}

func (m *logManager) getFlushDuration() time.Duration {
flushIntervalInMs := m.cfg.FlushIntervalInMs
if m.cfg.LogType == redo.RedoDDLLogFileType {
flushIntervalInMs = m.cfg.MetaFlushIntervalInMs
}
return time.Duration(flushIntervalInMs) * time.Millisecond
}

// Enabled returns whether this log manager is enabled
Expand Down Expand Up @@ -468,15 +487,14 @@ func (m *logManager) onResolvedTsMsg(tableID model.TableID, resolvedTs model.Ts)
}
}

func (m *logManager) bgUpdateLog(ctx context.Context) error {
func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duration) error {
m.releaseMemoryCbs = make([]func(), 0, 1024)
flushIntervalInMs := m.cfg.FlushIntervalInMs
ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
ticker := time.NewTicker(flushDuration)
defer ticker.Stop()
log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Int64("flushIntervalInMs", flushIntervalInMs))
zap.Duration("flushIntervalInMs", flushDuration))

var err error
// logErrCh is used to retrieve errors from log flushing goroutines.
Expand Down
287 changes: 6 additions & 281 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,296 +16,20 @@ package redo
import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/redo/writer/blackhole"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func checkResolvedTs(t *testing.T, mgr *logManager, expectedRts uint64) {
time.Sleep(time.Duration(redo.MinFlushIntervalInMs+200) * time.Millisecond)
resolvedTs := uint64(math.MaxUint64)
mgr.rtsMap.Range(func(tableID any, value any) bool {
v, ok := value.(*statefulRts)
require.True(t, ok)
ts := v.getFlushed()
if ts < resolvedTs {
resolvedTs = ts
}
return true
})
require.Equal(t, expectedRts, resolvedTs)
}

func TestConsistentConfig(t *testing.T) {
t.Parallel()
levelCases := []struct {
level string
valid bool
}{
{"none", true},
{"eventual", true},
{"NONE", false},
{"", false},
}
for _, lc := range levelCases {
require.Equal(t, lc.valid, redo.IsValidConsistentLevel(lc.level))
}

levelEnableCases := []struct {
level string
consistent bool
}{
{"invalid-level", false},
{"none", false},
{"eventual", true},
}
for _, lc := range levelEnableCases {
require.Equal(t, lc.consistent, redo.IsConsistentEnabled(lc.level))
}

storageCases := []struct {
storage string
valid bool
}{
{"local", true},
{"nfs", true},
{"s3", true},
{"blackhole", true},
{"Local", false},
{"", false},
}
for _, sc := range storageCases {
require.Equal(t, sc.valid, redo.IsValidConsistentStorage(sc.storage))
}

s3StorageCases := []struct {
storage string
s3Enabled bool
}{
{"local", false},
{"nfs", false},
{"s3", true},
{"blackhole", false},
}
for _, sc := range s3StorageCases {
require.Equal(t, sc.s3Enabled, redo.IsExternalStorage(sc.storage))
}
}

// TestLogManagerInProcessor tests how redo log manager is used in processor.
func TestLogManagerInProcessor(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testWriteDMLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, cfg)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
dmlMgr.Run(ctx)
}()

// check emit row changed events can move forward resolved ts
spans := []model.TableID{53, 55, 57, 59}

startTs := uint64(100)
for _, span := range spans {
dmlMgr.AddTable(span, startTs)
}
testCases := []struct {
span model.TableID
rows []*model.RowChangedEvent
}{
{
span: 53,
rows: []*model.RowChangedEvent{
{CommitTs: 120, Table: &model.TableName{TableID: 53}},
{CommitTs: 125, Table: &model.TableName{TableID: 53}},
{CommitTs: 130, Table: &model.TableName{TableID: 53}},
},
},
{
span: 55,
rows: []*model.RowChangedEvent{
{CommitTs: 130, Table: &model.TableName{TableID: 55}},
{CommitTs: 135, Table: &model.TableName{TableID: 55}},
},
},
{
span: 57,
rows: []*model.RowChangedEvent{
{CommitTs: 130, Table: &model.TableName{TableID: 57}},
},
},
{
span: 59,
rows: []*model.RowChangedEvent{
{CommitTs: 128, Table: &model.TableName{TableID: 59}},
{CommitTs: 130, Table: &model.TableName{TableID: 59}},
{CommitTs: 133, Table: &model.TableName{TableID: 59}},
},
},
}
for _, tc := range testCases {
err := dmlMgr.EmitRowChangedEvents(ctx, tc.span, nil, tc.rows...)
require.NoError(t, err)
}

// check UpdateResolvedTs can move forward the resolved ts when there is not row event.
flushResolvedTs := uint64(150)
for _, span := range spans {
checkResolvedTs(t, dmlMgr.logManager, startTs)
err := dmlMgr.UpdateResolvedTs(ctx, span, flushResolvedTs)
require.NoError(t, err)
}
checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs)

// check remove table can work normally
removeTable := spans[len(spans)-1]
spans = spans[:len(spans)-1]
dmlMgr.RemoveTable(removeTable)
flushResolvedTs = uint64(200)
for _, span := range spans {
err := dmlMgr.UpdateResolvedTs(ctx, span, flushResolvedTs)
require.NoError(t, err)
}
checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs)

cancel()
wg.Wait()
}

testWriteDMLs("blackhole://", true)
storages := []string{
fmt.Sprintf("file://%s", t.TempDir()),
fmt.Sprintf("nfs://%s", t.TempDir()),
}
for _, storage := range storages {
testWriteDMLs(storage, true)
testWriteDMLs(storage, false)
}
}

// TestLogManagerInOwner tests how redo log manager is used in owner,
// where the redo log manager needs to handle DDL event only.
func TestLogManagerInOwner(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testWriteDDLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr, err := NewDDLManager(ctx, cfg, startTs)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ddlMgr.Run(ctx)
}()

require.Equal(t, startTs, ddlMgr.GetResolvedTs())
ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
err = ddlMgr.EmitDDLEvent(ctx, ddl)
require.NoError(t, err)
require.Equal(t, startTs, ddlMgr.GetResolvedTs())

ddlMgr.UpdateResolvedTs(ctx, ddl.CommitTs)
checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs)

cancel()
wg.Wait()
}

testWriteDDLs("blackhole://", true)
storages := []string{
fmt.Sprintf("file://%s", t.TempDir()),
fmt.Sprintf("nfs://%s", t.TempDir()),
}
for _, storage := range storages {
testWriteDDLs(storage, true)
testWriteDDLs(storage, false)
}
}

// TestManagerError tests whether internal error in bgUpdateLog could be managed correctly.
func TestLogManagerError(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr, err := NewDMLManager(ctx, cfg)
require.NoError(t, err)
err = logMgr.writer.Close()
require.NoError(t, err)
logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := logMgr.Run(ctx)
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}()

testCases := []struct {
span model.TableID
rows []writer.RedoEvent
}{
{
span: 53,
rows: []writer.RedoEvent{
&model.RowChangedEvent{CommitTs: 120, Table: &model.TableName{TableID: 53}},
&model.RowChangedEvent{CommitTs: 125, Table: &model.TableName{TableID: 53}},
&model.RowChangedEvent{CommitTs: 130, Table: &model.TableName{TableID: 53}},
},
},
}
for _, tc := range testCases {
err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...)
require.NoError(t, err)
}
wg.Wait()
}

func BenchmarkBlackhole(b *testing.B) {
runBenchTest(b, "blackhole://", false)
}
Expand All @@ -323,11 +47,12 @@ func BenchmarkFileWriter(b *testing.B) {
func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(context.Background())
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, cfg)
require.Nil(b, err)
Expand Down
Loading