From 57b9ae64c68be08c4b32df283d18854d8c417319 Mon Sep 17 00:00:00 2001
From: amyangfei
Date: Fri, 11 Dec 2020 19:06:22 +0800
Subject: [PATCH] cherry pick #1185 to release-4.0

Signed-off-by: ti-srebot
---
 cdc/processor.go                 |  24 ++-
 cdc/puller/entry_sorter.go       |   5 +-
 cdc/puller/puller_test.go        | 219 +++++++++++++++++++++++
 cdc/sink/mq.go                   |   6 +-
 cdc/sink/mysql.go                |  19 +-
 cdc/sink/mysql_test.go           | 287 ++++++++++++++++++++++++++++++-
 cdc/sink/producer/kafka/kafka.go |   6 +-
 errors.toml                      |   5 +
 pkg/errors/errors.go             |   1 +
 pkg/notify/notify.go             |  13 +-
 pkg/notify/notify_test.go        |  28 ++-
 11 files changed, 593 insertions(+), 20 deletions(-)
 create mode 100644 cdc/puller/puller_test.go

diff --git a/cdc/processor.go b/cdc/processor.go
index 59001ef130f..477470cd773 100644
--- a/cdc/processor.go
+++ b/cdc/processor.go
@@ -194,6 +194,19 @@ func newProcessor(
 	sinkEmittedResolvedNotifier := new(notify.Notifier)
 	localResolvedNotifier := new(notify.Notifier)
 	localCheckpointTsNotifier := new(notify.Notifier)
+	sinkEmittedResolvedReceiver, err := sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
+	localResolvedReceiver, err := localResolvedNotifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
+	localCheckpointTsReceiver, err := localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
+
 	p := &processor{
 		id:       uuid.New().String(),
 		limitter: limitter,
@@ -216,14 +229,14 @@
 		output: make(chan *model.PolymorphicEvent, defaultOutputChanSize),

 		sinkEmittedResolvedNotifier: sinkEmittedResolvedNotifier,
-		sinkEmittedResolvedReceiver: sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond),
+		sinkEmittedResolvedReceiver: sinkEmittedResolvedReceiver,

 		localResolvedNotifier: localResolvedNotifier,
-		localResolvedReceiver: localResolvedNotifier.NewReceiver(50 * time.Millisecond),
+		localResolvedReceiver: localResolvedReceiver,

 		checkpointTs:              checkpointTs,
 		localCheckpointTsNotifier: localCheckpointTsNotifier,
-		localCheckpointTsReceiver: localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond),
+		localCheckpointTsReceiver: localCheckpointTsReceiver,

 		tables:       make(map[int64]*tableInfo),
 		markTableIDs: make(map[int64]struct{}),
@@ -674,9 +687,12 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
 		lastResolvedTs           uint64
 		watchKey                 = kv.GetEtcdKeyJob(p.changefeedID)
 		globalResolvedTsNotifier = new(notify.Notifier)
-		globalResolvedTsReceiver = globalResolvedTsNotifier.NewReceiver(1 * time.Second)
 	)
 	defer globalResolvedTsNotifier.Close()
+	globalResolvedTsReceiver, err := globalResolvedTsNotifier.NewReceiver(1 * time.Second)
+	if err != nil {
+		return err
+	}

 	updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
 		atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
diff --git a/cdc/puller/entry_sorter.go b/cdc/puller/entry_sorter.go
index 194071a1b25..6f70094b800 100644
--- a/cdc/puller/entry_sorter.go
+++ b/cdc/puller/entry_sorter.go
@@ -111,7 +111,10 @@ func (es *EntrySorter) Run(ctx context.Context) error {
 			}
 		}
 	})
-	receiver := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
+	receiver, err := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
+	if err != nil {
+		return err
+	}
 	defer es.resolvedNotifier.Close()
 	errg.Go(func() error {
 		var sorted []*model.PolymorphicEvent
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
new file mode 100644
index 00000000000..0a28627a38d
--- /dev/null
+++ b/cdc/puller/puller_test.go
@@ -0,0 +1,219 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package puller
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/pingcap/check"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/cdc/kv"
+	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/pingcap/ticdc/pkg/regionspan"
+	"github.com/pingcap/ticdc/pkg/retry"
+	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/txnutil"
+	"github.com/pingcap/ticdc/pkg/util/testleak"
+	tidbkv "github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/store/mockstore"
+	"github.com/pingcap/tidb/store/tikv"
+	pd "github.com/tikv/pd/client"
+)
+
+type pullerSuite struct {
+}
+
+var _ = check.Suite(&pullerSuite{})
+
+type mockPdClientForPullerTest struct {
+	pd.Client
+	clusterID uint64
+}
+
+func (mc *mockPdClientForPullerTest) GetClusterID(ctx context.Context) uint64 {
+	return mc.clusterID
+}
+
+type mockCDCKVClient struct {
+	expectations chan *model.RegionFeedEvent
+}
+
+type mockInjectedPuller struct {
+	Puller
+	cli *mockCDCKVClient
+}
+
+func newMockCDCKVClient(
+	ctx context.Context,
+	pd pd.Client,
+	kvStorage tikv.Storage,
+	credential *security.Credential,
+) kv.CDCKVClient {
+	return &mockCDCKVClient{
+		expectations: make(chan *model.RegionFeedEvent, 1024),
+	}
+}
+
+func (mc *mockCDCKVClient) EventFeed(
+	ctx context.Context,
+	span regionspan.ComparableSpan,
+	ts uint64,
+	enableOldValue bool,
+	lockResolver txnutil.LockResolver,
+	isPullerInit kv.PullerInitialization,
+	eventCh chan<- *model.RegionFeedEvent,
+) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case ev := <-mc.expectations:
+			if ev == nil {
+				return nil
+			}
+			eventCh <- ev
+		}
+	}
+}
+
+func (mc *mockCDCKVClient) Close() error {
+	close(mc.expectations)
+	if len(mc.expectations) > 0 {
+		buf := bytes.NewBufferString("mockCDCKVClient: not all expectations were satisfied! Still waiting\n")
+		for e := range mc.expectations {
+			_, _ = buf.WriteString(fmt.Sprintf("%s", e.GetValue()))
+		}
+		return errors.New(buf.String())
+	}
+	return nil
+}
+
+func (mc *mockCDCKVClient) Returns(ev *model.RegionFeedEvent) {
+	mc.expectations <- ev
+}
+
+func (s *pullerSuite) newPullerForTest(
+	c *check.C,
+	spans []regionspan.Span,
+	checkpointTs uint64,
+) (*mockInjectedPuller, context.CancelFunc, *sync.WaitGroup, tidbkv.Storage) {
+	var wg sync.WaitGroup
+	ctx, cancel := context.WithCancel(context.Background())
+	store, err := mockstore.NewMockStore()
+	c.Assert(err, check.IsNil)
+	enableOldValue := true
+	backupNewCDCKVClient := kv.NewCDCKVClient
+	kv.NewCDCKVClient = newMockCDCKVClient
+	defer func() {
+		kv.NewCDCKVClient = backupNewCDCKVClient
+	}()
+	pdCli := &mockPdClientForPullerTest{clusterID: uint64(1)}
+	plr := NewPuller(ctx, pdCli, nil /* credential */, store, checkpointTs, spans, nil /* limitter */, enableOldValue)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := plr.Run(ctx)
+		if err != nil {
+			c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+		}
+	}()
+	c.Assert(err, check.IsNil)
+	mockPlr := &mockInjectedPuller{
+		Puller: plr,
+		cli:    plr.(*pullerImpl).kvCli.(*mockCDCKVClient),
+	}
+	return mockPlr, cancel, &wg, store
+}
+
+func (s *pullerSuite) TestPullerResolvedForward(c *check.C) {
+	defer testleak.AfterTest(c)()
+	spans := []regionspan.Span{
+		{Start: []byte("t_a"), End: []byte("t_e")},
+	}
+	checkpointTs := uint64(996)
+	plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs)
+
+	plr.cli.Returns(&model.RegionFeedEvent{
+		Resolved: &model.ResolvedSpan{
+			Span:       regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_a"), End: []byte("t_c")}),
+			ResolvedTs: uint64(1001),
+		},
+	})
+	plr.cli.Returns(&model.RegionFeedEvent{
+		Resolved: &model.ResolvedSpan{
+			Span:       regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_c"), End: []byte("t_d")}),
+			ResolvedTs: uint64(1002),
+		},
+	})
+	plr.cli.Returns(&model.RegionFeedEvent{
+		Resolved: &model.ResolvedSpan{
+			Span:       regionspan.ToComparableSpan(regionspan.Span{Start: []byte("t_d"), End: []byte("t_e")}),
+			ResolvedTs: uint64(1000),
+		},
+	})
+	ev := <-plr.Output()
+	c.Assert(ev.OpType, check.Equals, model.OpTypeResolved)
+	c.Assert(ev.CRTs, check.Equals, uint64(1000))
+	c.Assert(plr.IsInitialized(), check.IsTrue)
+	err := retry.Run(time.Millisecond*10, 10, func() error {
+		ts := plr.GetResolvedTs()
+		if ts != uint64(1000) {
+			return errors.Errorf("resolved ts %d of puller does not forward to 1000", ts)
+		}
+		return nil
+	})
+	c.Assert(err, check.IsNil)
+
+	store.Close()
+	cancel()
+	wg.Wait()
+}
+
+func (s *pullerSuite) TestPullerRawKV(c *check.C) {
+	defer testleak.AfterTest(c)()
+	spans := []regionspan.Span{
+		{Start: []byte("c"), End: []byte("e")},
+	}
+	checkpointTs := uint64(996)
+	plr, cancel, wg, store := s.newPullerForTest(c, spans, checkpointTs)
+
+	// key not in expected region spans, will be ignored
+	plr.cli.Returns(&model.RegionFeedEvent{
+		Val: &model.RawKVEntry{
+			OpType: model.OpTypePut,
+			Key:    []byte("a"),
+			Value:  []byte("test-value"),
+			CRTs:   uint64(1002),
+		},
+	})
+	plr.cli.Returns(&model.RegionFeedEvent{
+		Val: &model.RawKVEntry{
+			OpType: model.OpTypePut,
+			Key:    []byte("d"),
+			Value:  []byte("test-value"),
+			CRTs:   uint64(1003),
+		},
+	})
+	ev := <-plr.Output()
+	c.Assert(ev.OpType, check.Equals, model.OpTypePut)
+	c.Assert(ev.Key, check.DeepEquals, []byte("d"))
+
+	store.Close()
+	cancel()
+	wg.Wait()
+}
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index fcaa3aa69e2..a13b0b94fe5 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -126,6 +126,10 @@ func newMqSink(
 		return ret
 	}

+	resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
 	k := &mqSink{
 		mqProducer: mqProducer,
 		dispatcher: d,
@@ -137,7 +141,7 @@
 		partitionInput:      partitionInput,
 		partitionResolvedTs: make([]uint64, partitionNum),
 		resolvedNotifier:    notifier,
-		resolvedReceiver:    notifier.NewReceiver(50 * time.Millisecond),
+		resolvedReceiver:    resolvedReceiver,

 		statistics: NewStatistics(ctx, "MQ", opts),
 	}
diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go
index 677fc603e74..9aa4ef6e4af 100644
--- a/cdc/sink/mysql.go
+++ b/cdc/sink/mysql.go
@@ -133,7 +133,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
 }

 func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) {
-	receiver := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
+	receiver, err := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		log.Error("failed to start flush row changed events routine", zap.Error(err))
+		return
+	}
 	for {
 		select {
 		case <-ctx.Done():
@@ -574,17 +578,23 @@ func newMySQLSink(

 	sink.execWaitNotifier = new(notify.Notifier)
 	sink.resolvedNotifier = new(notify.Notifier)
-	sink.createSinkWorkers(ctx)
+	err = sink.createSinkWorkers(ctx)
+	if err != nil {
+		return nil, err
+	}

 	go sink.flushRowChangedEvents(ctx)

 	return sink, nil
 }

-func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
+func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
 	s.workers = make([]*mysqlSinkWorker, s.params.workerCount)
 	for i := range s.workers {
-		receiver := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
+		receiver, err := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
+		if err != nil {
+			return err
+		}
 		worker := newMySQLSinkWorker(
 			s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], receiver, s.execDMLs)
 		s.workers[i] = worker
@@ -598,6 +608,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
 			}
 		}()
 	}
+	return nil
 }

 func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go
index a0807e42bd5..8ee1c975ea1 100644
--- a/cdc/sink/mysql_test.go
+++ b/cdc/sink/mysql_test.go
@@ -381,9 +381,11 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) {
 			cctx, cancel := context.WithCancel(ctx)
 			var outputRows [][]*model.RowChangedEvent
 			var outputReplicaIDs []uint64
+			receiver, err := notifier.NewReceiver(-1)
+			c.Assert(err, check.IsNil)
 			w := newMySQLSinkWorker(tc.maxTxnRow, 1,
 				bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
-				notifier.NewReceiver(-1),
+				receiver,
 				func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
 					outputRows = append(outputRows, events)
 					outputReplicaIDs = append(outputReplicaIDs, replicaID)
@@ -960,6 +962,289 @@ func (s MySQLSinkSuite) TestCheckTiDBVariable(c *check.C) {
 	c.Assert(err, check.ErrorMatches, ".*"+sql.ErrConnDone.Error())
 }

+<<<<<<< HEAD
+=======
+func mockTestDB() (*sql.DB, error) {
+	// mock for test db, which is used for querying TiDB session variables
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		return nil, err
+	}
+	columns := []string{"Variable_name", "Value"}
+	mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
+		sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
+	)
+	mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
+		sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
+	)
+	mock.ExpectClose()
+	return db, nil
+}
+
+func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	dbIndex := 0
+	mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
+		defer func() {
+			dbIndex++
+		}()
+		if dbIndex == 0 {
+			// test db
+			db, err := mockTestDB()
+			c.Assert(err, check.IsNil)
+			return db, nil
+		}
+		// normal db
+		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+		c.Assert(err, check.IsNil)
+		mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
+			WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
+				AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
+		mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
+			WillReturnResult(sqlmock.NewResult(0, 0))
+		mock.ExpectClose()
+		return db, nil
+	}
+	backupGetDBConn := getDBConnImpl
+	getDBConnImpl = mockGetDBConn
+	defer func() {
+		getDBConnImpl = backupGetDBConn
+	}()
+
+	changefeed := "test-changefeed"
+	sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4")
+	c.Assert(err, check.IsNil)
+	rc := config.GetDefaultReplicaConfig()
+	rc.Cyclic = &config.CyclicConfig{
+		Enable:          true,
+		ReplicaID:       1,
+		FilterReplicaID: []uint64{2},
+	}
+	f, err := filter.NewFilter(rc)
+	c.Assert(err, check.IsNil)
+	cyclicConfig, err := rc.Cyclic.Marshal()
+	c.Assert(err, check.IsNil)
+	opts := map[string]string{
+		mark.OptCyclicConfig: cyclicConfig,
+	}
+	sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, opts)
+	c.Assert(err, check.IsNil)
+
+	err = sink.Close()
+	c.Assert(err, check.IsNil)
+}
+
+func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	dbIndex := 0
+	mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
+		defer func() {
+			dbIndex++
+		}()
+		if dbIndex == 0 {
+			// test db
+			db, err := mockTestDB()
+			c.Assert(err, check.IsNil)
+			return db, nil
+		}
+		// normal db
+		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+		c.Assert(err, check.IsNil)
+		mock.ExpectBegin()
+		mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
+			WithArgs(1, "test", 2, "test").
+			WillReturnResult(sqlmock.NewResult(2, 2))
+		mock.ExpectCommit()
+		mock.ExpectBegin()
+		mock.ExpectExec("REPLACE INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)").
+			WithArgs(1, "test", 2, "test").
+			WillReturnResult(sqlmock.NewResult(2, 2))
+		mock.ExpectCommit()
+		mock.ExpectClose()
+		return db, nil
+	}
+	backupGetDBConn := getDBConnImpl
+	getDBConnImpl = mockGetDBConn
+	defer func() {
+		getDBConnImpl = backupGetDBConn
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	changefeed := "test-changefeed"
+	sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4")
+	c.Assert(err, check.IsNil)
+	rc := config.GetDefaultReplicaConfig()
+	f, err := filter.NewFilter(rc)
+	c.Assert(err, check.IsNil)
+	sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{})
+	c.Assert(err, check.IsNil)
+
+	rows := []*model.RowChangedEvent{
+		{
+			StartTs:  1,
+			CommitTs: 2,
+			Table:    &model.TableName{Schema: "s1", Table: "t1", TableID: 1},
+			Columns: []*model.Column{
+				{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1},
+				{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
+			},
+		},
+		{
+			StartTs:  1,
+			CommitTs: 2,
+			Table:    &model.TableName{Schema: "s1", Table: "t1", TableID: 1},
+			Columns: []*model.Column{
+				{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 2},
+				{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
+			},
+		},
+		{
+			StartTs:  5,
+			CommitTs: 6,
+			Table:    &model.TableName{Schema: "s1", Table: "t1", TableID: 1},
+			Columns: []*model.Column{
+				{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 3},
+				{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
+			},
+		},
+		{
+			StartTs:  3,
+			CommitTs: 4,
+			Table:    &model.TableName{Schema: "s1", Table: "t2", TableID: 2},
+			Columns: []*model.Column{
+				{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1},
+				{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
+			},
+		},
+		{
+			StartTs:  3,
+			CommitTs: 4,
+			Table:    &model.TableName{Schema: "s1", Table: "t2", TableID: 2},
+			Columns: []*model.Column{
+				{Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 2},
+				{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test"},
+			},
+		},
+	}
+
+	err = sink.EmitRowChangedEvents(ctx, rows...)
+	c.Assert(err, check.IsNil)
+
+	err = retry.Run(time.Millisecond*20, 10, func() error {
+		ts, err := sink.FlushRowChangedEvents(ctx, uint64(2))
+		c.Assert(err, check.IsNil)
+		if ts < uint64(2) {
+			return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2)
+		}
+		return nil
+	})
+	c.Assert(err, check.IsNil)
+
+	err = retry.Run(time.Millisecond*20, 10, func() error {
+		ts, err := sink.FlushRowChangedEvents(ctx, uint64(4))
+		c.Assert(err, check.IsNil)
+		if ts < uint64(4) {
+			return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4)
+		}
+		return nil
+	})
+	c.Assert(err, check.IsNil)
+
+	err = sink.Close()
+	c.Assert(err, check.IsNil)
+}
+
+func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	dbIndex := 0
+	mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
+		defer func() {
+			dbIndex++
+		}()
+		if dbIndex == 0 {
+			// test db
+			db, err := mockTestDB()
+			c.Assert(err, check.IsNil)
+			return db, nil
+		}
+		// normal db
+		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+		c.Assert(err, check.IsNil)
+		mock.ExpectBegin()
+		mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectCommit()
+		mock.ExpectBegin()
+		mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
+			WillReturnError(&dmysql.MySQLError{
+				Number: uint16(infoschema.ErrColumnExists.Code()),
+			})
+		mock.ExpectRollback()
+		mock.ExpectClose()
+		return db, nil
+	}
+	backupGetDBConn := getDBConnImpl
+	getDBConnImpl = mockGetDBConn
+	defer func() {
+		getDBConnImpl = backupGetDBConn
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	changefeed := "test-changefeed"
+	sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=4")
+	c.Assert(err, check.IsNil)
+	rc := config.GetDefaultReplicaConfig()
+	rc.Filter = &config.FilterConfig{
+		Rules: []string{"test.t1"},
+	}
+	f, err := filter.NewFilter(rc)
+	c.Assert(err, check.IsNil)
+	sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{})
+	c.Assert(err, check.IsNil)
+
+	ddl1 := &model.DDLEvent{
+		StartTs:  1000,
+		CommitTs: 1010,
+		TableInfo: &model.SimpleTableInfo{
+			Schema: "test",
+			Table:  "t1",
+		},
+		Type:  timodel.ActionAddColumn,
+		Query: "ALTER TABLE test.t1 ADD COLUMN a int",
+	}
+	ddl2 := &model.DDLEvent{
+		StartTs:  1020,
+		CommitTs: 1030,
+		TableInfo: &model.SimpleTableInfo{
+			Schema: "test",
+			Table:  "t2",
+		},
+		Type:  timodel.ActionAddColumn,
+		Query: "ALTER TABLE test.t1 ADD COLUMN a int",
+	}
+
+	err = sink.EmitDDLEvent(ctx, ddl1)
+	c.Assert(err, check.IsNil)
+	err = sink.EmitDDLEvent(ctx, ddl2)
+	c.Assert(cerror.ErrDDLEventIgnored.Equal(err), check.IsTrue)
+	// the DDL execution failed, but the error can be ignored
+	err = sink.EmitDDLEvent(ctx, ddl1)
+	c.Assert(err, check.IsNil)
+
+	err = sink.Close()
+	c.Assert(err, check.IsNil)
+}
+
+>>>>>>> 415aa00... pkg/notify: fix create receiver on closed notifier (#1185)
 /*
 import (
 	"context"
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index 43f919e49fa..f29302725ec 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -342,6 +342,10 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
 	}

 	notifier := new(notify.Notifier)
+	flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
+	if err != nil {
+		return nil, err
+	}
 	k := &kafkaSaramaProducer{
 		asyncClient: asyncClient,
 		syncClient:  syncClient,
@@ -352,7 +356,7 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
 			sent uint64
 		}, partitionNum),
 		flushedNotifier: notifier,
-		flushedReceiver: notifier.NewReceiver(50 * time.Millisecond),
+		flushedReceiver: flushedReceiver,
 		closeCh:         make(chan struct{}),
 		failpointCh:     make(chan error, 1),
 	}
diff --git a/errors.toml b/errors.toml
index 6019129b324..a014b6699f4 100755
--- a/errors.toml
+++ b/errors.toml
@@ -451,6 +451,11 @@ error = '''
 old value is not enabled
 '''

+["CDC:ErrOperateOnClosedNotifier"]
+error = '''
+operate on a closed notifier
+'''
+
 ["CDC:ErrOwnerCampaignKeyDeleted"]
 error = '''
 owner campaign key deleted
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 22402a26d33..151fc0c5c69 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -114,6 +114,7 @@ var (
 	ErrLoadTimezone       = errors.Normalize("load timezone", errors.RFCCodeText("CDC:ErrLoadTimezone"))
 	ErrURLFormatInvalid   = errors.Normalize("url format is invalid", errors.RFCCodeText("CDC:ErrURLFormatInvalid"))
 	ErrIntersectNoOverlap = errors.Normalize("span doesn't overlap: %+v vs %+v", errors.RFCCodeText("CDC:ErrIntersectNoOverlap"))
+	ErrOperateOnClosedNotifier = errors.Normalize("operate on a closed notifier", errors.RFCCodeText("CDC:ErrOperateOnClosedNotifier"))

 	// encode/decode, data format and data integrity errors
 	ErrInvalidRecordKey = errors.Normalize("invalid record key - %q", errors.RFCCodeText("CDC:ErrInvalidRecordKey"))
diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go
index 38187ddea44..477aa67d0cd 100644
--- a/pkg/notify/notify.go
+++ b/pkg/notify/notify.go
@@ -16,6 +18,8 @@ package notify
 import (
 	"sync"
 	"time"
+
+	"github.com/pingcap/ticdc/pkg/errors"
 )

 // Notifier provides a one-to-many notification mechanism
@@ -26,6 +28,7 @@ type Notifier struct {
 	}
 	maxIndex int
 	mu       sync.RWMutex
+	closed   bool
 }

 // Notify sends a signal to the Receivers
@@ -77,9 +80,12 @@ func (r *Receiver) signalTickLoop() {

 // NewReceiver creates a receiver
 // returns a channel to receive notifications and a function to close this receiver
-func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
 	n.mu.Lock()
 	defer n.mu.Unlock()
+	if n.closed {
+		return nil, errors.ErrOperateOnClosedNotifier.GenWithStackByArgs()
+	}
 	currentIndex := n.maxIndex
 	n.maxIndex++
 	receiverCh := make(chan struct{}, 1)
@@ -104,7 +110,7 @@ func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
 		rec   *Receiver
 		index int
 	}{rec: rec, index: currentIndex})
-	return rec
+	return rec, nil
 }

 func (n *Notifier) remove(index int) {
@@ -123,6 +129,8 @@
 }

 // Close closes the notify and stops all receiver in this notifier
+// Note: to prevent goroutine leaks, we must `Close` the notifier whenever we
+// cannot ensure that `Stop` is called on every receiver of this notifier.
 func (n *Notifier) Close() {
 	n.mu.Lock()
 	defer n.mu.Unlock()
@@ -133,4 +141,5 @@
 		close(receiver.rec.closeCh)
 	}
 	n.receivers = nil
+	n.closed = true
 }
diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go
index cb737a4b9c4..e2517988f8e 100644
--- a/pkg/notify/notify_test.go
+++ b/pkg/notify/notify_test.go
@@ -19,6 +19,7 @@ import (
 	"time"

 	"github.com/pingcap/check"
+	"github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
 )

@@ -33,9 +34,12 @@ var _ = check.Suite(&notifySuite{})
 func (s *notifySuite) TestNotifyHub(c *check.C) {
 	defer testleak.AfterTest(c)()
 	notifier := new(Notifier)
-	r1 := notifier.NewReceiver(-1)
-	r2 := notifier.NewReceiver(-1)
-	r3 := notifier.NewReceiver(-1)
+	r1, err := notifier.NewReceiver(-1)
+	c.Assert(err, check.IsNil)
+	r2, err := notifier.NewReceiver(-1)
+	c.Assert(err, check.IsNil)
+	r3, err := notifier.NewReceiver(-1)
+	c.Assert(err, check.IsNil)
 	finishedCh := make(chan struct{})
 	go func() {
 		for i := 0; i < 5; i++ {
@@ -52,12 +56,14 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r2.Stop()
 	r3.Stop()
 	c.Assert(len(notifier.receivers), check.Equals, 0)
-	r4 := notifier.NewReceiver(-1)
+	r4, err := notifier.NewReceiver(-1)
+	c.Assert(err, check.IsNil)
 	<-r4.C
 	r4.Stop()

 	notifier2 := new(Notifier)
-	r5 := notifier2.NewReceiver(10 * time.Millisecond)
+	r5, err := notifier2.NewReceiver(10 * time.Millisecond)
+	c.Assert(err, check.IsNil)
 	<-r5.C
 	r5.Stop()
 	<-finishedCh // To make the leak checker happy
@@ -80,8 +86,10 @@ func (s *notifySuite) TestContinusStop(c *check.C) {
 	}()
 	n := 50
 	receivers := make([]*Receiver, n)
+	var err error
 	for i := 0; i < n; i++ {
-		receivers[i] = notifier.NewReceiver(10 * time.Millisecond)
+		receivers[i], err = notifier.NewReceiver(10 * time.Millisecond)
+		c.Assert(err, check.IsNil)
 	}
 	for i := 0; i < n; i++ {
 		i := i
@@ -100,3 +108,11 @@
 	}
 	<-ctx.Done()
 }
+
+func (s *notifySuite) TestNewReceiverWithClosedNotifier(c *check.C) {
+	defer testleak.AfterTest(c)()
+	notifier := new(Notifier)
+	notifier.Close()
+	_, err := notifier.NewReceiver(50 * time.Millisecond)
+	c.Assert(errors.ErrOperateOnClosedNotifier.Equal(err), check.IsTrue)
+}
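
For context on the API change this patch propagates through all call sites: pkg/notify's
NewReceiver now returns (*Receiver, error) instead of *Receiver, and fails with
ErrOperateOnClosedNotifier once the notifier has been closed. A minimal sketch of the
resulting call pattern, mirroring the call sites above (the 50ms tick interval is just an
illustrative value; `cerror` is the alias the repo's tests use for pkg/errors):

    notifier := new(notify.Notifier)
    // Per the new doc comment on Close, close the notifier to stop all of its
    // receivers and prevent goroutine leaks when they may not be stopped individually.
    defer notifier.Close()

    receiver, err := notifier.NewReceiver(50 * time.Millisecond)
    if err != nil {
        // cerror.ErrOperateOnClosedNotifier.Equal(err) holds if the notifier
        // was already closed when NewReceiver was called.
        return err
    }
    defer receiver.Stop()

    notifier.Notify() // wake all receivers of this notifier
    <-receiver.C      // unblocked by Notify or by the periodic tick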