From 22774319c392f564959e8cd2087549c8aa65479a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 16 Apr 2021 14:57:51 +0800 Subject: [PATCH] sorter: fix Unified Sorter error handling & add more unit test cases (#1619) --- cdc/puller/sorter/backend_pool.go | 5 + cdc/puller/sorter/heap_sorter.go | 42 +++++++-- cdc/puller/sorter/merger.go | 4 +- cdc/puller/sorter/sorter_test.go | 137 ++++++++++++++++++++++++++-- cdc/puller/sorter/unified_sorter.go | 13 ++- errors.toml | 5 + pkg/errors/errors.go | 2 +- 7 files changed, 187 insertions(+), 21 deletions(-) diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 14f9f0f80ad..04583853f3e 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -232,10 +232,12 @@ func (p *backEndPool) terminate() { defer close(p.cancelCh) // the background goroutine can be considered terminated here + log.Debug("Unified Sorter terminating...") p.cancelRWLock.Lock() defer p.cancelRWLock.Unlock() p.isTerminating = true + log.Debug("Unified Sorter cleaning up before exiting") // any new allocs and deallocs will not succeed from this point // accessing p.cache without atomics is safe from now @@ -258,11 +260,14 @@ func (p *backEndPool) terminate() { log.Warn("Unified Sorter clean-up failed", zap.Error(err)) } for _, file := range files { + log.Debug("Unified Sorter backEnd removing file", zap.String("file", file)) err = os.RemoveAll(file) if err != nil { log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err)) } } + + log.Debug("Unified Sorter backEnd terminated") } func (p *backEndPool) sorterMemoryUsage() int64 { diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go index 70aae9251fb..a82c1cde67d 100644 --- a/cdc/puller/sorter/heap_sorter.go +++ b/cdc/puller/sorter/heap_sorter.go @@ -38,16 +38,35 @@ const ( type flushTask struct { taskID int heapSorterID int - backend backEnd reader backEndReader tsLowerBound uint64 maxResolvedTs uint64 finished chan error dealloc func() error - isDeallocated int32 dataSize int64 lastTs uint64 // for debugging TODO remove canceller *asyncCanceller + + isEmpty bool // read only field + + deallocLock sync.RWMutex + isDeallocated bool // do not access directly + backend backEnd // do not access directly +} + +func (t *flushTask) markDeallocated() { + t.deallocLock.Lock() + defer t.deallocLock.Unlock() + + t.backend = nil + t.isDeallocated = true +} + +func (t *flushTask) GetBackEnd() backEnd { + t.deallocLock.RLock() + defer t.deallocLock.RUnlock() + + return t.backend } type heapSorter struct { @@ -104,6 +123,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { isEmptyFlush := h.heap.Len() == 0 var finishCh chan error if !isEmptyFlush { + failpoint.Inject("InjectErrorBackEndAlloc", func() { + failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause()) + }) + var err error backEnd, err = pool.alloc(ctx) if err != nil { @@ -121,17 +144,16 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { maxResolvedTs: maxResolvedTs, finished: finishCh, canceller: h.canceller, + isEmpty: isEmptyFlush, } h.taskCounter++ var oldHeap sortHeap if !isEmptyFlush { task.dealloc = func() error { - if atomic.SwapInt32(&task.isDeallocated, 1) == 1 { - return nil - } - if task.backend != nil { - task.backend = nil + backEnd := task.GetBackEnd() + if backEnd != nil { + defer task.markDeallocated() return 
pool.dealloc(backEnd) } return nil @@ -140,6 +162,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { h.heap = make(sortHeap, 0, 65536) } else { task.dealloc = func() error { + task.markDeallocated() return nil } } @@ -190,6 +213,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { close(task.finished) }() + failpoint.Inject("InjectErrorBackEndWrite", func() { + task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause() + failpoint.Return() + }) + counter := 0 for oldHeap.Len() > 0 { failpoint.Inject("asyncFlushInProcessDelay", func() { diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 4b4dd6bc78c..cc78ad5d51e 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -160,7 +160,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } if task.reader == nil { - task.reader, err = task.backend.reader() + task.reader, err = task.GetBackEnd().reader() if err != nil { return errors.Trace(err) } @@ -426,7 +426,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - if task.backend != nil { + if !task.isEmpty { pendingSet[task] = nil } // otherwise it is an empty flush diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index ad81e1c1e4f..73adff055af 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -21,8 +21,9 @@ import ( "testing" "time" + "go.uber.org/zap/zapcore" + "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" @@ -76,7 +77,8 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - testSorter(ctx, c, sorter, 10000, true) + err = testSorter(ctx, c, sorter, 10000, true) + c.Assert(err, check.ErrorMatches, ".*context cancel.*") } func (s *sorterSuite) TestSorterCancel(c *check.C) { @@ -102,7 +104,8 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { finishedCh := make(chan struct{}) go func() { - testSorter(ctx, c, sorter, 10000000, true) + err := testSorter(ctx, c, sorter, 10000000, true) + c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*") close(finishedCh) }() @@ -116,7 +119,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { log.Info("Sorter successfully cancelled") } -func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) { +func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) error { err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") if err != nil { log.Panic("Could not enable failpoint", zap.Error(err)) @@ -213,11 +216,7 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun } }) - err = errg.Wait() - if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded { - return - } - c.Assert(err, check.IsNil) + return errg.Wait() } func (s *sorterSuite) TestSortDirConfigLocal(c *check.C) { @@ -310,7 +309,125 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { for i := 0; i < 5; i++ { sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - testSorter(ctx, c, sorter, 
100000000, true)
+		err := testSorter(ctx, c, sorter, 100000000, true)
+		c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*")
 		cancel()
 	}
 }
+
+func (s *sorterSuite) TestSorterIOError(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer UnifiedSorterCleanUp()
+
+	conf := config.GetDefaultServerConfig()
+	conf.Sorter = &config.SorterConfig{
+		NumConcurrentWorker:    8,
+		ChunkSizeLimit:         1 * 1024 * 1024 * 1024,
+		MaxMemoryPressure:      60,
+		MaxMemoryConsumption:   0,
+		NumWorkerPoolGoroutine: 4,
+	}
+	config.StoreGlobalServerConfig(conf)
+
+	err := os.MkdirAll("/tmp/sorter", 0o755)
+	c.Assert(err, check.IsNil)
+	sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file)
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
+	}()
+
+	finishedCh := make(chan struct{})
+	go func() {
+		err := testSorter(ctx, c, sorter, 100000, true)
+		c.Assert(err, check.ErrorMatches, ".*injected alloc error.*")
+		close(finishedCh)
+	}()
+
+	after := time.After(60 * time.Second)
+	select {
+	case <-after:
+		c.Fatal("TestSorterIOError timed out")
+	case <-finishedCh:
+	}
+
+	_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc")
+	// enable the failpoint to simulate backEnd write error (usually would happen when writing to a file)
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite", "return(true)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite")
+	}()
+
+	finishedCh = make(chan struct{})
+	go func() {
+		err := testSorter(ctx, c, sorter, 100000, true)
+		c.Assert(err, check.ErrorMatches, ".*injected write error.*")
+		close(finishedCh)
+	}()
+
+	after = time.After(60 * time.Second)
+	select {
+	case <-after:
+		c.Fatal("TestSorterIOError timed out")
+	case <-finishedCh:
+	}
+}
+
+func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer UnifiedSorterCleanUp()
+
+	log.SetLevel(zapcore.DebugLevel)
+	defer log.SetLevel(zapcore.InfoLevel)
+
+	conf := config.GetDefaultServerConfig()
+	conf.Sorter = &config.SorterConfig{
+		NumConcurrentWorker:    8,
+		ChunkSizeLimit:         1 * 1024 * 1024 * 1024,
+		MaxMemoryPressure:      60,
+		MaxMemoryConsumption:   0,
+		NumWorkerPoolGoroutine: 4,
+	}
+	config.StoreGlobalServerConfig(conf)
+
+	err := os.MkdirAll("/tmp/sorter", 0o755)
+	c.Assert(err, check.IsNil)
+	sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	// enable the failpoint to delay the exit of the heap sorters, so that error reporting can be verified while they are still shutting down
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay", "sleep(2000)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay")
+	}()
+
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = 
failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc") + }() + + finishedCh := make(chan struct{}) + go func() { + err := testSorter(ctx, c, sorter, 100000, true) + c.Assert(err, check.ErrorMatches, ".*injected alloc error.*") + close(finishedCh) + }() + + after := time.After(60 * time.Second) + select { + case <-after: + c.Fatal("TestSorterIOError timed out") + case <-finishedCh: + } +} diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index ac87373bc51..f2e905b343a 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -173,6 +173,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { } // must wait for all writers to exit to close the channel. close(heapSorterCollectCh) + failpoint.Inject("InjectHeapSorterExitDelay", func() {}) }() select { @@ -223,6 +224,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: } err := sorter.poolHandle.AddEvent(subctx, event) + if cerror.ErrWorkerPoolHandleCancelled.Equal(err) { + // no need to report ErrWorkerPoolHandleCancelled, + // as it may confuse the user + return nil + } if err != nil { return errors.Trace(err) } @@ -240,6 +246,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: err := heapSorters[targetID].poolHandle.AddEvent(subctx, event) if err != nil { + if cerror.ErrWorkerPoolHandleCancelled.Equal(err) { + // no need to report ErrWorkerPoolHandleCancelled, + // as it may confuse the user + return nil + } return errors.Trace(err) } metricSorterConsumeCount.WithLabelValues("kv").Inc() diff --git a/errors.toml b/errors.toml index 44ca0ae987a..337ad0d2a52 100755 --- a/errors.toml +++ b/errors.toml @@ -716,6 +716,11 @@ error = ''' unified sorter backend is terminating ''' +["CDC:ErrUnifiedSorterIOError"] +error = ''' +unified sorter IO error +''' + ["CDC:ErrUnknownKVEventType"] error = ''' unknown kv event type: %v, entry: %v diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index acdb6da8661..d527dd15754 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -202,7 +202,7 @@ var ( ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter")) ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled")) - + ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError")) // processor errors ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely"))