From 6a409e9c48da4e4bf824923634ed0ccb9ef1e7c5 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Dec 2021 12:12:38 +0800 Subject: [PATCH] remove Initialize method from the sink. --- cdc/owner/async_sink.go | 5 ----- cdc/owner/async_sink_test.go | 25 ++++------------------ cdc/owner/changefeed.go | 5 +---- cdc/processor/pipeline/sink_test.go | 4 ---- cdc/sink/black_hole.go | 5 ----- cdc/sink/cdclog/file.go | 33 ----------------------------- cdc/sink/cdclog/s3.go | 14 ------------ cdc/sink/manager_test.go | 8 ------- cdc/sink/mq.go | 6 ------ cdc/sink/mysql.go | 5 ----- cdc/sink/simple_mysql_tester.go | 5 ----- cdc/sink/sink.go | 2 -- cdc/sink/table_sink.go | 5 ----- 13 files changed, 5 insertions(+), 117 deletions(-) diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go index e51af798c6c..a995ad7d769 100644 --- a/cdc/owner/async_sink.go +++ b/cdc/owner/async_sink.go @@ -38,7 +38,6 @@ const ( // The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now // Other functions are still synchronization type AsyncSink interface { - Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error // EmitCheckpointTs emits the checkpoint Ts to downstream data source // this function will return after recording the checkpointTs specified in memory immediately // and the recorded checkpointTs will be sent and updated to downstream data source every second @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) { return asyncSink, nil } -func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error { - return s.sink.Initialize(ctx, tableInfo) -} - func (s *asyncSinkImpl) run(ctx cdcContext.Context) { defer s.wg.Done() // TODO make the tick duration configurable diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 529eea01704..2d692f8cf54 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -36,16 +36,10 @@ type asyncSinkSuite struct{} type mockSink struct { sink.Sink - initTableInfo []*model.SimpleTableInfo - checkpointTs model.Ts - ddl *model.DDLEvent - ddlMu sync.Mutex - ddlError error -} - -func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - m.initTableInfo = tableInfo - return nil + checkpointTs model.Ts + ddl *model.DDLEvent + ddlMu sync.Mutex + ddlError error } func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { @@ -87,17 +81,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context, return ctx, sink, mockSink } -func (s *asyncSinkSuite) TestInitialize(c *check.C) { - defer testleak.AfterTest(c)() - ctx := cdcContext.NewBackendContext4Test(false) - ctx, sink, mockSink := newAsyncSink4Test(ctx, c) - defer sink.Close(ctx) - tableInfos := []*model.SimpleTableInfo{{Schema: "test"}} - err := sink.Initialize(ctx, tableInfos) - c.Assert(err, check.IsNil) - c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo) -} - func (s *asyncSinkSuite) TestCheckpoint(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 3bb3dfbb826..05bff14479a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -256,10 +256,7 @@ LOOP: if err != nil { return errors.Trace(err) } - err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos()) - if err != nil { - return errors.Trace(err) - } + // Refer to the previous comment on why we use (checkpointTs-1). c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1) if err != nil { diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 2b334ca6323..dd0378c9547 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 { return 0 } -func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { s.received = append(s.received, struct { diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 2f730bf381f..d5e48f6502a 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e return nil } -// Initialize is no-op for blackhole -func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (b *blackHoleSink) Close(ctx context.Context) error { return nil } diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go index 6913b015a0b..800b4bcac4f 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -312,39 +312,6 @@ func (f *fileSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return nil } -func (f *fileSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - if tableInfo != nil { - for _, table := range tableInfo { - if table != nil { - name := makeTableDirectoryName(table.TableID) - err := os.MkdirAll(filepath.Join(f.logPath.root, name), defaultDirMode) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkCreateDir, err) - } - } - } - // update log meta to record the relationship about tableName and tableID - f.logMeta = makeLogMetaContent(tableInfo) - data, err := f.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - filePath := f.logPath.meta - if _, err := os.Stat(filePath); !os.IsNotExist(err) { - return cerror.WrapError(cerror.ErrFileSinkMetaAlreadyExists, err) - } - file, err := os.Create(filePath) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkCreateDir, err) - } - _, err = file.Write(data) - if err != nil { - return cerror.WrapError(cerror.ErrFileSinkFileOp, err) - } - } - return nil -} - func (f *fileSink) Close(ctx context.Context) error { return nil } diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index 52726b60c9b..0674035d4a8 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -329,20 +329,6 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return s.storage.WriteFile(ctx, name, fileData) } -func (s *s3Sink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - if tableInfo != nil { - // update log meta to record the relationship about tableName and tableID - s.logMeta = makeLogMetaContent(tableInfo) - - data, err := s.logMeta.Marshal() - if err != nil { - return cerror.WrapError(cerror.ErrMarshalFailed, err) - } - return s.storage.WriteFile(ctx, logMetaFile, data) - } - return nil -} - func (s *s3Sink) Close(ctx context.Context) error { return nil } diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 1e49d1ab01d..dbe501b3f2d 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -48,10 +48,6 @@ func newCheckSink(c *check.C) *checkSink { } } -func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { c.rowsMu.Lock() defer c.rowsMu.Unlock() @@ -344,10 +340,6 @@ type errorSink struct { *check.C } -func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { return errors.New("error in emit row changed events") } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 0dc4a96dfad..e6ffdafbda8 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -249,12 +249,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } -// Initialize registers Avro schemas for all tables -func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // No longer need it for now - return nil -} - func (k *mqSink) Close(ctx context.Context) error { err := k.mqProducer.Close() return errors.Trace(err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 3b4d8478b7a..c038622b72d 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -291,11 +291,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return errors.Trace(err) } -// Initialize is no-op for Mysql sink -func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { return retry.Do(ctx, func() error { err := s.execDDL(ctx, ddl) diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index fd435344b5c..86d7a8c013d 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -105,11 +105,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re return sink, nil } -func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // do nothing - return nil -} - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 6b89e334167..fbf5c455de3 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -33,8 +33,6 @@ const ( // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { - Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index 4a442266f30..a57cc0123b6 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -32,11 +32,6 @@ type tableSink struct { redoManager redo.LogManager } -func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // do nothing - return nil -} - func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { t.buffer = append(t.buffer, rows...) t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))