sorter: fix Unified Sorter error handling & add more unit test cases #1619

Merged
5 changes: 5 additions & 0 deletions cdc/puller/sorter/backend_pool.go
@@ -232,10 +232,12 @@ func (p *backEndPool) terminate() {
defer close(p.cancelCh)
// the background goroutine can be considered terminated here

log.Debug("Unified Sorter terminating...")
p.cancelRWLock.Lock()
defer p.cancelRWLock.Unlock()
p.isTerminating = true

log.Debug("Unified Sorter cleaning up before exiting")
// any new allocs and deallocs will not succeed from this point
// accessing p.cache without atomics is safe from now

@@ -258,11 +260,14 @@ func (p *backEndPool) terminate() {
log.Warn("Unified Sorter clean-up failed", zap.Error(err))
}
for _, file := range files {
log.Debug("Unified Sorter backEnd removing file", zap.String("file", file))
err = os.RemoveAll(file)
if err != nil {
log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err))
}
}

log.Debug("Unified Sorter backEnd terminated")
}

func (p *backEndPool) sorterMemoryUsage() int64 {
42 changes: 35 additions & 7 deletions cdc/puller/sorter/heap_sorter.go
@@ -38,16 +38,35 @@ const (
type flushTask struct {
taskID int
heapSorterID int
backend backEnd
reader backEndReader
tsLowerBound uint64
maxResolvedTs uint64
finished chan error
dealloc func() error
isDeallocated int32
dataSize int64
lastTs uint64 // for debugging TODO remove
canceller *asyncCanceller

isEmpty bool // read only field

deallocLock sync.RWMutex
isDeallocated bool // do not access directly
backend backEnd // do not access directly
}

func (t *flushTask) markDeallocated() {
t.deallocLock.Lock()
defer t.deallocLock.Unlock()

t.backend = nil
t.isDeallocated = true
}

func (t *flushTask) GetBackEnd() backEnd {
t.deallocLock.RLock()
defer t.deallocLock.RUnlock()

return t.backend
}

type heapSorter struct {
@@ -104,6 +123,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
isEmptyFlush := h.heap.Len() == 0
var finishCh chan error
if !isEmptyFlush {
failpoint.Inject("InjectErrorBackEndAlloc", func() {
failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause())
})

var err error
backEnd, err = pool.alloc(ctx)
if err != nil {
@@ -121,17 +144,16 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
maxResolvedTs: maxResolvedTs,
finished: finishCh,
canceller: h.canceller,
isEmpty: isEmptyFlush,
}
h.taskCounter++

var oldHeap sortHeap
if !isEmptyFlush {
task.dealloc = func() error {
if atomic.SwapInt32(&task.isDeallocated, 1) == 1 {
return nil
}
if task.backend != nil {
task.backend = nil
backEnd := task.GetBackEnd()
if backEnd != nil {
defer task.markDeallocated()
Comment on lines +154 to +156

Contributor:

What happens if

  1. In L154-L155, backEnd is not nil
  2. When pool.dealloc(backEnd) is called, the task is marked as deallocated and task.backend becomes nil, even though backEnd is not nil?

Contributor (Author):

dealloc will not race with itself, because dealloc calls from different goroutines are always synchronized by a write to the finished channel.

return pool.dealloc(backEnd)
}
return nil
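
For illustration, here is a minimal sketch of the ordering described in the review discussion above. It is not the actual merger code: consumeTask is a hypothetical name, and the snippet assumes it lives in the sorter package with "context" and pingcap/errors imported. The point is that a consumer only calls dealloc after receiving from task.finished, so it cannot race with the flushing goroutine.

// Sketch only: the consumer waits on task.finished before touching the
// backend, so the flushing goroutine and the consuming goroutine never
// call dealloc concurrently.
func consumeTask(ctx context.Context, task *flushTask) error {
	select {
	case <-ctx.Done():
		return errors.Trace(ctx.Err())
	case err, ok := <-task.finished: // written to (or closed) by the flush goroutine
		if ok && err != nil {
			return errors.Trace(err)
		}
	}

	backEnd := task.GetBackEnd()
	if backEnd != nil {
		// Safe to read here: the flush goroutine has already finished writing.
		// ... consume sorted events via backEnd.reader() ...
	}

	// Safe to dealloc for the same reason; markDeallocated also makes a
	// second call a no-op.
	return task.dealloc()
}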
@@ -140,6 +162,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
h.heap = make(sortHeap, 0, 65536)
} else {
task.dealloc = func() error {
task.markDeallocated()
return nil
}
}
@@ -190,6 +213,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
close(task.finished)
}()

failpoint.Inject("InjectErrorBackEndWrite", func() {
task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause()
failpoint.Return()
})

counter := 0
for oldHeap.Len() > 0 {
failpoint.Inject("asyncFlushInProcessDelay", func() {
4 changes: 2 additions & 2 deletions cdc/puller/sorter/merger.go
@@ -160,7 +160,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
}

if task.reader == nil {
task.reader, err = task.backend.reader()
task.reader, err = task.GetBackEnd().reader()
if err != nil {
return errors.Trace(err)
}
@@ -426,7 +426,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
return nil
}

if task.backend != nil {
if !task.isEmpty {
pendingSet[task] = nil
} // otherwise it is an empty flush

137 changes: 127 additions & 10 deletions cdc/puller/sorter/sorter_test.go
@@ -21,8 +21,9 @@ import (
"testing"
"time"

"go.uber.org/zap/zapcore"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
@@ -76,7 +77,8 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) {

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
testSorter(ctx, c, sorter, 10000, true)
err = testSorter(ctx, c, sorter, 10000, true)
c.Assert(err, check.ErrorMatches, ".*context cancel.*")
}

func (s *sorterSuite) TestSorterCancel(c *check.C) {
@@ -102,7 +104,8 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {

finishedCh := make(chan struct{})
go func() {
testSorter(ctx, c, sorter, 10000000, true)
err := testSorter(ctx, c, sorter, 10000000, true)
c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*")
close(finishedCh)
}()

@@ -116,7 +119,7 @@
log.Info("Sorter successfully cancelled")
}

func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) {
func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) error {
err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)")
if err != nil {
log.Panic("Could not enable failpoint", zap.Error(err))
@@ -213,11 +216,7 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun
}
})

err = errg.Wait()
if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded {
return
}
c.Assert(err, check.IsNil)
return errg.Wait()
}

func (s *sorterSuite) TestSortDirConfigLocal(c *check.C) {
@@ -310,7 +309,125 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
for i := 0; i < 5; i++ {
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
testSorter(ctx, c, sorter, 100000000, true)
err := testSorter(ctx, c, sorter, 100000000, true)
c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*")
cancel()
}
}

func (s *sorterSuite) TestSorterIOError(c *check.C) {
defer testleak.AfterTest(c)()
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
c.Assert(err, check.IsNil)
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
}()

finishedCh := make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected alloc error.*")
close(finishedCh)
}()

after := time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}

_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
// enable the failpoint to simulate backEnd write error (usually would happen when writing to a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite")
}()

finishedCh = make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected write error.*")
close(finishedCh)
}()

after = time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}
}

func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
defer testleak.AfterTest(c)()
defer UnifiedSorterCleanUp()

log.SetLevel(zapcore.DebugLevel)
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
c.Assert(err, check.IsNil)
sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay", "sleep(2000)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay")
}()

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
}()

finishedCh := make(chan struct{})
go func() {
err := testSorter(ctx, c, sorter, 100000, true)
c.Assert(err, check.ErrorMatches, ".*injected alloc error.*")
close(finishedCh)
}()

after := time.After(60 * time.Second)
select {
case <-after:
c.Fatal("TestSorterIOError timed out")
case <-finishedCh:
}
}
13 changes: 12 additions & 1 deletion cdc/puller/sorter/unified_sorter.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -173,6 +173,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
}
// must wait for all writers to exit to close the channel.
close(heapSorterCollectCh)
failpoint.Inject("InjectHeapSorterExitDelay", func() {})
}()

select {
@@ -223,6 +224,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
}
err := sorter.poolHandle.AddEvent(subctx, event)
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
if err != nil {
return errors.Trace(err)
}
@@ -240,6 +246,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
default:
err := heapSorters[targetID].poolHandle.AddEvent(subctx, event)
if err != nil {
if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
// no need to report ErrWorkerPoolHandleCancelled,
// as it may confuse the user
return nil
}
return errors.Trace(err)
}
metricSorterConsumeCount.WithLabelValues("kv").Inc()
5 changes: 5 additions & 0 deletions errors.toml
@@ -716,6 +716,11 @@ error = '''
unified sorter backend is terminating
'''

["CDC:ErrUnifiedSorterIOError"]
error = '''
unified sorter IO error
'''

["CDC:ErrUnknownKVEventType"]
error = '''
unknown kv event type: %v, entry: %v
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
@@ -202,7 +202,7 @@ var (
ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating"))
ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter"))
ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled"))

ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError"))
// processor errors
ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely"))

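
As a usage note, a hedged sketch of how the new error code is meant to be produced and matched, following the patterns already visible in heap_sorter.go and unified_sorter.go above. The helper names wrapIOErr and isSorterIOErr are illustrative only; cerrors is assumed to alias github.com/pingcap/ticdc/pkg/errors as in heap_sorter.go.

package sorter

import (
	cerrors "github.com/pingcap/ticdc/pkg/errors"
)

// wrapIOErr attaches a concrete cause while keeping the RFC code
// CDC:ErrUnifiedSorterIOError, mirroring the failpoint branches in heap_sorter.go.
func wrapIOErr(cause error) error {
	return cerrors.ErrUnifiedSorterIOError.Wrap(cause).FastGenWithCause()
}

// isSorterIOErr reports whether err carries that code, mirroring the
// ErrWorkerPoolHandleCancelled check in unified_sorter.go.
func isSorterIOErr(err error) bool {
	return cerrors.ErrUnifiedSorterIOError.Equal(err)
}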