Merge branch 'master' into fix_3884_add_log_to_detect_stuck
CharlesCheung96 authored Jan 21, 2022
2 parents ad7df9d + 0874043 commit d279324
Showing 14 changed files with 255 additions and 155 deletions.
71 changes: 42 additions & 29 deletions cdc/entry/mounter.go
@@ -19,7 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"sync/atomic"
"time"
"unsafe"

@@ -39,7 +39,9 @@ import (
)

const (
defaultOutputChanSize = 128000
// The buffer size of the input channel of each mounter worker.
// 16 is large enough, because a channel exclusively belongs to a worker.
defaultInputChanSize = 16
)

type baseKVEntry struct {
@@ -67,7 +69,11 @@ type rowKVEntry struct {
// Mounter is used to parse SQL events from KV events
type Mounter interface {
Run(ctx context.Context) error
Input() chan<- *model.PolymorphicEvent
// AddEntry accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
// decodes `RawKVEntry` into `RowChangedEvent`.
// It also closes the `model.PolymorphicEvent.finished` channel to notify callers
// that decoding is done.
AddEntry(ctx context.Context, event *model.PolymorphicEvent) error
}

type mounterImpl struct {
@@ -76,6 +82,9 @@ type mounterImpl struct {
tz *time.Location
workerNum int
enableOldValue bool

// index is an atomic variable to dispatch input events to workers.
index int64
}

// NewMounter creates a mounter
@@ -85,7 +94,7 @@ func NewMounter(schemaStorage SchemaStorage, workerNum int, enableOldValue bool)
}
chs := make([]chan *model.PolymorphicEvent, workerNum)
for i := 0; i < workerNum; i++ {
chs[i] = make(chan *model.PolymorphicEvent, defaultOutputChanSize)
chs[i] = make(chan *model.PolymorphicEvent, defaultInputChanSize)
}
return &mounterImpl{
schemaStorage: schemaStorage,
@@ -100,17 +109,34 @@ const defaultMounterWorkerNum = 32
func (m *mounterImpl) Run(ctx context.Context) error {
m.tz = util.TimezoneFromCtx(ctx)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
m.collectMetrics(ctx)
return nil
})
for i := 0; i < m.workerNum; i++ {
index := i
errg.Go(func() error {
return m.codecWorker(ctx, index)
})
}
return errg.Wait()

captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

flushMetricsInterval := 15 * time.Second
timer := time.NewTimer(flushMetricsInterval)
defer timer.Stop()
for {
select {
// ctx.Done() returns when the parent ctx is done or an error occurs in errg.
case <-ctx.Done():
return errg.Wait()
case <-timer.C:
chSize := 0
for _, ch := range m.rawRowChangedChs {
chSize += len(ch)
}
metricMounterInputChanSize.Set(float64(chSize))
timer.Reset(flushMetricsInterval)
}
}
}
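Aside: the reworked Run above folds the old collectMetrics goroutine into the main loop. Its shape — worker goroutines under an errgroup, a timer-driven metrics flush, and an exit path that returns errg.Wait() once the group's context is cancelled — can be illustrated with a minimal, runnable stand-alone sketch. This is not code from this repository; the failing worker, the 50 ms intervals, and the printed "flush metrics" line are placeholders.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    errg, ctx := errgroup.WithContext(context.Background())

    // A worker that fails after a while; the errgroup cancels ctx when it returns.
    errg.Go(func() error {
        time.Sleep(120 * time.Millisecond)
        return errors.New("worker failed")
    })

    // The metrics loop: flush on a timer, stop as soon as ctx is done,
    // and return whatever error the group collected.
    run := func() error {
        timer := time.NewTimer(50 * time.Millisecond)
        defer timer.Stop()
        for {
            select {
            case <-ctx.Done():
                return errg.Wait()
            case <-timer.C:
                fmt.Println("flush metrics")
                timer.Reset(50 * time.Millisecond)
            }
        }
    }

    fmt.Println("run returned:", run())
}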

func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
@@ -148,26 +174,13 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
}
}

func (m *mounterImpl) Input() chan<- *model.PolymorphicEvent {
return m.rawRowChangedChs[rand.Intn(m.workerNum)]
}

func (m *mounterImpl) collectMetrics(ctx context.Context) {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMounterInputChanSize := mounterInputChanSizeGauge.WithLabelValues(captureAddr, changefeedID)

for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 15):
chSize := 0
for _, ch := range m.rawRowChangedChs {
chSize += len(ch)
}
metricMounterInputChanSize.Set(float64(chSize))
}
func (m *mounterImpl) AddEntry(ctx context.Context, event *model.PolymorphicEvent) error {
index := atomic.AddInt64(&m.index, 1) % int64(m.workerNum)
select {
case <-ctx.Done():
return ctx.Err()
case m.rawRowChangedChs[index] <- event:
return nil
}
}
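The AddEntry implementation above replaces the old Input() accessor (which exposed a randomly chosen worker channel via rand.Intn) with round-robin dispatch driven by an atomic counter, and it returns ctx.Err() instead of blocking forever when the caller is cancelled. A minimal, self-contained sketch of that dispatch pattern follows; dispatcher, addEntry, and event are illustrative names, not the project's API.

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

// event stands in for model.PolymorphicEvent in this sketch.
type event struct{ id int }

// dispatcher mimics the round-robin dispatch used by AddEntry: an atomic
// counter picks the next worker channel, and the send respects ctx cancellation.
type dispatcher struct {
    chs   []chan *event
    index int64
}

func newDispatcher(workerNum, chanSize int) *dispatcher {
    chs := make([]chan *event, workerNum)
    for i := range chs {
        chs[i] = make(chan *event, chanSize)
    }
    return &dispatcher{chs: chs}
}

// addEntry sends the event to one worker, chosen round-robin.
// It returns ctx.Err() instead of blocking when the caller is cancelled.
func (d *dispatcher) addEntry(ctx context.Context, ev *event) error {
    i := atomic.AddInt64(&d.index, 1) % int64(len(d.chs))
    select {
    case <-ctx.Done():
        return ctx.Err()
    case d.chs[i] <- ev:
        return nil
    }
}

func main() {
    d := newDispatcher(4, 16)
    var wg sync.WaitGroup
    for i, ch := range d.chs {
        wg.Add(1)
        go func(worker int, ch <-chan *event) {
            defer wg.Done()
            for ev := range ch {
                fmt.Printf("worker %d got event %d\n", worker, ev.id)
            }
        }(i, ch)
    }

    ctx := context.Background()
    for i := 0; i < 8; i++ {
        if err := d.addEntry(ctx, &event{id: i}); err != nil {
            fmt.Println("add entry:", err)
        }
    }
    for _, ch := range d.chs {
        close(ch)
    }
    wg.Wait()
}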

9 changes: 4 additions & 5 deletions cdc/processor/pipeline/sorter.go
@@ -176,10 +176,9 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b
// this separate goroutine to prevent blocking
// the whole pipeline.
msg.SetUpFinishedChan()
select {
case <-ctx.Done():
return nil
case n.mounter.Input() <- msg:
err := n.mounter.AddEntry(ctx, msg)
if err != nil {
return errors.Trace(err)
}

commitTs := msg.CRTs
@@ -198,7 +197,7 @@ }
}

// Must wait before accessing msg.Row
err := msg.WaitPrepare(ctx)
err = msg.WaitPrepare(ctx)
if err != nil {
if errors.Cause(err) != context.Canceled {
ctx.Throw(err)
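With this change the sorter node calls AddEntry and then waits on the event via WaitPrepare before reading the decoded row. The underlying handshake — the decoder closes a per-event finished channel, and the caller blocks on that channel or on context cancellation — can be sketched as a small stand-alone program. item, waitPrepare, and the sleep-based "decoder" below are placeholders, not tiflow types.

package main

import (
    "context"
    "fmt"
    "time"
)

// item mimics the finished-channel handshake on PolymorphicEvent: the decoder
// closes `finished` once the row has been mounted, and callers wait on it.
type item struct {
    raw      string
    row      string
    finished chan struct{}
}

func newItem(raw string) *item {
    return &item{raw: raw, finished: make(chan struct{})}
}

// waitPrepare blocks until decoding is done or the context is cancelled,
// mirroring msg.WaitPrepare(ctx) in the sorter node.
func (it *item) waitPrepare(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-it.finished:
        return nil
    }
}

func main() {
    ctx := context.Background()
    it := newItem("raw-kv-entry")

    // A stand-in decoder goroutine: decode, then close finished.
    go func() {
        time.Sleep(10 * time.Millisecond) // pretend to decode
        it.row = "row-changed-event(" + it.raw + ")"
        close(it.finished)
    }()

    // The caller must wait before accessing the decoded row.
    if err := it.waitPrepare(ctx); err != nil {
        fmt.Println("wait prepare:", err)
        return
    }
    fmt.Println(it.row)
}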
54 changes: 29 additions & 25 deletions cdc/sorter/unified/merger.go
@@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sorter"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
@@ -405,8 +404,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return nil
}

resolvedTsNotifier := &notify.Notifier{}
defer resolvedTsNotifier.Close()
resolvedTsNotifierChan := make(chan struct{}, 1)
errg, ctx := errgroup.WithContext(ctx)

errg.Go(func() error {
Expand Down Expand Up @@ -443,40 +441,46 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch

if minTemp > minResolvedTs {
atomic.StoreUint64(&minResolvedTs, minTemp)
resolvedTsNotifier.Notify()
select {
case resolvedTsNotifierChan <- struct{}{}:
default:
}
}
}
})

errg.Go(func() error {
resolvedTsReceiver, err := resolvedTsNotifier.NewReceiver(time.Second * 1)
if err != nil {
if cerrors.ErrOperateOnClosedNotifier.Equal(err) {
// This won't happen unless `resolvedTsNotifier` has been closed, which is
// impossible at this point.
log.Panic("unexpected error", zap.Error(err))
}
return errors.Trace(err)
}
resolvedTsTicker := time.NewTicker(time.Second * 1)

defer resolvedTsReceiver.Stop()
defer resolvedTsTicker.Stop()

var lastResolvedTs uint64
resolvedTsTickFunc := func() error {
curResolvedTs := atomic.LoadUint64(&minResolvedTs)
if curResolvedTs > lastResolvedTs {
err := onMinResolvedTsUpdate(curResolvedTs)
if err != nil {
return errors.Trace(err)
}
} else if curResolvedTs < lastResolvedTs {
log.Panic("resolved-ts regressed in sorter",
zap.Uint64("curResolvedTs", curResolvedTs),
zap.Uint64("lastResolvedTs", lastResolvedTs))
}
return nil
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-resolvedTsReceiver.C:
curResolvedTs := atomic.LoadUint64(&minResolvedTs)
if curResolvedTs > lastResolvedTs {
err := onMinResolvedTsUpdate(curResolvedTs)
if err != nil {
return errors.Trace(err)
}
} else if curResolvedTs < lastResolvedTs {
log.Panic("resolved-ts regressed in sorter",
zap.Uint64("curResolved-ts", curResolvedTs),
zap.Uint64("lastResolved-ts", lastResolvedTs))
case <-resolvedTsTicker.C:
if err := resolvedTsTickFunc(); err != nil {
return err
}
case <-resolvedTsNotifierChan:
if err := resolvedTsTickFunc(); err != nil {
return err
}
}
}
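The notifier from pkg/notify is replaced here by a one-slot buffered channel: the producer's send sits in a select with a default case, so repeated updates coalesce into at most one pending notification, and a ticker provides a periodic fallback. A minimal, runnable sketch of that pattern, with placeholder names and timings (not code from this commit), is shown below.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    var minResolvedTs uint64
    // A 1-buffered channel coalesces notifications: if one is already pending,
    // the extra send is dropped, which is all the consumer needs.
    notifyCh := make(chan struct{}, 1)

    // Producer: bump the resolved ts and nudge the consumer without blocking.
    go func() {
        for ts := uint64(1); ts <= 5; ts++ {
            atomic.StoreUint64(&minResolvedTs, ts)
            select {
            case notifyCh <- struct{}{}:
            default: // a notification is already pending; skip
            }
            time.Sleep(20 * time.Millisecond)
        }
    }()

    ticker := time.NewTicker(50 * time.Millisecond) // periodic fallback, like resolvedTsTicker
    defer ticker.Stop()

    var lastResolvedTs uint64
    tick := func() {
        cur := atomic.LoadUint64(&minResolvedTs)
        if cur > lastResolvedTs {
            fmt.Println("resolved ts advanced to", cur)
            lastResolvedTs = cur
        }
    }

    deadline := time.After(300 * time.Millisecond)
    for {
        select {
        case <-deadline:
            return
        case <-ticker.C:
            tick()
        case <-notifyCh:
            tick()
        }
    }
}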
(Diffs for the remaining 11 changed files are not shown.)
