From 810fd7a470672ffc26e336915bda499d43e46be1 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 10 Jun 2021 16:26:30 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #1987 Signed-off-by: ti-chi-bot --- cdc/puller/sorter/merger.go | 269 +++++++++++----------------- cdc/puller/sorter/merger_test.go | 165 +++++++---------- cdc/puller/sorter/unified_sorter.go | 17 +- 3 files changed, 184 insertions(+), 267 deletions(-) diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 817e3c96b18..139e2df819c 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -22,7 +22,6 @@ import ( "sync/atomic" "time" - "github.com/edwingeng/deque" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -36,13 +35,7 @@ import ( ) // TODO refactor this into a struct Merger. -func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out chan *model.PolymorphicEvent, onExit func(), bufLen *int64) error { - // TODO remove bufLenPlaceholder when refactoring - if bufLen == nil { - var bufLenPlaceholder int64 - bufLen = &bufLenPlaceholder - } - +func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out chan *model.PolymorphicEvent, onExit func()) error { captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) _, tableName := util.TableIDFromCtx(ctx) @@ -58,13 +51,11 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch lastResolvedTs := make([]uint64, numSorters) minResolvedTs := uint64(0) - taskBuf := newTaskBuffer(bufLen) var workingSet map[*flushTask]struct{} - pendingSet := make(map[*flushTask]*model.PolymorphicEvent) + pendingSet := &sync.Map{} defer func() { - log.Info("Unified Sorter: merger exiting, cleaning up resources", zap.Int("pending-set-size", len(pendingSet))) - taskBuf.setClosed() + log.Info("Unified Sorter: merger exiting, cleaning up resources") // cancel pending async IO operations. onExit() cleanUpTask := func(task *flushTask) { @@ -88,29 +79,30 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch _ = printError(task.dealloc()) } + LOOP: for { - task, err := taskBuf.get(ctx) - if err != nil { - _ = printError(err) - break + var task *flushTask + select { + case task = <-in: + default: + break LOOP } if task == nil { - log.Debug("Merger exiting, taskBuf is exhausted") + log.Debug("Merger exiting, in-channel is exhausted") break } cleanUpTask(task) } - for task := range pendingSet { - cleanUpTask(task) - } + pendingSet.Range(func(task, _ interface{}) bool { + cleanUpTask(task.(*flushTask)) + return true + }) for task := range workingSet { cleanUpTask(task) } - - taskBuf.close() }() lastOutputTs := uint64(0) @@ -133,11 +125,12 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } } - onMinResolvedTsUpdate := func() error { + onMinResolvedTsUpdate := func(minResolvedTs /* note the shadowing */ uint64) error { metricSorterMergerStartTsGauge.Set(float64(oracle.ExtractPhysical(minResolvedTs))) workingSet = make(map[*flushTask]struct{}) sortHeap := new(sortHeap) +<<<<<<< HEAD defer func() { // clean up cleanUpCtx, cancel := context.WithTimeout(context.TODO(), 2*time.Second) @@ -163,35 +156,62 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch }() for task, cache := range pendingSet { +======= + // loopErr is used to return an error out of the closure taken by `pendingSet.Range`. 
+ var loopErr error + // NOTE 1: We can block the closure passed to `pendingSet.Range` WITHOUT worrying about + // deadlocks because the closure is NOT called with any lock acquired in the implementation + // of Sync.Map. + // NOTE 2: It is safe to used `Range` to iterate through the pendingSet, in spite of NOT having + // a snapshot consistency because (1) pendingSet is updated first before minResolvedTs is updated, + // which guarantees that useful new flushTasks are not missed, and (2) by design, once minResolvedTs is updated, + // new flushTasks will satisfy `task.tsLowerBound > minResolvedTs`, and such flushTasks are ignored in + // the closure. + pendingSet.Range(func(iTask, iCache interface{}) bool { + task := iTask.(*flushTask) + var cache *model.PolymorphicEvent + if iCache != nil { + cache = iCache.(*model.PolymorphicEvent) + } + +>>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) if task.tsLowerBound > minResolvedTs { // the condition above implies that for any event in task.backend, CRTs > minResolvedTs. - continue + return true } var event *model.PolymorphicEvent if cache != nil { event = cache } else { - var err error - select { case <-ctx.Done(): - return ctx.Err() + loopErr = ctx.Err() + // terminates the loop + return false case err := <-task.finished: if err != nil { - return errors.Trace(err) + loopErr = errors.Trace(err) + // terminates the loop + return false } } if task.reader == nil { + var err error task.reader, err = task.GetBackEnd().reader() if err != nil { - return errors.Trace(err) + loopErr = errors.Trace(err) + // terminates the loop + return false } } + var err error event, err = task.reader.readNext() if err != nil { - return errors.Trace(err) + loopErr = errors.Trace(err) + // terminates the loop + return false } if event == nil { @@ -201,17 +221,22 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } if event.CRTs > minResolvedTs { - pendingSet[task] = event - continue + pendingSet.Store(task, event) + // continues the loop + return true } - pendingSet[task] = nil + pendingSet.Store(task, nil) workingSet[task] = struct{}{} heap.Push(sortHeap, &sortItem{ entry: event, data: task, }) + return true + }) + if loopErr != nil { + return errors.Trace(loopErr) } resolvedTicker := time.NewTicker(1 * time.Second) @@ -219,9 +244,15 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch retire := func(task *flushTask) error { delete(workingSet, task) - if pendingSet[task] != nil { + cached, ok := pendingSet.Load(task) + if !ok { + log.Panic("task not found in pendingSet") + } + + if cached != nil { return nil } + nextEvent, err := task.reader.readNext() if err != nil { _ = task.reader.resetAndClose() // prevents fd leak @@ -230,7 +261,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } if nextEvent == nil { - delete(pendingSet, task) + pendingSet.Delete(task) err := task.reader.resetAndClose() if err != nil { @@ -243,7 +274,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return errors.Trace(err) } } else { - pendingSet[task] = nextEvent + pendingSet.Store(task, nextEvent) if nextEvent.CRTs < minResolvedTs { log.Panic("remaining event CRTs too small", zap.Uint64("next-ts", nextEvent.CRTs), @@ -335,7 +366,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch if event == nil { // EOF delete(workingSet, task) - delete(pendingSet, task) + pendingSet.Delete(task) err := task.reader.resetAndClose() 
if err != nil { @@ -354,7 +385,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.RawKV.OpType == model.OpTypeResolved) { // we have processed all events from this task that need to be processed in this merge if event.CRTs > minResolvedTs || event.RawKV.OpType != model.OpTypeResolved { - pendingSet[task] = event + pendingSet.Store(task, event) } err := retire(task) if err != nil { @@ -405,10 +436,10 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - resolveTicker := time.NewTicker(1 * time.Second) - defer resolveTicker.Stop() - + resolvedTsNotifier := ¬ify.Notifier{} + defer resolvedTsNotifier.Close() errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { for { var task *flushTask @@ -426,33 +457,8 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - taskBuf.put(task) - } - }) - - errg.Go(func() error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - task, err := taskBuf.get(ctx) - if err != nil { - return errors.Trace(err) - } - - if task == nil { - tableID, tableName := util.TableIDFromCtx(ctx) - log.Debug("Merger buffer exhausted and is closed, exiting", - zap.Int64("table-id", tableID), - zap.String("table-name", tableName), - zap.Uint64("max-output", minResolvedTs)) - return nil - } - if !task.isEmpty { - pendingSet[task] = nil + pendingSet.Store(task, nil) } // otherwise it is an empty flush if lastResolvedTs[task.heapSorterID] < task.maxResolvedTs { @@ -467,10 +473,41 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } if minTemp > minResolvedTs { - minResolvedTs = minTemp - err := onMinResolvedTsUpdate() - if err != nil { - return errors.Trace(err) + atomic.StoreUint64(&minResolvedTs, minTemp) + resolvedTsNotifier.Notify() + } + } + }) + + errg.Go(func() error { + resolvedTsReceiver, err := resolvedTsNotifier.NewReceiver(time.Second * 1) + if err != nil { + if cerrors.ErrOperateOnClosedNotifier.Equal(err) { + // This won't happen unless `resolvedTsNotifier` has been closed, which is + // impossible at this point. + log.Panic("unexpected error", zap.Error(err)) + } + return errors.Trace(err) + } + + defer resolvedTsReceiver.Stop() + + var lastResolvedTs uint64 + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-resolvedTsReceiver.C: + curResolvedTs := atomic.LoadUint64(&minResolvedTs) + if curResolvedTs > lastResolvedTs { + err := onMinResolvedTsUpdate(curResolvedTs) + if err != nil { + return errors.Trace(err) + } + } else if curResolvedTs < lastResolvedTs { + log.Panic("resolved-ts regressed in sorter", + zap.Uint64("cur-resolved-ts", curResolvedTs), + zap.Uint64("last-resolved-ts", lastResolvedTs)) } } } @@ -503,96 +540,8 @@ func printError(err error) error { !strings.Contains(err.Error(), "context deadline exceeded") && cerrors.ErrAsyncIOCancelled.NotEqual(errors.Cause(err)) { - log.Warn("Unified Sorter: Error detected", zap.Error(err)) + log.Warn("Unified Sorter: Error detected", zap.Error(err), zap.Stack("stack")) } log.Debug("Unified Sorter error debug", zap.Error(err), zap.Stack("stack")) return err } - -// taskBuffer is used to store pending flushTasks. -// The design purpose is to reduce the backpressure caused by a congested output chan of the merger, -// so that heapSorter does not block. 
-type taskBuffer struct { - mu sync.Mutex // mu only protects queue - queue deque.Deque - - notifier notify.Notifier - len *int64 - isClosed int32 -} - -func newTaskBuffer(len *int64) *taskBuffer { - return &taskBuffer{ - queue: deque.NewDeque(), - notifier: notify.Notifier{}, - len: len, - } -} - -func (b *taskBuffer) put(task *flushTask) { - b.mu.Lock() - defer b.mu.Unlock() - - b.queue.PushBack(task) - prevCount := atomic.AddInt64(b.len, 1) - - if prevCount == 1 { - b.notifier.Notify() - } -} - -func (b *taskBuffer) get(ctx context.Context) (*flushTask, error) { - if atomic.LoadInt32(&b.isClosed) == 1 && atomic.LoadInt64(b.len) == 0 { - return nil, nil - } - - if atomic.LoadInt64(b.len) == 0 { - recv, err := b.notifier.NewReceiver(time.Millisecond * 50) - if err != nil { - return nil, errors.Trace(err) - } - defer recv.Stop() - - startTime := time.Now() - for atomic.LoadInt64(b.len) == 0 { - select { - case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) - case <-recv.C: - // Note that there can be spurious wake-ups - } - - if atomic.LoadInt32(&b.isClosed) == 1 && atomic.LoadInt64(b.len) == 0 { - return nil, nil - } - - if time.Since(startTime) > time.Second*5 { - log.Debug("taskBuffer reading blocked for too long", zap.Duration("duration", time.Since(startTime))) - } - } - } - - postCount := atomic.AddInt64(b.len, -1) - if postCount < 0 { - log.Panic("taskBuffer: len < 0, report a bug", zap.Int64("len", postCount)) - } - - b.mu.Lock() - defer b.mu.Unlock() - - ret := b.queue.PopFront() - if ret == nil { - log.Panic("taskBuffer: PopFront() returned nil, report a bug") - } - - return ret.(*flushTask), nil -} - -func (b *taskBuffer) setClosed() { - atomic.SwapInt32(&b.isClosed, 1) -} - -// Only call this when the taskBuffer is NEVER going to be accessed again. 
-func (b *taskBuffer) close() { - b.notifier.Close() -} diff --git a/cdc/puller/sorter/merger_test.go b/cdc/puller/sorter/merger_test.go index caa67db575d..afc23a47dbd 100644 --- a/cdc/puller/sorter/merger_test.go +++ b/cdc/puller/sorter/merger_test.go @@ -18,8 +18,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" - "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -107,7 +105,7 @@ func (s *sorterSuite) TestMergerSingleHeap(c *check.C) { outChan := make(chan *model.PolymorphicEvent, 1024) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) totalCount := 0 @@ -178,7 +176,7 @@ func (s *sorterSuite) TestMergerSingleHeapRetire(c *check.C) { outChan := make(chan *model.PolymorphicEvent, 1024) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) totalCount := 0 @@ -259,7 +257,7 @@ func (s *sorterSuite) TestMergerSortDelay(c *check.C) { outChan := make(chan *model.PolymorphicEvent, 1024) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) totalCount := 0 @@ -339,7 +337,7 @@ func (s *sorterSuite) TestMergerCancel(c *check.C) { outChan := make(chan *model.PolymorphicEvent, 1024) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) builder := newMockFlushTaskBuilder() @@ -394,7 +392,7 @@ func (s *sorterSuite) TestMergerCancelWithUnfinishedFlushTasks(c *check.C) { outChan := make(chan *model.PolymorphicEvent, 1024) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) builder := newMockFlushTaskBuilder() @@ -457,7 +455,7 @@ func (s *sorterSuite) TestMergerCloseChannel(c *check.C) { close(task1.finished) wg.Go(func() error { - return runMerger(ctx, 1, inChan, outChan, func() {}, nil) + return runMerger(ctx, 1, inChan, outChan, func() {}) }) wg.Go(func() error { @@ -480,115 +478,76 @@ func (s *sorterSuite) TestMergerCloseChannel(c *check.C) { c.Assert(atomic.LoadInt64(&backEndCounterForTest), check.Equals, int64(0)) } -// TestTaskBufferBasic tests the basic functionality of TaskBuffer -func (s *sorterSuite) TestTaskBufferBasic(c *check.C) { +// TestMergerOutputBlocked simulates a situation where the output channel is blocked for +// a significant period of time. 
+func (s *sorterSuite) TestMergerOutputBlocked(c *check.C) { defer testleak.AfterTest(c)() + err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") + c.Assert(err, check.IsNil) + defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug") //nolint:errcheck - ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*25) defer cancel() - errg, ctx := errgroup.WithContext(ctx) - var bufLen int64 - taskBuf := newTaskBuffer(&bufLen) - - // run producer - errg.Go(func() error { - for i := 0; i < 10000; i++ { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } - - var dummyTask flushTask - taskBuf.put(&dummyTask) - } - - taskBuf.setClosed() - return nil - }) - - // run consumer - errg.Go(func() error { - for i := 0; i < 10001; i++ { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } - - task, err := taskBuf.get(ctx) - c.Assert(err, check.IsNil) - - if i == 10000 { - c.Assert(task, check.IsNil) - taskBuf.close() - return nil - } + wg, ctx := errgroup.WithContext(ctx) + // use unbuffered channel to make sure that the input has been processed + inChan := make(chan *flushTask) + // make a small channel to test blocking + outChan := make(chan *model.PolymorphicEvent, 1) - c.Assert(task, check.NotNil) - } - c.Fail() // unreachable - return nil + wg.Go(func() error { + return runMerger(ctx, 1, inChan, outChan, func() {}) }) - c.Assert(errg.Wait(), check.IsNil) - c.Assert(bufLen, check.Equals, int64(0)) -} - -// TestTaskBufferBasic tests the situation where the taskBuffer's consumer is -// first starved and then exit due to taskBuf shutdown. -func (s *sorterSuite) TestTaskBufferStarveAndClose(c *check.C) { - defer testleak.AfterTest(c)() - - ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) - defer cancel() - errg, ctx := errgroup.WithContext(ctx) - var bufLen int64 - taskBuf := newTaskBuffer(&bufLen) - - // run producer - errg.Go(func() error { - for i := 0; i < 1000; i++ { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } + totalCount := 0 + builder := newMockFlushTaskBuilder() + task1 := builder.generateRowChanges(1000, 100000, 2048).addResolved(100001).build() + totalCount += builder.totalCount + builder = newMockFlushTaskBuilder() + task2 := builder.generateRowChanges(100002, 200000, 2048).addResolved(200001).build() + totalCount += builder.totalCount + builder = newMockFlushTaskBuilder() + task3 := builder.generateRowChanges(200002, 300000, 2048).addResolved(300001).build() + totalCount += builder.totalCount - var dummyTask flushTask - taskBuf.put(&dummyTask) - } + wg.Go(func() error { + inChan <- task1 + close(task1.finished) + inChan <- task2 + close(task2.finished) + inChan <- task3 + close(task3.finished) - // starve the consumer - time.Sleep(3 * time.Second) - taskBuf.setClosed() return nil }) - // run consumer - errg.Go(func() error { - for i := 0; i < 1001; i++ { + wg.Go(func() error { + time.Sleep(10 * time.Second) + count := 0 + lastTs := uint64(0) + lastResolved := uint64(0) + for { select { case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } - - task, err := taskBuf.get(ctx) - if i < 1000 { - c.Assert(task, check.NotNil) - c.Assert(err, check.IsNil) - } else { - c.Assert(task, check.IsNil) - c.Assert(err, check.IsNil) - taskBuf.close() - return nil + return ctx.Err() + case event := <-outChan: + switch event.RawKV.OpType { + case 
model.OpTypePut: + count++ + c.Assert(event.CRTs, check.GreaterEqual, lastTs) + c.Assert(event.CRTs, check.GreaterEqual, lastResolved) + lastTs = event.CRTs + case model.OpTypeResolved: + c.Assert(event.CRTs, check.GreaterEqual, lastResolved) + lastResolved = event.CRTs + } + if lastResolved >= 300001 { + c.Assert(count, check.Equals, totalCount) + cancel() + return nil + } } } - c.Fail() // unreachable - return nil }) - - c.Assert(errg.Wait(), check.IsNil) - c.Assert(bufLen, check.Equals, int64(0)) + c.Assert(wg.Wait(), check.ErrorMatches, ".*context canceled.*") + c.Assert(atomic.LoadInt64(&backEndCounterForTest), check.Equals, int64(0)) } diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index aa562cdca9a..6b1968a9a65 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -17,8 +17,6 @@ import ( "context" "os" "sync" - "sync/atomic" - "time" cerror "github.com/pingcap/ticdc/pkg/errors" @@ -31,6 +29,15 @@ import ( "golang.org/x/sync/errgroup" ) +<<<<<<< HEAD +======= +const ( + inputChSize = 128 + outputChSize = 128 + heapCollectChSize = 128 // this should be not be too small, to guarantee IO concurrency +) + +>>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) // UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use. type UnifiedSorter struct { inputCh chan *model.PolymorphicEvent @@ -189,9 +196,8 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { } }) - var mergerBufLen int64 errg.Go(func() error { - return printError(runMerger(subctx, numConcurrentHeaps, heapSorterCollectCh, s.outputCh, ioCancelFunc, &mergerBufLen)) + return printError(runMerger(subctx, numConcurrentHeaps, heapSorterCollectCh, s.outputCh, ioCancelFunc)) }) errg.Go(func() error { @@ -207,6 +213,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { nextSorterID := 0 for { +<<<<<<< HEAD // tentative value 1280000 for atomic.LoadInt64(&mergerBufLen) > 1280000 { after := time.After(1 * time.Second) @@ -216,6 +223,8 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { case <-after: } } +======= +>>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) select { case <-subctx.Done(): return subctx.Err() From 5409e39af2958936e95ba1eb7fac166ef040c3ca Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 10 Aug 2021 17:37:42 +0800 Subject: [PATCH 2/3] resolve conflict --- cdc/puller/sorter/merger.go | 29 ----------------------------- cdc/puller/sorter/unified_sorter.go | 24 +----------------------- 2 files changed, 1 insertion(+), 52 deletions(-) diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 139e2df819c..5738256dde0 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -130,33 +130,6 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch workingSet = make(map[*flushTask]struct{}) sortHeap := new(sortHeap) -<<<<<<< HEAD - defer func() { - // clean up - cleanUpCtx, cancel := context.WithTimeout(context.TODO(), 2*time.Second) - defer cancel() - for task := range workingSet { - select { - case <-cleanUpCtx.Done(): - // This should only happen when the workerpool is being cancelled, in which case - // the whole CDC process is exiting, so the leaked resource should not matter. 
- log.Warn("Unified Sorter: merger cleaning up timeout.") - return - case err := <-task.finished: - _ = printError(err) - } - - if task.reader != nil { - err := task.reader.resetAndClose() - task.reader = nil - _ = printError(err) - } - _ = printError(task.dealloc()) - } - }() - - for task, cache := range pendingSet { -======= // loopErr is used to return an error out of the closure taken by `pendingSet.Range`. var loopErr error // NOTE 1: We can block the closure passed to `pendingSet.Range` WITHOUT worrying about @@ -173,8 +146,6 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch if iCache != nil { cache = iCache.(*model.PolymorphicEvent) } - ->>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) if task.tsLowerBound > minResolvedTs { // the condition above implies that for any event in task.backend, CRTs > minResolvedTs. return true diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index 6b1968a9a65..f32c64ad998 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -18,26 +18,16 @@ import ( "os" "sync" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/util" "golang.org/x/sync/errgroup" ) -<<<<<<< HEAD -======= -const ( - inputChSize = 128 - outputChSize = 128 - heapCollectChSize = 128 // this should be not be too small, to guarantee IO concurrency -) - ->>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) // UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use. type UnifiedSorter struct { inputCh chan *model.PolymorphicEvent @@ -213,18 +203,6 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { nextSorterID := 0 for { -<<<<<<< HEAD - // tentative value 1280000 - for atomic.LoadInt64(&mergerBufLen) > 1280000 { - after := time.After(1 * time.Second) - select { - case <-subctx.Done(): - return subctx.Err() - case <-after: - } - } -======= ->>>>>>> e9f799b8 (*: fix deadlock in new processor (#1987)) select { case <-subctx.Done(): return subctx.Err() From 472b2ab7fbad80cb409dd7c080336d32a3880c3f Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Tue, 10 Aug 2021 18:02:59 +0800 Subject: [PATCH 3/3] fix check --- cdc/puller/sorter/unified_sorter.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index b92197f0f8b..56539f07f4f 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -32,11 +32,6 @@ const ( inputChSize = 128 outputChSize = 128 heapCollectChSize = 128 // this should be not be too small, to guarantee IO concurrency - // maxOpenHeapNum is the maximum number of allowed pending chunks in memory OR on-disk. - // This constant is a worst case upper limit, and setting a large number DOES NOT imply actually - // allocating these resources. This constant is PER TABLE. - // TODO refactor this out - maxOpenHeapNum = 1280000 ) // UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use.
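
Reviewer note (illustration only, not part of the patches above): the fix removes the taskBuffer backpressure path and relies on two patterns instead — (1) propagating an error out of a `sync.Map.Range` closure through a captured `loopErr` variable, and (2) publishing `minResolvedTs` with an atomic store plus a notification so the merge loop wakes up without ever blocking the input goroutine. The standalone Go sketch below shows the shape of both patterns using only the standard library; a buffered channel stands in for ticdc's `notify.Notifier`, and the names `scanPending`, `process`, `notifyCh`, "task-a" and "task-b" are invented for this illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// scanPending shows how the merger gets an error out of a sync.Map.Range
// closure: the closure records it in a captured variable and returns false
// to stop the iteration early, mirroring `loopErr` in onMinResolvedTsUpdate.
func scanPending(pending *sync.Map, minResolvedTs uint64) error {
	var loopErr error
	pending.Range(func(key, value interface{}) bool {
		ts := value.(uint64)
		if ts > minResolvedTs {
			// Not ready for this merge round, analogous to
			// task.tsLowerBound > minResolvedTs in the real merger.
			return true
		}
		if err := process(key.(string), ts); err != nil {
			loopErr = err
			return false // terminates the Range loop
		}
		return true
	})
	return loopErr
}

// process stands in for reading a flushTask's backend and pushing its events
// onto the sort heap.
func process(name string, ts uint64) error {
	if name == "" {
		return errors.New("empty task name")
	}
	fmt.Printf("merged %s up to ts %d\n", name, ts)
	return nil
}

func main() {
	var minResolvedTs uint64 // written with atomic.StoreUint64, read with atomic.LoadUint64
	notifyCh := make(chan struct{}, 1)

	pending := &sync.Map{}
	pending.Store("task-a", uint64(10))
	pending.Store("task-b", uint64(30))

	// Producer side: store tasks first, then publish the new watermark and
	// notify. Publishing after the store is what keeps the non-snapshot
	// Range iteration safe.
	go func() {
		atomic.StoreUint64(&minResolvedTs, 20)
		select {
		case notifyCh <- struct{}{}:
		default: // a notification is already pending; coalesce
		}
	}()

	// Consumer side: wake up on the notification (the real code also relies
	// on the receiver's periodic timeout) and merge everything at or below
	// the watermark; "task-b" stays pending because its ts is above 20.
	select {
	case <-notifyCh:
	case <-time.After(time.Second):
	}
	if err := scanPending(pending, atomic.LoadUint64(&minResolvedTs)); err != nil {
		fmt.Println("merge error:", err)
	}
}

Storing into pendingSet before advancing minResolvedTs is the design choice that makes the non-snapshot Range safe: a task added after the watermark was read has a tsLowerBound above it, is skipped in this round, and is picked up on the next notification.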