Skip to content

Commit

Permalink
entry,processor(ticdc): enhancements to mounter (#4400)
Browse files Browse the repository at this point in the history
close #4401
  • Loading branch information
overvenus authored Jan 20, 2022
1 parent cc79ea8 commit c04f715
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 34 deletions.
71 changes: 42 additions & 29 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -39,7 +39,9 @@ import (
)

const (
defaultOutputChanSize = 128000
// The buffer size of input channel of each mounter worker.
// 16 is large enough, because a channel exclusively belongs to a worker.
defaultInputChanSize = 16
)

type baseKVEntry struct {
Expand Down Expand Up @@ -67,7 +69,11 @@ type rowKVEntry struct {
// Mounter is used to parse SQL events from KV events
type Mounter interface {
Run(ctx context.Context) error
Input() chan<- *model.PolymorphicEvent
// AddEntry accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
// decodes `RawKVEntry` into `RowChangedEvent`.
// It also close `model.PolymorphicEvent.finished` channel to notify callers
// that decoding is done.
AddEntry(ctx context.Context, event *model.PolymorphicEvent) error
}

type mounterImpl struct {
Expand All @@ -76,6 +82,9 @@ type mounterImpl struct {
tz *time.Location
workerNum int
enableOldValue bool

// index is an atomic variable to dispatch input events to workers.
index int64
}

// NewMounter creates a mounter
Expand All @@ -85,7 +94,7 @@ func NewMounter(schemaStorage SchemaStorage, workerNum int, enableOldValue bool)
}
chs := make([]chan *model.PolymorphicEvent, workerNum)
for i := 0; i < workerNum; i++ {
chs[i] = make(chan *model.PolymorphicEvent, defaultOutputChanSize)
chs[i] = make(chan *model.PolymorphicEvent, defaultInputChanSize)
}
return &mounterImpl{
schemaStorage: schemaStorage,
Expand All @@ -100,17 +109,34 @@ const defaultMounterWorkerNum = 32
func (m *mounterImpl) Run(ctx context.Context) error {
m.tz = util.TimezoneFromCtx(ctx)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
m.collectMetrics(ctx)
return nil
})
for i := 0; i < m.workerNum; i++ {
index := i
errg.Go(func() error {
return m.codecWorker(ctx, index)
})
}
return errg.Wait()

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

flushMetricsInterval := 15 * time.Second
timer := time.NewTimer(flushMetricsInterval)
defer timer.Stop()
for {
select {
// ctx.Done returns when parent ctx done or error occurs in errg.
case <-ctx.Done():
return errg.Wait()
case <-timer.C:
chSize := 0
for _, ch := range m.rawRowChangedChs {
chSize += len(ch)
}
metricMounterInputChanSize.Set(float64(chSize))
timer.Reset(flushMetricsInterval)
}
}
}

func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
Expand Down Expand Up @@ -148,26 +174,13 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
}
}

func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent {
return m.rawRowChangedChs[rand.Intn(m.workerNum)]
}

func (m *mounterImpl) collectMetrics(ctx context.Context) {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 15):
chSize := 0
for _, ch := range m.rawRowChangedChs {
chSize += len(ch)
}
metricMounterInputChanSize.Set(float64(chSize))
}
func (m *mounterImpl) AddEntry(ctx context.Context, event *model.PolymorphicEvent) error {
index := atomic.AddInt64(&m.index, 1) % int64(m.workerNum)
select {
case <-ctx.Done():
return ctx.Err()
case m.rawRowChangedChs[index] <- event:
return nil
}
}

Expand Down
9 changes: 4 additions & 5 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,9 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b
// this separate goroutine to prevent blocking
// the whole pipeline.
msg.SetUpFinishedChan()
select {
case <-ctx.Done():
return nil
case n.mounter.Input() <- msg:
err := n.mounter.AddEntry(ctx, msg)
if err != nil {
return errors.Trace(err)
}

commitTs := msg.CRTs
Expand All @@ -198,7 +197,7 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b
}

// Must wait before accessing msg.Row
err := msg.WaitPrepare(ctx)
err = msg.WaitPrepare(ctx)
if err != nil {
if errors.Cause(err) != context.Canceled {
ctx.Throw(err)
Expand Down

0 comments on commit c04f715

Please sign in to comment.