From 17988b1a27383ba936ffdd90347d71db73ed29a6 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 8 Apr 2021 14:20:53 +0800 Subject: [PATCH 1/9] fix Unified Sorter error handling --- cdc/puller/sorter/heap_sorter.go | 9 ++++ cdc/puller/sorter/merger.go | 4 ++ cdc/puller/sorter/sorter_test.go | 83 +++++++++++++++++++++++++---- cdc/puller/sorter/unified_sorter.go | 10 ++++ pkg/errors/errors.go | 2 +- 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go index 70aae9251fb..794832db481 100644 --- a/cdc/puller/sorter/heap_sorter.go +++ b/cdc/puller/sorter/heap_sorter.go @@ -104,6 +104,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { isEmptyFlush := h.heap.Len() == 0 var finishCh chan error if !isEmptyFlush { + failpoint.Inject("InjectErrorBackEndAlloc", func() { + failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error"))).FastGenWithCause() + }) + var err error backEnd, err = pool.alloc(ctx) if err != nil { @@ -190,6 +194,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { close(task.finished) }() + failpoint.Inject("InjectErrorBackEndWrite", func() { + task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause() + failpoint.Return() + }) + counter := 0 for oldHeap.Len() > 0 { failpoint.Inject("asyncFlushInProcessDelay", func() { diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 4b4dd6bc78c..970ece82677 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -426,6 +426,10 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } + if atomic.LoadInt32(&task.isDeallocated) == 1 { + log.Panic("task has been deallocated, report a bug") + } + if task.backend != nil { pendingSet[task] = nil } // otherwise it is an empty flush diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index ad81e1c1e4f..938c69a7bdb 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" @@ -76,7 +75,8 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - testSorter(ctx, c, sorter, 10000, true) + err = testSorter(ctx, c, sorter, 10000, true) + c.Assert(err, check.ErrorMatches, ".*context cancel.*") } func (s *sorterSuite) TestSorterCancel(c *check.C) { @@ -102,7 +102,8 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { finishedCh := make(chan struct{}) go func() { - testSorter(ctx, c, sorter, 10000000, true) + err := testSorter(ctx, c, sorter, 10000000, true) + c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*") close(finishedCh) }() @@ -116,7 +117,7 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) { log.Info("Sorter successfully cancelled") } -func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) { +func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, count int, needWorkerPool bool) error { err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") if err != nil { log.Panic("Could not enable failpoint", zap.Error(err)) @@ -213,11 +214,7 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun } }) - err = errg.Wait() - if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded { - return - } - c.Assert(err, check.IsNil) + return errg.Wait() } func (s *sorterSuite) TestSortDirConfigLocal(c *check.C) { @@ -310,7 +307,73 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) { for i := 0; i < 5; i++ { sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - testSorter(ctx, c, sorter, 100000000, true) + err := testSorter(ctx, c, sorter, 100000000, true) + c.Assert(err, check.ErrorMatches, ".*context deadline exceeded.*") cancel() } } + +func (s *sorterSuite) TestSorterIOError(c *check.C) { + defer testleak.AfterTest(c)() + defer UnifiedSorterCleanUp() + + conf := config.GetDefaultServerConfig() + conf.Sorter = &config.SorterConfig{ + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 0, + NumWorkerPoolGoroutine: 4, + } + config.StoreGlobalServerConfig(conf) + + err := os.MkdirAll("/tmp/sorter", 0o755) + c.Assert(err, check.IsNil) + sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file) + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc") + }() + + finishedCh := make(chan struct{}) + go func() { + err := testSorter(ctx, c, sorter, 10000000, true) + c.Assert(err, check.ErrorMatches, ".*injected alloc error.*") + close(finishedCh) + }() + + after := time.After(10 * time.Second) + select { + case <-after: + c.Fatal("TestSorterIOError timed out") + case <-finishedCh: + } + + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc") + // enable the failpoint to simulate backEnd write error (usually would happen when writing to a file) + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite", "return(true)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite") + }() + + finishedCh = make(chan struct{}) + go func() { + err := testSorter(ctx, c, sorter, 10000000, true) + c.Assert(err, check.ErrorMatches, ".*injected write error.*") + close(finishedCh) + }() + + after = time.After(10 * time.Second) + select { + case <-after: + c.Fatal("TestSorterIOError timed out") + case <-finishedCh: + } +} diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index ac87373bc51..d6847aaa6be 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -223,6 +223,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: } err := sorter.poolHandle.AddEvent(subctx, event) + if cerror.ErrWorkerPoolHandleCancelled.Equal(errors.Cause(err)) { + // no need to report ErrWorkerPoolHandleCancelled, + // as it may confuse the user + return nil + } if err != nil { return errors.Trace(err) } @@ -240,6 +245,11 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: err := heapSorters[targetID].poolHandle.AddEvent(subctx, event) if err != nil { + if cerror.ErrWorkerPoolHandleCancelled.Equal(errors.Cause(err)) { + // no need to report ErrWorkerPoolHandleCancelled, + // as it may confuse the user + return nil + } return errors.Trace(err) } metricSorterConsumeCount.WithLabelValues("kv").Inc() diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index acdb6da8661..d527dd15754 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -202,7 +202,7 @@ var ( ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter")) ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled")) - + ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError")) // processor errors ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely")) From e9170ca6857b7b457b2460ddee5a8ae863922e60 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 8 Apr 2021 17:37:23 +0800 Subject: [PATCH 2/9] fix unit test --- cdc/puller/sorter/heap_sorter.go | 35 ++++++++++++++++----- cdc/puller/sorter/merger.go | 8 ++--- cdc/puller/sorter/sorter_test.go | 49 +++++++++++++++++++++++++++++ cdc/puller/sorter/unified_sorter.go | 1 + 4 files changed, 79 insertions(+), 14 deletions(-) diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go index 794832db481..93d3ec5e5fa 100644 --- a/cdc/puller/sorter/heap_sorter.go +++ b/cdc/puller/sorter/heap_sorter.go @@ -38,16 +38,35 @@ const ( type flushTask struct { taskID int heapSorterID int - backend backEnd reader backEndReader tsLowerBound uint64 maxResolvedTs uint64 finished chan error dealloc func() error - isDeallocated int32 dataSize int64 lastTs uint64 // for debugging TODO remove canceller *asyncCanceller + + isEmpty bool // ready only field + + deallocLock sync.RWMutex + isDeallocated bool // do not access directly + backend backEnd // do not access directly +} + +func (t *flushTask) markDeallocated() { + t.deallocLock.Lock() + defer t.deallocLock.Unlock() + + t.backend = nil + t.isDeallocated = true +} + +func (t *flushTask) GetBackEnd() backEnd { + t.deallocLock.RLock() + defer t.deallocLock.RUnlock() + + return t.backend } type heapSorter struct { @@ -105,7 +124,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { var finishCh chan error if !isEmptyFlush { failpoint.Inject("InjectErrorBackEndAlloc", func() { - failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error"))).FastGenWithCause() + failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause()) }) var err error @@ -125,17 +144,16 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { maxResolvedTs: maxResolvedTs, finished: finishCh, canceller: h.canceller, + isEmpty: isEmptyFlush, } h.taskCounter++ var oldHeap sortHeap if !isEmptyFlush { task.dealloc = func() error { - if atomic.SwapInt32(&task.isDeallocated, 1) == 1 { - return nil - } - if task.backend != nil { - task.backend = nil + backEnd := task.GetBackEnd() + if backEnd != nil { + defer task.markDeallocated() return pool.dealloc(backEnd) } return nil @@ -144,6 +162,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { h.heap = make(sortHeap, 0, 65536) } else { task.dealloc = func() error { + task.markDeallocated() return nil } } diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go index 970ece82677..cc78ad5d51e 100644 --- a/cdc/puller/sorter/merger.go +++ b/cdc/puller/sorter/merger.go @@ -160,7 +160,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch } if task.reader == nil { - task.reader, err = task.backend.reader() + task.reader, err = task.GetBackEnd().reader() if err != nil { return errors.Trace(err) } @@ -426,11 +426,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch return nil } - if atomic.LoadInt32(&task.isDeallocated) == 1 { - log.Panic("task has been deallocated, report a bug") - } - - if task.backend != nil { + if !task.isEmpty { pendingSet[task] = nil } // otherwise it is an empty flush diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 938c69a7bdb..73f5fe627db 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -377,3 +377,52 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { case <-finishedCh: } } + +func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { + defer testleak.AfterTest(c)() + defer UnifiedSorterCleanUp() + + conf := config.GetDefaultServerConfig() + conf.Sorter = &config.SorterConfig{ + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 0, + NumWorkerPoolGoroutine: 4, + } + config.StoreGlobalServerConfig(conf) + + err := os.MkdirAll("/tmp/sorter", 0o755) + c.Assert(err, check.IsNil) + sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file) + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay", "sleep(2000)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectHeapSorterExitDelay") + }() + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc", "return(true)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndAlloc") + }() + + finishedCh := make(chan struct{}) + go func() { + err := testSorter(ctx, c, sorter, 10000000, true) + c.Assert(err, check.ErrorMatches, ".*injected alloc error.*") + close(finishedCh) + }() + + after := time.After(10 * time.Second) + select { + case <-after: + c.Fatal("TestSorterIOError timed out") + case <-finishedCh: + } +} diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index d6847aaa6be..33ee051c347 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -173,6 +173,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { } // must wait for all writers to exit to close the channel. close(heapSorterCollectCh) + failpoint.Inject("InjectHeapSorterExitDelay", func() {}) }() select { From e7e99c78bc5b05892d8f4cada0d11e4a08f8d8a0 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 8 Apr 2021 19:31:15 +0800 Subject: [PATCH 3/9] update errors.toml --- errors.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/errors.toml b/errors.toml index 44ca0ae987a..337ad0d2a52 100755 --- a/errors.toml +++ b/errors.toml @@ -716,6 +716,11 @@ error = ''' unified sorter backend is terminating ''' +["CDC:ErrUnifiedSorterIOError"] +error = ''' +unified sorter IO error +''' + ["CDC:ErrUnknownKVEventType"] error = ''' unknown kv event type: %v, entry: %v From 2f78f736f6826b703f81ac373990b5c9ef8c68b8 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 12 Apr 2021 11:30:12 +0800 Subject: [PATCH 4/9] address comments --- cdc/puller/sorter/heap_sorter.go | 2 +- cdc/puller/sorter/unified_sorter.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go index 93d3ec5e5fa..a82c1cde67d 100644 --- a/cdc/puller/sorter/heap_sorter.go +++ b/cdc/puller/sorter/heap_sorter.go @@ -47,7 +47,7 @@ type flushTask struct { lastTs uint64 // for debugging TODO remove canceller *asyncCanceller - isEmpty bool // ready only field + isEmpty bool // read only field deallocLock sync.RWMutex isDeallocated bool // do not access directly diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index 33ee051c347..f2e905b343a 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -224,7 +224,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: } err := sorter.poolHandle.AddEvent(subctx, event) - if cerror.ErrWorkerPoolHandleCancelled.Equal(errors.Cause(err)) { + if cerror.ErrWorkerPoolHandleCancelled.Equal(err) { // no need to report ErrWorkerPoolHandleCancelled, // as it may confuse the user return nil @@ -246,7 +246,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error { default: err := heapSorters[targetID].poolHandle.AddEvent(subctx, event) if err != nil { - if cerror.ErrWorkerPoolHandleCancelled.Equal(errors.Cause(err)) { + if cerror.ErrWorkerPoolHandleCancelled.Equal(err) { // no need to report ErrWorkerPoolHandleCancelled, // as it may confuse the user return nil From da95f39e507078653e299d70ab41e942d156b241 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 15 Apr 2021 11:02:26 +0800 Subject: [PATCH 5/9] adjust unit test timeout --- cdc/puller/sorter/sorter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 73f5fe627db..3e6812618a8 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -348,7 +348,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { close(finishedCh) }() - after := time.After(10 * time.Second) + after := time.After(20 * time.Second) select { case <-after: c.Fatal("TestSorterIOError timed out") From 5b406ec6298b618fa5faf32f071e2625070035ff Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 15 Apr 2021 13:05:49 +0800 Subject: [PATCH 6/9] adjust unit test timeout --- cdc/puller/sorter/sorter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 3e6812618a8..2554563460b 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -370,7 +370,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { close(finishedCh) }() - after = time.After(10 * time.Second) + after = time.After(20 * time.Second) select { case <-after: c.Fatal("TestSorterIOError timed out") From 1dd20c4883698211e1093fba35c235af420229a0 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 15 Apr 2021 19:01:16 +0800 Subject: [PATCH 7/9] adjust unit test timeout --- cdc/puller/sorter/sorter_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 2554563460b..a43479ff55f 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -348,7 +348,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { close(finishedCh) }() - after := time.After(20 * time.Second) + after := time.After(60 * time.Second) select { case <-after: c.Fatal("TestSorterIOError timed out") @@ -370,7 +370,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { close(finishedCh) }() - after = time.After(20 * time.Second) + after = time.After(60 * time.Second) select { case <-after: c.Fatal("TestSorterIOError timed out") @@ -419,7 +419,7 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { close(finishedCh) }() - after := time.After(10 * time.Second) + after := time.After(60 * time.Second) select { case <-after: c.Fatal("TestSorterIOError timed out") From e6c8a5a7a7f460bdcd93296cceb3142cf9ca5b19 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 15 Apr 2021 19:16:27 +0800 Subject: [PATCH 8/9] add debug log --- cdc/puller/sorter/backend_pool.go | 5 +++++ cdc/puller/sorter/sorter_test.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index 14f9f0f80ad..04583853f3e 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -232,10 +232,12 @@ func (p *backEndPool) terminate() { defer close(p.cancelCh) // the background goroutine can be considered terminated here + log.Debug("Unified Sorter terminating...") p.cancelRWLock.Lock() defer p.cancelRWLock.Unlock() p.isTerminating = true + log.Debug("Unified Sorter cleaning up before exiting") // any new allocs and deallocs will not succeed from this point // accessing p.cache without atomics is safe from now @@ -258,11 +260,14 @@ func (p *backEndPool) terminate() { log.Warn("Unified Sorter clean-up failed", zap.Error(err)) } for _, file := range files { + log.Debug("Unified Sorter backEnd removing file", zap.String("file", file)) err = os.RemoveAll(file) if err != nil { log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err)) } } + + log.Debug("Unified Sorter backEnd terminated") } func (p *backEndPool) sorterMemoryUsage() int64 { diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index a43479ff55f..468440f882e 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "go.uber.org/zap/zapcore" + "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -382,6 +384,9 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { defer testleak.AfterTest(c)() defer UnifiedSorterCleanUp() + log.SetLevel(zapcore.DebugLevel) + defer log.SetLevel(zapcore.InfoLevel) + conf := config.GetDefaultServerConfig() conf.Sorter = &config.SorterConfig{ NumConcurrentWorker: 8, From 156e52e721ab23c31ea8d6d9df9f88df320f43b5 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 16 Apr 2021 12:40:19 +0800 Subject: [PATCH 9/9] adjust unit test event count --- cdc/puller/sorter/sorter_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 468440f882e..73adff055af 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -345,7 +345,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { finishedCh := make(chan struct{}) go func() { - err := testSorter(ctx, c, sorter, 10000000, true) + err := testSorter(ctx, c, sorter, 100000, true) c.Assert(err, check.ErrorMatches, ".*injected alloc error.*") close(finishedCh) }() @@ -367,7 +367,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) { finishedCh = make(chan struct{}) go func() { - err := testSorter(ctx, c, sorter, 10000000, true) + err := testSorter(ctx, c, sorter, 100000, true) c.Assert(err, check.ErrorMatches, ".*injected write error.*") close(finishedCh) }() @@ -401,7 +401,7 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { c.Assert(err, check.IsNil) sorter := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() // enable the failpoint to simulate backEnd allocation error (usually would happen when creating a file) @@ -419,7 +419,7 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) { finishedCh := make(chan struct{}) go func() { - err := testSorter(ctx, c, sorter, 10000000, true) + err := testSorter(ctx, c, sorter, 100000, true) c.Assert(err, check.ErrorMatches, ".*injected alloc error.*") close(finishedCh) }()