From c04f715dde0bc1e8b7233431a2ff2c4794d4679a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 20 Jan 2022 22:09:46 +0800 Subject: [PATCH] entry,processor(ticdc): enhancements to mounter (#4400) close pingcap/tiflow#4401 --- cdc/entry/mounter.go | 71 +++++++++++++++++++------------- cdc/processor/pipeline/sorter.go | 9 ++-- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index e90e209e97a..5418d4f8d01 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -19,7 +19,7 @@ import ( "encoding/json" "fmt" "math" - "math/rand" + "sync/atomic" "time" "unsafe" @@ -39,7 +39,9 @@ import ( ) const ( - defaultOutputChanSize = 128000 + // The buffer size of the input channel of each mounter worker. + // 16 is large enough, because a channel exclusively belongs to a worker. + defaultInputChanSize = 16 ) type baseKVEntry struct { @@ -67,7 +69,11 @@ type rowKVEntry struct { // Mounter is used to parse SQL events from KV events type Mounter interface { Run(ctx context.Context) error - Input() chan<- *model.PolymorphicEvent + // AddEntry accepts `model.PolymorphicEvent` with `RawKVEntry` filled and + // decodes `RawKVEntry` into `RowChangedEvent`. + // It also closes the `model.PolymorphicEvent.finished` channel to notify callers + // that decoding is done. + AddEntry(ctx context.Context, event *model.PolymorphicEvent) error } type mounterImpl struct { @@ -76,6 +82,9 @@ type mounterImpl struct { tz *time.Location workerNum int enableOldValue bool + + // index is an atomic variable to dispatch input events to workers. 
+ index int64 } // NewMounter creates a mounter @@ -85,7 +94,7 @@ func NewMounter(schemaStorage SchemaStorage, workerNum int, enableOldValue bool) } chs := make([]chan *model.PolymorphicEvent, workerNum) for i := 0; i < workerNum; i++ { - chs[i] = make(chan *model.PolymorphicEvent, defaultOutputChanSize) + chs[i] = make(chan *model.PolymorphicEvent, defaultInputChanSize) } return &mounterImpl{ schemaStorage: schemaStorage, @@ -100,17 +109,34 @@ const defaultMounterWorkerNum = 32 func (m *mounterImpl) Run(ctx context.Context) error { m.tz = util.TimezoneFromCtx(ctx) errg, ctx := errgroup.WithContext(ctx) - errg.Go(func() error { - m.collectMetrics(ctx) - return nil - }) for i := 0; i < m.workerNum; i++ { index := i errg.Go(func() error { return m.codecWorker(ctx, index) }) } - return errg.Wait() + + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID) + + flushMetricsInterval := 15 * time.Second + timer := time.NewTimer(flushMetricsInterval) + defer timer.Stop() + for { + select { + // ctx.Done returns when the parent ctx is done or an error occurs in errg. 
+ case <-ctx.Done(): + return errg.Wait() + case <-timer.C: + chSize := 0 + for _, ch := range m.rawRowChangedChs { + chSize += len(ch) + } + metricMounterInputChanSize.Set(float64(chSize)) + timer.Reset(flushMetricsInterval) + } + } } func (m *mounterImpl) codecWorker(ctx context.Context, index int) error { @@ -148,26 +174,13 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error { } } -func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent { - return m.rawRowChangedChs[rand.Intn(m.workerNum)] -} - -func (m *mounterImpl) collectMetrics(ctx context.Context) { - captureAddr := util.CaptureAddrFromCtx(ctx) - changefeedID := util.ChangefeedIDFromCtx(ctx) - metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 15): - chSize := 0 - for _, ch := range m.rawRowChangedChs { - chSize += len(ch) - } - metricMounterInputChanSize.Set(float64(chSize)) - } +func (m *mounterImpl) AddEntry(ctx context.Context, event *model.PolymorphicEvent) error { + index := atomic.AddInt64(&m.index, 1) % int64(m.workerNum) + select { + case <-ctx.Done(): + return ctx.Err() + case m.rawRowChangedChs[index] <- event: + return nil } } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 90ecad4b672..3032c41fcaa 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -176,10 +176,9 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b // this separate goroutine to prevent blocking // the whole pipeline. 
msg.SetUpFinishedChan() - select { - case <-ctx.Done(): - return nil - case n.mounter.Input() <- msg: + err := n.mounter.AddEntry(ctx, msg) + if err != nil { + return errors.Trace(err) } commitTs := msg.CRTs @@ -198,7 +197,7 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b } // Must wait before accessing msg.Row - err := msg.WaitPrepare(ctx) + err = msg.WaitPrepare(ctx) if err != nil { if errors.Cause(err) != context.Canceled { ctx.Throw(err)