diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index a9a9911895d..3004f3d2eb7 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -366,6 +366,13 @@ func (m *ddlManager) tick( zap.Uint64("commitTs", nextDDL.CommitTs), zap.Uint64("checkpointTs", m.checkpointTs)) m.executingDDL = nextDDL + skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL) + if err != nil { + return nil, nil, errors.Trace(err) + } + if skip { + m.cleanCache(cleanMsg) + } } err := m.executeDDL(ctx) if err != nil { @@ -431,14 +438,6 @@ func (m *ddlManager) executeDDL(ctx context.Context) error { if m.executingDDL == nil { return nil } - skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL) - if err != nil { - return errors.Trace(err) - } - if skip { - m.cleanCache(cleanMsg) - return nil - } failpoint.Inject("ExecuteNotDone", func() { // This ddl will never finish executing. diff --git a/cdc/sink/ddlsink/mysql/async_ddl.go b/cdc/sink/ddlsink/mysql/async_ddl.go new file mode 100644 index 00000000000..1692a394f37 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/async_ddl.go @@ -0,0 +1,186 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/pingcap/log" + timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +var checkRunningAddIndexSQL = ` +SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY +FROM information_schema.ddl_jobs +WHERE DB_NAME = "%s" + AND TABLE_NAME = "%s" + AND JOB_TYPE LIKE "add index%%" + AND (STATE = "running" OR STATE = "queueing") +LIMIT 1; +` + +func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool { + return m.cfg.IsTiDB && ddl.Type == timodel.ActionAddIndex +} + +// asyncExecDDL executes ddl in async mode. +// this function only works in TiDB, because TiDB will save ddl jobs +// and execute them asynchronously even if ticdc crashed. +func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error { + done := make(chan error, 1) + // Use a longer timeout to ensure the add index ddl is sent to tidb before executing the next ddl. + tick := time.NewTimer(10 * time.Second) + defer tick.Stop() + log.Info("async exec add index ddl start", + zap.String("changefeedID", m.id.String()), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + go func() { + if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil { + log.Error("async exec add index ddl failed", + zap.String("changefeedID", m.id.String()), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + done <- err + return + } + log.Info("async exec add index ddl done", + zap.String("changefeedID", m.id.String()), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + done <- nil + }() + + select { + case <-ctx.Done(): + // if the ddl is canceled, we just return nil, if the ddl is not received by tidb, + // the downstream ddl is lost, because the checkpoint ts is forwarded. + log.Info("async add index ddl exits as canceled", + zap.String("changefeedID", m.id.String()), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + return nil + case err := <-done: + // if the ddl is executed within 2 seconds, we just return the result to the caller. + return err + case <-tick.C: + // if the ddl is still running, we just return nil, + // then if the ddl is failed, the downstream ddl is lost. + // because the checkpoint ts is forwarded. + log.Info("async add index ddl is still running", + zap.String("changefeedID", m.id.String()), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + return nil + } +} + +// Should always wait for async ddl done before executing the next ddl. +func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) { + if !m.cfg.IsTiDB { + return + } + + tables := make(map[model.TableName]struct{}) + if ddl.TableInfo != nil { + tables[ddl.TableInfo.TableName] = struct{}{} + } + if ddl.PreTableInfo != nil { + tables[ddl.PreTableInfo.TableName] = struct{}{} + } + if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) { + return + } + + log.Debug("wait async exec ddl done", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.Any("tables", tables), + zap.Uint64("commitTs", ddl.CommitTs), + zap.String("ddl", ddl.Query)) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + done := m.checkAsyncExecDDLDone(ctx, tables) + if done { + return + } + } + } +} + +func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool { + for table := range tables { + done := m.doCheck(ctx, table) + if !done { + return false + } + } + return true +} + +func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) { + if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok { + ddlType := v.(timodel.ActionType) + if ddlType == timodel.ActionAddIndex { + log.Panic("invalid ddl type in lastExecutedNormalDDLCache", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.String("ddlType", ddlType.String())) + } + return true + } + + ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table)) + if ret.Err() != nil { + log.Error("check async exec ddl failed", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.Error(ret.Err())) + return true + } + var jobID, jobType, schemaState, schemaID, tableID, state, query string + if err := ret.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state, &query); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Error("check async exec ddl failed", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.Error(err)) + } + return true + } + + log.Info("async ddl is still running", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.String("table", table.String()), + zap.String("jobID", jobID), + zap.String("jobType", jobType), + zap.String("schemaState", schemaState), + zap.String("schemaID", schemaID), + zap.String("tableID", tableID), + zap.String("state", state), + zap.String("query", query)) + return false +} diff --git a/cdc/sink/ddlsink/mysql/async_ddl_test.go b/cdc/sink/ddlsink/mysql/async_ddl_test.go new file mode 100644 index 00000000000..5f5802b30b6 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/async_ddl_test.go @@ -0,0 +1,168 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + "sync/atomic" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" + "github.com/stretchr/testify/require" +) + +func TestWaitAsynExecDone(t *testing.T) { + var dbIndex int32 = 0 + GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + atomic.AddInt32(&dbIndex, 1) + }() + if atomic.LoadInt32(&dbIndex) == 0 { + // test db + db, err := pmysql.MockTestDB(true) + require.Nil(t, err) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + mock.ExpectQuery("select tidb_version()"). + WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b")) + + // Case 1: there is a running add index job + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows( + sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}). + AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"), + ) + // Case 2: there is no running add index job + // Case 3: no permission to query ddl_jobs, TiDB will return empty result + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows( + sqlmock.NewRows(nil), + ) + // Case 4: query ddl_jobs failed + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnError( + errors.New("mock error"), + ) + + mock.ExpectClose() + return db, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sinkURI, err := url.Parse("mysql://root:@127.0.0.1:4000") + require.NoError(t, err) + replicateCfg := config.GetDefaultReplicaConfig() + ddlSink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateCfg) + require.NoError(t, err) + + table := model.TableName{Schema: "test", Table: "sbtest0"} + tables := make(map[model.TableName]struct{}) + tables[table] = struct{}{} + + // Test fast path, ddlSink.lastExecutedNormalDDLCache meet panic + ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionAddIndex) + require.Panics(t, func() { + ddlSink.checkAsyncExecDDLDone(ctx, tables) + }) + + // Test fast path, ddlSink.lastExecutedNormalDDLCache is hit + ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionCreateTable) + done := ddlSink.checkAsyncExecDDLDone(ctx, tables) + require.True(t, done) + + // Clenup the cache, always check the async running state + ddlSink.lastExecutedNormalDDLCache.Remove(table) + + // Test has running async ddl job + done = ddlSink.checkAsyncExecDDLDone(ctx, tables) + require.False(t, done) + + // Test no running async ddl job + done = ddlSink.checkAsyncExecDDLDone(ctx, tables) + require.True(t, done) + + // Test ignore error + done = ddlSink.checkAsyncExecDDLDone(ctx, tables) + require.True(t, done) + + ddlSink.Close() +} + +func TestAsyncExecAddIndex(t *testing.T) { + ddlExecutionTime := time.Second * 15 + var dbIndex int32 = 0 + GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + atomic.AddInt32(&dbIndex, 1) + }() + if atomic.LoadInt32(&dbIndex) == 0 { + // test db + db, err := pmysql.MockTestDB(true) + require.Nil(t, err) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + mock.ExpectQuery("select tidb_version()"). + WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b")) + mock.ExpectBegin() + mock.ExpectExec("USE `test`;"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("Create index idx1 on test.t1(a)"). + WillDelayFor(ddlExecutionTime). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + mock.ExpectClose() + return db, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sinkURI, err := url.Parse("mysql://127.0.0.1:4000") + require.Nil(t, err) + rc := config.GetDefaultReplicaConfig() + sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, rc) + + require.Nil(t, err) + + ddl1 := &model.DDLEvent{ + StartTs: 1000, + CommitTs: 1010, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", + Table: "t1", + }, + }, + Type: timodel.ActionAddIndex, + Query: "Create index idx1 on test.t1(a)", + } + start := time.Now() + err = sink.WriteDDLEvent(ctx, ddl1) + require.Nil(t, err) + require.True(t, time.Since(start) < ddlExecutionTime) + require.True(t, time.Since(start) >= 10*time.Second) + sink.Close() +} diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go index 7c56f19694c..3a90e970d98 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go @@ -20,7 +20,7 @@ import ( "net/url" "time" - cerrors "github.com/pingcap/errors" + lru "github.com/hashicorp/golang-lru" "github.com/pingcap/failpoint" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" @@ -28,7 +28,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/quotes" "github.com/pingcap/tiflow/pkg/retry" @@ -61,6 +61,11 @@ type DDLSink struct { // statistics is the statistics of this sink. // We use it to record the DDL count. statistics *metrics.Statistics + + // lastExecutedNormalDDLCache is a fast path to check whether aync DDL of a table + // is running in downstream. + // map: model.TableName -> timodel.ActionType + lastExecutedNormalDDLCache *lru.Cache } // NewDDLSink creates a new DDLSink. @@ -91,11 +96,16 @@ func NewDDLSink( return nil, err } + lruCache, err := lru.New(1024) + if err != nil { + return nil, err + } m := &DDLSink{ - id: changefeedID, - db: db, - cfg: cfg, - statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + id: changefeedID, + db: db, + cfg: cfg, + statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), + lastExecutedNormalDDLCache: lruCache, } log.Info("MySQL DDL sink is created", @@ -106,10 +116,18 @@ func NewDDLSink( // WriteDDLEvent writes a DDL event to the mysql database. func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - if ddl.Type == timodel.ActionAddIndex && m.cfg.IsTiDB { - return m.asyncExecAddIndexDDLIfTimeout(ctx, ddl) + m.waitAsynExecDone(ctx, ddl) + + if m.shouldAsyncExecDDL(ddl) { + m.lastExecutedNormalDDLCache.Remove(ddl.TableInfo.TableName) + return m.asyncExecDDL(ctx, ddl) + } + + if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil { + return errors.Trace(err) } - return m.execDDLWithMaxRetries(ctx, ddl) + m.lastExecutedNormalDDLCache.Add(ddl.TableInfo.TableName, ddl.Type) + return nil } func (m *DDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { @@ -217,7 +235,7 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { zap.Duration("duration", time.Since(start)), zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) + return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) } log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query), @@ -257,55 +275,3 @@ func (m *DDLSink) Close() { } } } - -// asyncExecAddIndexDDLIfTimeout executes ddl in async mode. -// this function only works in TiDB, because TiDB will save ddl jobs -// and execute them asynchronously even if ticdc crashed. -func (m *DDLSink) asyncExecAddIndexDDLIfTimeout(ctx context.Context, ddl *model.DDLEvent) error { - done := make(chan error, 1) - // wait for 2 seconds at most - tick := time.NewTimer(2 * time.Second) - defer tick.Stop() - log.Info("async exec add index ddl start", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - go func() { - if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil { - log.Error("async exec add index ddl failed", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - done <- err - return - } - log.Info("async exec add index ddl done", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - done <- nil - }() - - select { - case <-ctx.Done(): - // if the ddl is canceled, we just return nil, if the ddl is not received by tidb, - // the downstream ddl is lost, because the checkpoint ts is forwarded. - log.Info("async add index ddl exits as canceled", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - return nil - case err := <-done: - // if the ddl is executed within 2 seconds, we just return the result to the caller. - return err - case <-tick.C: - // if the ddl is still running, we just return nil, - // then if the ddl is failed, the downstream ddl is lost. - // because the checkpoint ts is forwarded. - log.Info("async add index ddl is still running", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - return nil - } -} diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go index a2f2c1ed53a..602d0da8209 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go @@ -17,9 +17,7 @@ import ( "context" "database/sql" "net/url" - "sync/atomic" "testing" - "time" "github.com/DATA-DOG/go-sqlmock" dmysql "github.com/go-sql-driver/mysql" @@ -144,62 +142,3 @@ func TestNeedSwitchDB(t *testing.T) { require.Equal(t, tc.needSwitch, needSwitchDB(tc.ddl)) } } - -func TestAsyncExecAddIndex(t *testing.T) { - ddlExecutionTime := time.Millisecond * 3000 - var dbIndex int32 = 0 - GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) { - defer func() { - atomic.AddInt32(&dbIndex, 1) - }() - if atomic.LoadInt32(&dbIndex) == 0 { - // test db - db, err := pmysql.MockTestDB(true) - require.Nil(t, err) - return db, nil - } - // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) - mock.ExpectQuery("select tidb_version()"). - WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b")) - mock.ExpectBegin() - mock.ExpectExec("USE `test`;"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("Create index idx1 on test.t1(a)"). - WillDelayFor(ddlExecutionTime). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - mock.ExpectClose() - return db, nil - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - changefeed := "test-changefeed" - sinkURI, err := url.Parse("mysql://127.0.0.1:4000") - require.Nil(t, err) - rc := config.GetDefaultReplicaConfig() - sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID(changefeed), sinkURI, rc) - - require.Nil(t, err) - - ddl1 := &model.DDLEvent{ - StartTs: 1000, - CommitTs: 1010, - TableInfo: &model.TableInfo{ - TableName: model.TableName{ - Schema: "test", - Table: "t1", - }, - }, - Type: timodel.ActionAddIndex, - Query: "Create index idx1 on test.t1(a)", - } - start := time.Now() - err = sink.WriteDDLEvent(ctx, ddl1) - require.Nil(t, err) - require.True(t, time.Since(start) < ddlExecutionTime) - require.True(t, time.Since(start) >= 2*time.Second) - sink.Close() -} diff --git a/pkg/errors/reexport.go b/pkg/errors/reexport.go index 5204382c134..b619fb3878a 100644 --- a/pkg/errors/reexport.go +++ b/pkg/errors/reexport.go @@ -36,4 +36,6 @@ var ( Annotate = perrors.Annotate // Annotatef is a shortcut for github.com/pingcap/errors.Annotatef. Annotatef = perrors.Annotatef + // WithMessage is a shortcut for github.com/pingcap/errors.WithMessage. + WithMessage = perrors.WithMessage )