Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink, ddl(ticdc): support add index ddl in downstream (#11476) #11480

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ func (m *ddlManager) tick(
zap.Uint64("commitTs", nextDDL.CommitTs),
zap.Uint64("checkpointTs", m.checkpointTs))
m.executingDDL = nextDDL
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
if err != nil {
return nil, nil, errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
}
}
err := m.executeDDL(ctx)
if err != nil {
Expand Down Expand Up @@ -431,14 +438,6 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
if m.executingDDL == nil {
return nil
}
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
if err != nil {
return errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
return nil
}

failpoint.Inject("ExecuteNotDone", func() {
// This ddl will never finish executing.
Expand Down
186 changes: 186 additions & 0 deletions cdc/sink/ddlsink/mysql/async_ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// checkRunningAddIndexSQL is a fmt format string with two %s verbs, the
// database name followed by the table name. The resulting query selects at
// most one job of that table from information_schema.ddl_jobs whose type
// starts with "add index" and whose state is "running" or "queueing".
// The "%%" in the LIKE pattern is an escaped percent sign for fmt.
var checkRunningAddIndexSQL = `
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
FROM information_schema.ddl_jobs
WHERE DB_NAME = "%s"
AND TABLE_NAME = "%s"
AND JOB_TYPE LIKE "add index%%"
AND (STATE = "running" OR STATE = "queueing")
LIMIT 1;
`

// shouldAsyncExecDDL reports whether ddl must be executed in async mode,
// which is the case only for an add index ddl when the downstream is TiDB.
func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool {
	if !m.cfg.IsTiDB {
		return false
	}
	return ddl.Type == timodel.ActionAddIndex
}

// asyncExecDDL executes ddl in async mode.
// this function only works in TiDB, because TiDB will save ddl jobs
// and execute them asynchronously even if ticdc crashed.
//
// The ddl is executed in a background goroutine and this function waits for
// whichever happens first:
//   - the ddl finishes: its result (nil or the error) is returned;
//   - the wait timer fires: nil is returned while the ddl keeps running;
//   - ctx is done: nil is returned.
//
// In the last two cases a later failure of the ddl is only visible in the log.
func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error {
	// Buffered with capacity 1 so the goroutine below can always deliver its
	// result and exit, even when this function has already returned.
	done := make(chan error, 1)
	// Use a longer timeout to ensure the add index ddl is sent to tidb before executing the next ddl.
	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()
	log.Info("async exec add index ddl start",
		zap.String("changefeedID", m.id.String()),
		zap.Uint64("commitTs", ddl.CommitTs),
		zap.String("ddl", ddl.Query))
	go func() {
		if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
			// Once the caller has stopped waiting, this log entry is the only
			// place the failure surfaces, so it must carry the error itself.
			log.Error("async exec add index ddl failed",
				zap.String("changefeedID", m.id.String()),
				zap.Uint64("commitTs", ddl.CommitTs),
				zap.String("ddl", ddl.Query),
				zap.Error(err))
			done <- err
			return
		}
		log.Info("async exec add index ddl done",
			zap.String("changefeedID", m.id.String()),
			zap.Uint64("commitTs", ddl.CommitTs),
			zap.String("ddl", ddl.Query))
		done <- nil
	}()

	select {
	case <-ctx.Done():
		// if the ddl is canceled, we just return nil, if the ddl is not received by tidb,
		// the downstream ddl is lost, because the checkpoint ts is forwarded.
		log.Info("async add index ddl exits as canceled",
			zap.String("changefeedID", m.id.String()),
			zap.Uint64("commitTs", ddl.CommitTs),
			zap.String("ddl", ddl.Query))
		return nil
	case err := <-done:
		// if the ddl finishes before the wait timer fires, we just return the
		// result to the caller.
		return err
	case <-timer.C:
		// if the ddl is still running, we just return nil,
		// then if the ddl is failed, the downstream ddl is lost.
		// because the checkpoint ts is forwarded.
		log.Info("async add index ddl is still running",
			zap.String("changefeedID", m.id.String()),
			zap.Uint64("commitTs", ddl.CommitTs),
			zap.String("ddl", ddl.Query))
		return nil
	}
}

// waitAsynExecDone blocks until checkAsyncExecDDLDone reports that the tables
// referenced by ddl.TableInfo and ddl.PreTableInfo have no pending async ddl,
// or until ctx is done. It returns immediately when the downstream is not TiDB
// or when the ddl references no table.
//
// Should always wait for async ddl done before executing the next ddl.
func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
	if !m.cfg.IsTiDB {
		return
	}

	// The map de-duplicates the table names: both infos may name the same table.
	tables := make(map[model.TableName]struct{})
	if info := ddl.TableInfo; info != nil {
		tables[info.TableName] = struct{}{}
	}
	if preInfo := ddl.PreTableInfo; preInfo != nil {
		tables[preInfo.TableName] = struct{}{}
	}
	if len(tables) == 0 {
		return
	}
	// Check once right away so the common case does not wait for the first tick.
	if m.checkAsyncExecDDLDone(ctx, tables) {
		return
	}

	log.Debug("wait async exec ddl done",
		zap.String("namespace", m.id.Namespace),
		zap.String("changefeed", m.id.ID),
		zap.Any("tables", tables),
		zap.Uint64("commitTs", ddl.CommitTs),
		zap.String("ddl", ddl.Query))

	poll := time.NewTicker(5 * time.Second)
	defer poll.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-poll.C:
		}
		if m.checkAsyncExecDDLDone(ctx, tables) {
			return
		}
	}
}

// checkAsyncExecDDLDone reports whether doCheck returns true for every table
// in tables. It stops at the first table for which doCheck returns false.
// An empty set yields true.
func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
	for name := range tables {
		if !m.doCheck(ctx, name) {
			return false
		}
	}
	return true
}

// doCheck reports whether table has no running or queueing add index job in
// the downstream.
//
// A hit in lastExecutedNormalDDLCache short-circuits to true without querying;
// the cached type being add index is treated as a broken invariant and panics.
// Otherwise information_schema.ddl_jobs is queried. Any query or scan error is
// logged (except sql.ErrNoRows) and treated as done, so a failing check never
// blocks the caller.
func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) {
	if cached, hit := m.lastExecutedNormalDDLCache.Get(table); hit {
		if lastType := cached.(timodel.ActionType); lastType == timodel.ActionAddIndex {
			log.Panic("invalid ddl type in lastExecutedNormalDDLCache",
				zap.String("namespace", m.id.Namespace),
				zap.String("changefeed", m.id.ID),
				zap.String("ddlType", lastType.String()))
		}
		return true
	}

	stmt := fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table)
	row := m.db.QueryRowContext(ctx, stmt)
	if queryErr := row.Err(); queryErr != nil {
		log.Error("check async exec ddl failed",
			zap.String("namespace", m.id.Namespace),
			zap.String("changefeed", m.id.ID),
			zap.Error(queryErr))
		return true
	}

	var jobID, jobType, schemaState, schemaID, tableID, state, query string
	switch scanErr := row.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state, &query); {
	case scanErr == nil:
		// A row was returned: a job is still pending, reported below.
	case errors.Is(scanErr, sql.ErrNoRows):
		// No matching job.
		return true
	default:
		log.Error("check async exec ddl failed",
			zap.String("namespace", m.id.Namespace),
			zap.String("changefeed", m.id.ID),
			zap.Error(scanErr))
		return true
	}

	log.Info("async ddl is still running",
		zap.String("namespace", m.id.Namespace),
		zap.String("changefeed", m.id.ID),
		zap.String("table", table.String()),
		zap.String("jobID", jobID),
		zap.String("jobType", jobType),
		zap.String("schemaState", schemaState),
		zap.String("schemaID", schemaID),
		zap.String("tableID", tableID),
		zap.String("state", state),
		zap.String("query", query))
	return false
}
168 changes: 168 additions & 0 deletions cdc/sink/ddlsink/mysql/async_ddl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"errors"
"fmt"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/stretchr/testify/require"
)

// TestWaitAsynExecDone exercises checkAsyncExecDDLDone against a mocked
// downstream: the cache fast paths first, then a running add index job, an
// empty result and a failing query, in the order the mock expects them.
func TestWaitAsynExecDone(t *testing.T) {
	// GetDBConnImpl is package-level state; restore it when the test ends so
	// the mock installed below does not leak into other tests.
	originalGetDBConn := GetDBConnImpl
	t.Cleanup(func() { GetDBConnImpl = originalGetDBConn })

	var dbIndex int32 = 0
	GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
		defer func() {
			atomic.AddInt32(&dbIndex, 1)
		}()
		if atomic.LoadInt32(&dbIndex) == 0 {
			// test db
			db, err := pmysql.MockTestDB(true)
			require.Nil(t, err)
			return db, nil
		}
		// normal db
		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
		require.Nil(t, err)
		mock.ExpectQuery("select tidb_version()").
			WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))

		// Case 1: there is a running add index job
		mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
			sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}).
				AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"),
		)
		// Case 2: there is no running add index job
		// Case 3: no permission to query ddl_jobs, TiDB will return empty result
		mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
			sqlmock.NewRows(nil),
		)
		// Case 4: query ddl_jobs failed
		mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnError(
			errors.New("mock error"),
		)

		mock.ExpectClose()
		return db, nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sinkURI, err := url.Parse("mysql://root:@127.0.0.1:4000")
	require.NoError(t, err)
	replicateCfg := config.GetDefaultReplicaConfig()
	ddlSink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateCfg)
	require.NoError(t, err)

	table := model.TableName{Schema: "test", Table: "sbtest0"}
	tables := make(map[model.TableName]struct{})
	tables[table] = struct{}{}

	// Test fast path, ddlSink.lastExecutedNormalDDLCache meet panic
	ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionAddIndex)
	require.Panics(t, func() {
		ddlSink.checkAsyncExecDDLDone(ctx, tables)
	})

	// Test fast path, ddlSink.lastExecutedNormalDDLCache is hit
	ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionCreateTable)
	done := ddlSink.checkAsyncExecDDLDone(ctx, tables)
	require.True(t, done)

	// Clean up the cache so the checks below always query the async running state
	ddlSink.lastExecutedNormalDDLCache.Remove(table)

	// Test has running async ddl job
	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
	require.False(t, done)

	// Test no running async ddl job
	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
	require.True(t, done)

	// Test ignore error
	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
	require.True(t, done)

	ddlSink.Close()
}

// TestAsyncExecAddIndex verifies that writing an add index ddl whose mocked
// execution takes 15 seconds returns without error after at least 10 seconds
// but before the ddl itself has finished.
func TestAsyncExecAddIndex(t *testing.T) {
	// GetDBConnImpl is package-level state; restore it when the test ends so
	// the mock installed below does not leak into other tests.
	originalGetDBConn := GetDBConnImpl
	t.Cleanup(func() { GetDBConnImpl = originalGetDBConn })

	ddlExecutionTime := time.Second * 15
	var dbIndex int32 = 0
	GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
		defer func() {
			atomic.AddInt32(&dbIndex, 1)
		}()
		if atomic.LoadInt32(&dbIndex) == 0 {
			// test db
			db, err := pmysql.MockTestDB(true)
			require.Nil(t, err)
			return db, nil
		}
		// normal db
		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
		require.Nil(t, err)
		mock.ExpectQuery("select tidb_version()").
			WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
		mock.ExpectBegin()
		mock.ExpectExec("USE `test`;").
			WillReturnResult(sqlmock.NewResult(1, 1))
		mock.ExpectExec("Create index idx1 on test.t1(a)").
			WillDelayFor(ddlExecutionTime).
			WillReturnResult(sqlmock.NewResult(1, 1))
		mock.ExpectCommit()
		mock.ExpectClose()
		return db, nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
	require.Nil(t, err)
	rc := config.GetDefaultReplicaConfig()
	sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, rc)

	require.Nil(t, err)

	ddl1 := &model.DDLEvent{
		StartTs:  1000,
		CommitTs: 1010,
		TableInfo: &model.TableInfo{
			TableName: model.TableName{
				Schema: "test",
				Table:  "t1",
			},
		},
		Type:  timodel.ActionAddIndex,
		Query: "Create index idx1 on test.t1(a)",
	}
	start := time.Now()
	err = sink.WriteDDLEvent(ctx, ddl1)
	// Measure once so both bounds are asserted against the same observation.
	elapsed := time.Since(start)
	require.Nil(t, err)
	require.True(t, elapsed < ddlExecutionTime)
	require.True(t, elapsed >= 10*time.Second)
	sink.Close()
}
Loading
Loading