From 8fab33315f425bcc2eb5effa9d84aa56c286237f Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 15 Apr 2022 15:36:28 +0800 Subject: [PATCH 01/10] close the mq sink asynchronously --- cdc/sink/manager.go | 3 +++ cdc/sink/mq.go | 8 ++++++-- cdc/sink/mq_flush_worker_test.go | 4 ++++ cdc/sink/producer/kafka/kafka.go | 8 ++++++++ cdc/sink/producer/mq_producer.go | 2 ++ cdc/sink/producer/pulsar/producer.go | 8 ++++++++ 6 files changed, 31 insertions(+), 2 deletions(-) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 7fda82a95f9..a5d83388cac 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -86,6 +86,9 @@ func (m *Manager) Close(ctx context.Context) error { log.Info("sinkManager try close bufSink", zap.String("changefeed", m.changefeedID)) start := time.Now() + // close the bufSink asynchronously to prevent block too long + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() if err := m.bufSink.Close(ctx); err != nil { log.Info("close bufSink failed", zap.String("changefeed", m.changefeedID), diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 02f1d46d4f7..55492e6e8ed 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -310,8 +310,12 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { } func (k *mqSink) Close(ctx context.Context) error { - err := k.mqProducer.Close() - return errors.Trace(err) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err := <-k.mqProducer.AsyncClose(): + return errors.Trace(err) + } } func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { diff --git a/cdc/sink/mq_flush_worker_test.go b/cdc/sink/mq_flush_worker_test.go index c5879b8d27b..ca896094339 100644 --- a/cdc/sink/mq_flush_worker_test.go +++ b/cdc/sink/mq_flush_worker_test.go @@ -69,6 +69,10 @@ func (m *mockProducer) Close() error { panic("Not used") } +func (m *mockProducer) AsyncClose() chan error { + panic("Not used") +} + func (m *mockProducer) InjectError(err error) { m.mockErr <- err } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index bbb9fe952fd..caf2445c90d 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -195,6 +195,14 @@ func (k *kafkaSaramaProducer) stop() { close(k.closeCh) } +func (k *kafkaSaramaProducer) AsyncClose() chan error { + ret := make(chan error) + go func() { + ret <- k.Close() + }() + return ret +} + // Close closes the sync and async clients. func (k *kafkaSaramaProducer) Close() error { log.Info("stop the kafka producer", zap.String("changefeed", k.id), zap.Any("role", k.role)) diff --git a/cdc/sink/producer/mq_producer.go b/cdc/sink/producer/mq_producer.go index ba11c3c62e8..6e71a1d4607 100644 --- a/cdc/sink/producer/mq_producer.go +++ b/cdc/sink/producer/mq_producer.go @@ -34,4 +34,6 @@ type Producer interface { Flush(ctx context.Context) error // Close closes the producer and client(s). Close() error + // AsyncClose close the produce in an asynchronous way. + AsyncClose() chan error } diff --git a/cdc/sink/producer/pulsar/producer.go b/cdc/sink/producer/pulsar/producer.go index 8f1a5a5ab4b..8d575189573 100644 --- a/cdc/sink/producer/pulsar/producer.go +++ b/cdc/sink/producer/pulsar/producer.go @@ -146,3 +146,11 @@ func (p *Producer) Close() error { p.client.Close() return nil } + +func (p *Producer) AsyncClose() chan error { + ret := make(chan error) + go func() { + ret <- p.Close() + }() + return ret +} From 278c5206d135b58e2f6bd188585e2e8b25ee7399 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 15 Apr 2022 16:17:14 +0800 Subject: [PATCH 02/10] tiny fix. --- cdc/processor/pipeline/table_actor.go | 8 +++++--- cdc/sink/manager.go | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 096167a85ac..b0d4be36b30 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -134,12 +134,14 @@ func NewTableActor(cdcCtx cdcContext.Context, router: globalVars.TableActorSystem.Router(), actorID: actorID, + changefeedID: changefeedVars.ID, + stopCtx: cctx, } startTime := time.Now() log.Info("table actor starting", - zap.String("changfeed", changefeedVars.ID), + zap.String("changefeed", changefeedVars.ID), zap.String("tableName", tableName), zap.Int64("tableID", tableID)) if err := table.start(cctx); err != nil { @@ -151,7 +153,7 @@ func NewTableActor(cdcCtx cdcContext.Context, return nil, errors.Trace(err) } log.Info("table actor started", - zap.String("changfeed", changefeedVars.ID), + zap.String("changefeed", changefeedVars.ID), zap.String("tableName", tableName), zap.Int64("tableID", tableID), zap.Duration("duration", time.Since(startTime))) @@ -381,7 +383,7 @@ func (t *tableActor) stop(err error) { log.Warn("close sink failed", zap.String("changefeed", t.changefeedID), zap.String("tableName", t.tableName), - zap.Error(err), zap.Error(err)) + zap.Error(err)) } } log.Info("table actor stopped", diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index a5d83388cac..1d61c71ee7a 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -91,6 +91,7 @@ func (m *Manager) Close(ctx context.Context) error { defer cancel() if err := m.bufSink.Close(ctx); err != nil { log.Info("close bufSink failed", + zap.Error(err), zap.String("changefeed", m.changefeedID), zap.Duration("duration", time.Since(start))) return err From f667a563ef748cf0c9528506f97817328fe5d03d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 15 Apr 2022 16:25:15 +0800 Subject: [PATCH 03/10] close the ddl sink, ignore context cancel. --- cdc/owner/ddl_sink.go | 5 ++++- cdc/sink/mq.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index dce11b4c021..4739b03b8ee 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -264,5 +264,8 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { err = s.syncPointStore.Close() } s.wg.Wait() - return err + if err != nil && errors.Cause(err) != context.Canceled { + return err + } + return nil } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 55492e6e8ed..8d85916c20e 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -310,10 +310,11 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { } func (k *mqSink) Close(ctx context.Context) error { + closedCh := k.mqProducer.AsyncClose() select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case err := <-k.mqProducer.AsyncClose(): + case err := <-closedCh: return errors.Trace(err) } } From 21e27016515e24dfebdd4ff6f4af60d8a8287841 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 18 Apr 2022 13:27:46 +0800 Subject: [PATCH 04/10] use buffered channel to close the producer. --- cdc/sink/producer/kafka/kafka.go | 4 +++- cdc/sink/producer/pulsar/producer.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index caf2445c90d..b9c6adb81b6 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -195,10 +195,12 @@ func (k *kafkaSaramaProducer) stop() { close(k.closeCh) } +// AsyncClose close the produce in an asynchronous way. func (k *kafkaSaramaProducer) AsyncClose() chan error { - ret := make(chan error) + ret := make(chan error, 1) go func() { ret <- k.Close() + close(ret) }() return ret } diff --git a/cdc/sink/producer/pulsar/producer.go b/cdc/sink/producer/pulsar/producer.go index 8d575189573..a00f42a94dc 100644 --- a/cdc/sink/producer/pulsar/producer.go +++ b/cdc/sink/producer/pulsar/producer.go @@ -147,10 +147,12 @@ func (p *Producer) Close() error { return nil } +// AsyncClose close the produce in an asynchronous way. func (p *Producer) AsyncClose() chan error { - ret := make(chan error) + ret := make(chan error, 1) go func() { ret <- p.Close() + close(ret) }() return ret } From 000f6cf6aef5c25068e3f8f2cf70f78efc6d200d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 18 Apr 2022 13:42:19 +0800 Subject: [PATCH 05/10] sink manager close buf sink asynchronously. --- cdc/sink/manager.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 1d61c71ee7a..973d13b5092 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -17,7 +17,6 @@ import ( "context" "sync" "sync/atomic" - "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -83,22 +82,15 @@ func (m *Manager) Close(ctx context.Context) error { defer m.tableSinksMu.Unlock() tableSinkTotalRowsCountCounter.DeleteLabelValues(m.changefeedID) if m.bufSink != nil { - log.Info("sinkManager try close bufSink", - zap.String("changefeed", m.changefeedID)) - start := time.Now() - // close the bufSink asynchronously to prevent block too long - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - if err := m.bufSink.Close(ctx); err != nil { - log.Info("close bufSink failed", - zap.Error(err), + // We don't need to wait sink Close, pass a canceled context is ok + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if err := m.bufSink.Close(ctx); err != nil && errors.Cause(err) != context.Canceled { + log.Warn("close bufSink failed", zap.String("changefeed", m.changefeedID), - zap.Duration("duration", time.Since(start))) + zap.Error(err)) return err } - log.Info("close bufSink success", - zap.String("changefeed", m.changefeedID), - zap.Duration("duration", time.Since(start))) } return nil } From b4550b95f189121342257d364142b659e947acd9 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 18 Apr 2022 14:12:24 +0800 Subject: [PATCH 06/10] tiny fix. --- cdc/sink/manager.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 973d13b5092..655faa57166 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -82,9 +82,6 @@ func (m *Manager) Close(ctx context.Context) error { defer m.tableSinksMu.Unlock() tableSinkTotalRowsCountCounter.DeleteLabelValues(m.changefeedID) if m.bufSink != nil { - // We don't need to wait sink Close, pass a canceled context is ok - ctx, cancel := context.WithCancel(context.Background()) - cancel() if err := m.bufSink.Close(ctx); err != nil && errors.Cause(err) != context.Canceled { log.Warn("close bufSink failed", zap.String("changefeed", m.changefeedID), From fe4d06e82ec5c54a6179eb2c30177bdd9a2a7327 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 18 Apr 2022 14:45:15 +0800 Subject: [PATCH 07/10] fix by review. --- cdc/processor/pipeline/table_actor.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index b0d4be36b30..e8e62c20364 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -129,19 +129,18 @@ func NewTableActor(cdcCtx cdcContext.Context, targetTs: targetTs, started: false, + changefeedID: changefeedVars.ID, changefeedVars: changefeedVars, globalVars: globalVars, router: globalVars.TableActorSystem.Router(), actorID: actorID, - changefeedID: changefeedVars.ID, - stopCtx: cctx, } startTime := time.Now() log.Info("table actor starting", - zap.String("changefeed", changefeedVars.ID), + zap.String("changefeed", table.changefeedID), zap.String("tableName", tableName), zap.Int64("tableID", tableID)) if err := table.start(cctx); err != nil { @@ -153,14 +152,14 @@ func NewTableActor(cdcCtx cdcContext.Context, return nil, errors.Trace(err) } log.Info("table actor started", - zap.String("changefeed", changefeedVars.ID), + zap.String("changefeed", table.changefeedID), zap.String("tableName", tableName), zap.Int64("tableID", tableID), zap.Duration("duration", time.Since(startTime))) return table, nil } -// Close implements Actor interface. +// OnClose implements Actor interface. // TODO: implements table actor stop here. func (t *tableActor) OnClose() { } From ad88544ca0a777fc5738c6649d61d78b8800439b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 19 Apr 2022 14:10:17 +0800 Subject: [PATCH 08/10] add some comment. --- cdc/sink/mq.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 8d85916c20e..ac6eb7142ae 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -309,6 +309,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } +// Close the mqSink +// Caller can pass a canceled context to make sure this is non-blocking. func (k *mqSink) Close(ctx context.Context) error { closedCh := k.mqProducer.AsyncClose() select { From 22648330185a53406a9ba84214e47adddc241eed Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 19 Apr 2022 15:03:35 +0800 Subject: [PATCH 09/10] close the producer asynchronously. --- cdc/sink/mq.go | 10 +++------- cdc/sink/mq_flush_worker_test.go | 4 ---- cdc/sink/producer/kafka/kafka.go | 10 ---------- cdc/sink/producer/mq_producer.go | 2 -- cdc/sink/producer/pulsar/producer.go | 10 ---------- 5 files changed, 3 insertions(+), 33 deletions(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index ac6eb7142ae..4c5dd830981 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -311,14 +311,10 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { // Close the mqSink // Caller can pass a canceled context to make sure this is non-blocking. +// Close the producer asynchronously, does not care closed successfully or not. func (k *mqSink) Close(ctx context.Context) error { - closedCh := k.mqProducer.AsyncClose() - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case err := <-closedCh: - return errors.Trace(err) - } + go k.mqProducer.Close() + return nil } func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { diff --git a/cdc/sink/mq_flush_worker_test.go b/cdc/sink/mq_flush_worker_test.go index ca896094339..c5879b8d27b 100644 --- a/cdc/sink/mq_flush_worker_test.go +++ b/cdc/sink/mq_flush_worker_test.go @@ -69,10 +69,6 @@ func (m *mockProducer) Close() error { panic("Not used") } -func (m *mockProducer) AsyncClose() chan error { - panic("Not used") -} - func (m *mockProducer) InjectError(err error) { m.mockErr <- err } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index b9c6adb81b6..bbb9fe952fd 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -195,16 +195,6 @@ func (k *kafkaSaramaProducer) stop() { close(k.closeCh) } -// AsyncClose close the produce in an asynchronous way. -func (k *kafkaSaramaProducer) AsyncClose() chan error { - ret := make(chan error, 1) - go func() { - ret <- k.Close() - close(ret) - }() - return ret -} - // Close closes the sync and async clients. func (k *kafkaSaramaProducer) Close() error { log.Info("stop the kafka producer", zap.String("changefeed", k.id), zap.Any("role", k.role)) diff --git a/cdc/sink/producer/mq_producer.go b/cdc/sink/producer/mq_producer.go index 6e71a1d4607..ba11c3c62e8 100644 --- a/cdc/sink/producer/mq_producer.go +++ b/cdc/sink/producer/mq_producer.go @@ -34,6 +34,4 @@ type Producer interface { Flush(ctx context.Context) error // Close closes the producer and client(s). Close() error - // AsyncClose close the produce in an asynchronous way. - AsyncClose() chan error } diff --git a/cdc/sink/producer/pulsar/producer.go b/cdc/sink/producer/pulsar/producer.go index a00f42a94dc..8f1a5a5ab4b 100644 --- a/cdc/sink/producer/pulsar/producer.go +++ b/cdc/sink/producer/pulsar/producer.go @@ -146,13 +146,3 @@ func (p *Producer) Close() error { p.client.Close() return nil } - -// AsyncClose close the produce in an asynchronous way. -func (p *Producer) AsyncClose() chan error { - ret := make(chan error, 1) - go func() { - ret <- p.Close() - close(ret) - }() - return ret -} From 606639b370e0e9fa12bc4c7054a4387d4d425450 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 19 Apr 2022 15:06:07 +0800 Subject: [PATCH 10/10] tiny fix the comment. --- cdc/sink/mq.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 4c5dd830981..71bd5294a00 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -309,8 +309,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } -// Close the mqSink -// Caller can pass a canceled context to make sure this is non-blocking. // Close the producer asynchronously, does not care closed successfully or not. func (k *mqSink) Close(ctx context.Context) error { go k.mqProducer.Close()