Skip to content

Commit

Permalink
cherry pick pingcap#1619 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
liuzix authored and ti-srebot committed Apr 16, 2021
1 parent a366f30 commit 3a0a74e
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 8 deletions.
5 changes: 5 additions & 0 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,12 @@ func (p *backEndPool) terminate() {
defer close(p.cancelCh)
// the background goroutine can be considered terminated here

log.Debug("Unified Sorter terminating...")
p.cancelRWLock.Lock()
defer p.cancelRWLock.Unlock()
p.isTerminating = true

log.Debug("Unified Sorter cleaning up before exiting")
// any new allocs and deallocs will not succeed from this point
// accessing p.cache without atomics is safe from now

Expand All @@ -254,11 +256,14 @@ func (p *backEndPool) terminate() {
log.Warn("Unified Sorter clean-up failed", zap.Error(err))
}
for _, file := range files {
log.Debug("Unified Sorter backEnd removing file", zap.String("file", file))
err = os.RemoveAll(file)
if err != nil {
log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err))
}
}

log.Debug("Unified Sorter backEnd terminated")
}

func (p *backEndPool) sorterMemoryUsage() int64 {
Expand Down
51 changes: 50 additions & 1 deletion cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,38 @@ const (
type flushTask struct {
taskID int
heapSorterID int
backend backEnd
reader backEndReader
tsLowerBound uint64
maxResolvedTs uint64
finished chan error
dealloc func() error
dataSize int64
lastTs uint64 // for debugging TODO remove
<<<<<<< HEAD
=======
canceller *asyncCanceller

isEmpty bool // read only field

deallocLock sync.RWMutex
isDeallocated bool // do not access directly
backend backEnd // do not access directly
}

func (t *flushTask) markDeallocated() {
t.deallocLock.Lock()
defer t.deallocLock.Unlock()

t.backend = nil
t.isDeallocated = true
}

func (t *flushTask) GetBackEnd() backEnd {
t.deallocLock.RLock()
defer t.deallocLock.RUnlock()

return t.backend
>>>>>>> 2277431... sorter: fix Unified Sorter error handling & add more unit test cases (#1619)
}

type heapSorter struct {
Expand Down Expand Up @@ -99,6 +123,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
isEmptyFlush := h.heap.Len() == 0
var finishCh chan error
if !isEmptyFlush {
failpoint.Inject("InjectErrorBackEndAlloc", func() {
failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause())
})

var err error
backEnd, err = pool.alloc(ctx)
if err != nil {
Expand All @@ -115,14 +143,25 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
tsLowerBound: lowerBound,
maxResolvedTs: maxResolvedTs,
finished: finishCh,
<<<<<<< HEAD
=======
canceller: h.canceller,
isEmpty: isEmptyFlush,
>>>>>>> 2277431... sorter: fix Unified Sorter error handling & add more unit test cases (#1619)
}
h.taskCounter++

var oldHeap sortHeap
if !isEmptyFlush {
task.dealloc = func() error {
<<<<<<< HEAD
if task.backend != nil {
task.backend = nil
=======
backEnd := task.GetBackEnd()
if backEnd != nil {
defer task.markDeallocated()
>>>>>>> 2277431... sorter: fix Unified Sorter error handling & add more unit test cases (#1619)
return pool.dealloc(backEnd)
}
return nil
Expand All @@ -131,6 +170,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
h.heap = make(sortHeap, 0, 65536)
} else {
task.dealloc = func() error {
task.markDeallocated()
return nil
}
}
Expand Down Expand Up @@ -164,6 +204,15 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
close(task.finished)
}()

<<<<<<< HEAD
=======
failpoint.Inject("InjectErrorBackEndWrite", func() {
task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause()
failpoint.Return()
})

counter := 0
>>>>>>> 2277431... sorter: fix Unified Sorter error handling & add more unit test cases (#1619)
for oldHeap.Len() > 0 {
event := heap.Pop(&oldHeap).(*sortItem).entry
err := writer.writeNext(event)
Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/sorter/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
}

if task.reader == nil {
task.reader, err = task.backend.reader()
task.reader, err = task.GetBackEnd().reader()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return nil
}

if task.backend != nil {
if !task.isEmpty {
pendingSet[task] = nil
} // otherwise it is an empty flush

Expand Down
13 changes: 12 additions & 1 deletion cdc/puller/sorter/unified_sorter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,6 +110,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
}
// must wait for all writers to exit to close the channel.
close(heapSorterCollectCh)
failpoint.Inject("InjectHeapSorterExitDelay", func() {})
}()

for {
Expand Down Expand Up @@ -152,6 +153,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
}
err := sorter.poolHandle.AddEvent(subctx, event)
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
if err != nil {
return errors.Trace(err)
}
Expand All @@ -169,6 +175,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
err := heapSorters[targetID].poolHandle.AddEvent(subctx, event)
if err != nil {
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
return errors.Trace(err)
}
metricSorterConsumeCount.WithLabelValues("kv").Inc()
Expand Down
Loading

0 comments on commit 3a0a74e

Please sign in to comment.