Skip to content

Commit

Permalink
sorter: fix Unified Sorter error handling & add more unit test cases (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored Apr 16, 2021
1 parent cc8eadf commit 2277431
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 21 deletions.
5 changes: 5 additions & 0 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,12 @@ func (p *backEndPool) terminate() {
defer close(p.cancelCh)
// the background goroutine can be considered terminated here

log.Debug("Unified Sorter terminating...")
p.cancelRWLock.Lock()
defer p.cancelRWLock.Unlock()
p.isTerminating = true

log.Debug("Unified Sorter cleaning up before exiting")
// any new allocs and deallocs will not succeed from this point
// accessing p.cache without atomics is safe from now

Expand All @@ -258,11 +260,14 @@ func (p *backEndPool) terminate() {
log.Warn("Unified Sorter clean-up failed", zap.Error(err))
}
for _, file := range files {
log.Debug("Unified Sorter backEnd removing file", zap.String("file", file))
err = os.RemoveAll(file)
if err != nil {
log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err))
}
}

log.Debug("Unified Sorter backEnd terminated")
}

func (p *backEndPool) sorterMemoryUsage() int64 {
Expand Down
42 changes: 35 additions & 7 deletions cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,35 @@ const (
type flushTask struct {
taskID int
heapSorterID int
backend backEnd
reader backEndReader
tsLowerBound uint64
maxResolvedTs uint64
finished chan error
dealloc func() error
isDeallocated int32
dataSize int64
lastTs uint64 // for debugging TODO remove
canceller *asyncCanceller

isEmpty bool // read only field

deallocLock sync.RWMutex
isDeallocated bool // do not access directly
backend backEnd // do not access directly
}

func (t *flushTask) markDeallocated() {
t.deallocLock.Lock()
defer t.deallocLock.Unlock()

t.backend = nil
t.isDeallocated = true
}

func (t *flushTask) GetBackEnd() backEnd {
t.deallocLock.RLock()
defer t.deallocLock.RUnlock()

return t.backend
}

type heapSorter struct {
Expand Down Expand Up @@ -104,6 +123,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
isEmptyFlush := h.heap.Len() == 0
var finishCh chan error
if !isEmptyFlush {
failpoint.Inject("InjectErrorBackEndAlloc", func() {
failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause())
})

var err error
backEnd, err = pool.alloc(ctx)
if err != nil {
Expand All @@ -121,17 +144,16 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
maxResolvedTs: maxResolvedTs,
finished: finishCh,
canceller: h.canceller,
isEmpty: isEmptyFlush,
}
h.taskCounter++

var oldHeap sortHeap
if !isEmptyFlush {
task.dealloc = func() error {
if atomic.SwapInt32(&task.isDeallocated, 1) == 1 {
return nil
}
if task.backend != nil {
task.backend = nil
backEnd := task.GetBackEnd()
if backEnd != nil {
defer task.markDeallocated()
return pool.dealloc(backEnd)
}
return nil
Expand All @@ -140,6 +162,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
h.heap = make(sortHeap, 0, 65536)
} else {
task.dealloc = func() error {
task.markDeallocated()
return nil
}
}
Expand Down Expand Up @@ -190,6 +213,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
close(task.finished)
}()

failpoint.Inject("InjectErrorBackEndWrite", func() {
task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause()
failpoint.Return()
})

counter := 0
for oldHeap.Len() > 0 {
failpoint.Inject("asyncFlushInProcessDelay", func() {
Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/sorter/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
}

if task.reader == nil {
task.reader, err = task.backend.reader()
task.reader, err = task.GetBackEnd().reader()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return nil
}

if task.backend != nil {
if !task.isEmpty {
pendingSet[task] = nil
} // otherwise it is an empty flush

Expand Down
137 changes: 127 additions & 10 deletions cdc/puller/sorter/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"testing"
"time"

"go.uber.org/zap/zapcore"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
Expand Down Expand Up @@ -76,7 +77,8 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) {

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
testSorter(ctx, c, sorter, 10000, true)
err = testSorter(ctx, c, sorter, 10000, true)
c.Assert(err, check.ErrorMatches, ".*context cancel.*")
}

func (s *sorterSuite) TestSorterCancel(c *check.C) {
Expand All @@ -102,7 +104,8 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {

finishedCh := make(chan struct{})
go func() {
testSorter(ctx, c, sorter, 10000000, true)
err := testSorter(ctx, c, sorter, 10000000, true)
c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*")
close(finishedCh)
}()

Expand All @@ -116,7 +119,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {
log.Info("Sorter successfully cancelled")
}

func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) {
func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) error {
err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)")
if err != nil {
log.Panic("Could not enable failpoint", zap.Error(err))
Expand Down Expand Up @@ -213,11 +216,7 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun
}
})

err = errg.Wait()
if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded {
return
}
c.Assert(err, check.IsNil)
return errg.Wait()
}

func (s *sorterSuite) TestSortDirConfigLocal(c *check.C) {
Expand Down Expand Up @@ -310,7 +309,125 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
for i := 0; i < 5; i++ {
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
testSorter(ctx, c, sorter, 100000000, true)
err := testSorter(ctx, c, sorter, 100000000, true)
c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*")
cancel()
}
}

func (s *sorterSuite) TestSorterIOError(c *check.C) {
defer testleak.AfterTest(c)()
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
c.Assert(err, check.IsNil)
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
}()

finishedCh := make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected alloc error.*")
close(finishedCh)
}()

after := time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}

_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
// enable the failpoint to simulate backEnd write error (usually would happen when writing to a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite")
}()

finishedCh = make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected write error.*")
close(finishedCh)
}()

after = time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}
}

func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
defer testleak.AfterTest(c)()
defer UnifiedSorterCleanUp()

log.SetLevel(zapcore.DebugLevel)
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
c.Assert(err, check.IsNil)
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay", "sleep(2000)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay")
}()

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
}()

finishedCh := make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected alloc error.*")
close(finishedCh)
}()

after := time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}
}
13 changes: 12 additions & 1 deletion cdc/puller/sorter/unified_sorter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,6 +173,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
}
// must wait for all writers to exit to close the channel.
close(heapSorterCollectCh)
failpoint.Inject("InjectHeapSorterExitDelay", func() {})
}()

select {
Expand Down Expand Up @@ -223,6 +224,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
}
err := sorter.poolHandle.AddEvent(subctx, event)
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
if err != nil {
return errors.Trace(err)
}
Expand All @@ -240,6 +246,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
err := heapSorters[targetID].poolHandle.AddEvent(subctx, event)
if err != nil {
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
return errors.Trace(err)
}
metricSorterConsumeCount.WithLabelValues("kv").Inc()
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,11 @@ error = '''
unified sorter backend is terminating
'''

["CDC:ErrUnifiedSorterIOError"]
error = '''
unified sorter IO error
'''

["CDC:ErrUnknownKVEventType"]
error = '''
unknown kv event type: %v, entry: %v
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ var (
ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating"))
ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter"))
ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled"))

ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError"))
// processor errors
ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely"))

Expand Down

0 comments on commit 2277431

Please sign in to comment.