Skip to content

Commit

Permalink
sorter(ticdc): remove notifier from sorter merger (#4397)
Browse files Browse the repository at this point in the history
ref #4351
  • Loading branch information
sdojjy authored Jan 21, 2022
1 parent 962d115 commit 99389d6
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions cdc/sorter/unified/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sorter"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -405,8 +404,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return nil
}

resolvedTsNotifier := &notify.Notifier{}
defer resolvedTsNotifier.Close()
resolvedTsNotifierChan := make(chan struct{}, 1)
errg, ctx := errgroup.WithContext(ctx)

errg.Go(func() error {
Expand Down Expand Up @@ -443,40 +441,46 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch

if minTemp > minResolvedTs {
atomic.StoreUint64(&minResolvedTs, minTemp)
resolvedTsNotifier.Notify()
select {
case resolvedTsNotifierChan <- struct{}{}:
default:
}
}
}
})

errg.Go(func() error {
resolvedTsReceiver, err := resolvedTsNotifier.NewReceiver(time.Second * 1)
if err != nil {
if cerrors.ErrOperateOnClosedNotifier.Equal(err) {
// This won't happen unless `resolvedTsNotifier` has been closed, which is
// impossible at this point.
log.Panic("unexpected error", zap.Error(err))
}
return errors.Trace(err)
}
resolvedTsTicker := time.NewTicker(time.Second * 1)

defer resolvedTsReceiver.Stop()
defer resolvedTsTicker.Stop()

var lastResolvedTs uint64
resolvedTsTickFunc := func() error {
curResolvedTs := atomic.LoadUint64(&minResolvedTs)
if curResolvedTs > lastResolvedTs {
err := onMinResolvedTsUpdate(curResolvedTs)
if err != nil {
return errors.Trace(err)
}
} else if curResolvedTs < lastResolvedTs {
log.Panic("resolved-ts regressed in sorter",
zap.Uint64("curResolvedTs", curResolvedTs),
zap.Uint64("lastResolvedTs", lastResolvedTs))
}
return nil
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-resolvedTsReceiver.C:
curResolvedTs := atomic.LoadUint64(&minResolvedTs)
if curResolvedTs > lastResolvedTs {
err := onMinResolvedTsUpdate(curResolvedTs)
if err != nil {
return errors.Trace(err)
}
} else if curResolvedTs < lastResolvedTs {
log.Panic("resolved-ts regressed in sorter",
zap.Uint64("curResolved-ts", curResolvedTs),
zap.Uint64("lastResolved-ts", lastResolvedTs))
case <-resolvedTsTicker.C:
if err := resolvedTsTickFunc(); err != nil {
return err
}
case <-resolvedTsNotifierChan:
if err := resolvedTsTickFunc(); err != nil {
return err
}
}
}
Expand Down

0 comments on commit 99389d6

Please sign in to comment.