From 78a4c5015b65d705e7fbca060626766585dae8c7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 3 Aug 2021 18:51:07 +0800 Subject: [PATCH 1/4] logSink: revert defaultBufferChanSize, add a trigger to flush logSink.units when chennal is full (#2431) (#2444) --- cdc/sink/cdclog/file.go | 3 ++- cdc/sink/cdclog/s3.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go index 070ea4257ef..dc49dc6052b 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -87,7 +87,8 @@ func (ts *tableStream) isEmpty() bool { } func (ts *tableStream) shouldFlush() bool { - return ts.sendSize.Load() > maxRowFileSize + // if sendSize > 5 MB or data chennal is full, flush it + return ts.sendSize.Load() > maxPartFlushSize || ts.sendEvents.Load() == defaultBufferChanSize } func (ts *tableStream) flush(ctx context.Context, sink *logSink) error { diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index 24750c705f4..302fb8e8025 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -37,7 +37,7 @@ const ( maxCompletePartSize = 100 << 20 // rotate row changed event file if one complete file larger than 100Mb maxDDLFlushSize = 10 << 20 // rotate ddl event file if one complete file larger than 10Mb - defaultBufferChanSize = 1280000 + defaultBufferChanSize = 20480 defaultFlushRowChangedEventDuration = 5 * time.Second // TODO make it as a config ) @@ -78,7 +78,8 @@ func (tb *tableBuffer) isEmpty() bool { } func (tb *tableBuffer) shouldFlush() bool { - return tb.sendSize.Load() > maxPartFlushSize + // if sendSize > 5 MB or data chennal is full, flush it + return tb.sendSize.Load() > maxPartFlushSize || tb.sendEvents.Load() == defaultBufferChanSize } func (tb *tableBuffer) flush(ctx context.Context, sink *logSink) error { From f14deb2d9b05652d000cac1bfbc0e328c6794159 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 4 Aug 2021 11:47:07 +0800 Subject: [PATCH 2/4] puller: remove memory buffer and buffer limitter (#2328) (#2393) --- cdc/processor.go | 6 - cdc/processor/pipeline/puller.go | 4 - cdc/processor/pipeline/table.go | 4 +- cdc/processor/processor.go | 6 - cdc/puller/buffer.go | 188 ------------------------------- cdc/puller/buffer_test.go | 135 ---------------------- 6 files changed, 1 insertion(+), 342 deletions(-) delete mode 100644 cdc/puller/buffer.go delete mode 100644 cdc/puller/buffer_test.go diff --git a/cdc/processor.go b/cdc/processor.go index 35616d45fec..58bdb86e39b 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -53,9 +53,6 @@ import ( ) const ( - // defaultMemBufferCapacity is the default memory buffer per change feed. - defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G - defaultSyncResolvedBatch = 1024 schemaStorageGCLag = time.Minute * 20 @@ -71,7 +68,6 @@ type oldProcessor struct { captureInfo model.CaptureInfo changefeedID string changefeed model.ChangeFeedInfo - limitter *puller.BlurResourceLimitter stopped int32 pdCli pd.Client @@ -163,7 +159,6 @@ func newProcessor( ) (*oldProcessor, error) { etcdCli := session.Client() cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli) - limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity) log.Info("start processor with startts", zap.Uint64("startts", checkpointTs), util.ZapFieldChangefeed(ctx)) @@ -193,7 +188,6 @@ func newProcessor( p := &oldProcessor{ id: uuid.New().String(), - limitter: limitter, captureInfo: captureInfo, changefeedID: changefeedID, changefeed: changefeed, diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 0eb44529013..ab316d8e62b 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -29,8 +29,6 @@ import ( ) type pullerNode struct { - limitter *puller.BlurResourceLimitter - tableName string // quoted schema and table, used in metircs only tableID model.TableID @@ -40,10 +38,8 @@ type pullerNode struct { } func newPullerNode( - limitter *puller.BlurResourceLimitter, tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node { return &pullerNode{ - limitter: limitter, tableID: tableID, replicaInfo: replicaInfo, tableName: tableName, diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index ee4efff35a5..4713b2609f3 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/puller" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/cdc/sink/common" serverConfig "github.com/pingcap/ticdc/pkg/config" @@ -162,7 +161,6 @@ const defaultRunnersSize = 5 // NewTablePipeline creates a table pipeline // TODO(leoppro): implement a mock kvclient to test the table pipeline func NewTablePipeline(ctx cdcContext.Context, - limitter *puller.BlurResourceLimitter, mounter entry.Mounter, tableID model.TableID, tableName string, @@ -191,7 +189,7 @@ func NewTablePipeline(ctx cdcContext.Context, runnerSize++ } p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) - p.AppendNode(ctx, "puller", newPullerNode(limitter, tableID, replicaInfo, tableName)) + p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", newSorterNode(tableName, tableID, flowController)) p.AppendNode(ctx, "mounter", newMounterNode(mounter)) if cyclicEnabled { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index faf7fe1cc1b..f4f141a0f53 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -46,9 +46,6 @@ import ( ) const ( - // defaultMemBufferCapacity is the default memory buffer per change feed. - defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G - schemaStorageGCLag = time.Minute * 20 ) @@ -59,7 +56,6 @@ type processor struct { tables map[model.TableID]tablepipeline.TablePipeline - limitter *puller.BlurResourceLimitter schemaStorage entry.SchemaStorage filter *filter.Filter mounter entry.Mounter @@ -86,7 +82,6 @@ func newProcessor(ctx cdcContext.Context) *processor { changefeedID := ctx.ChangefeedVars().ID advertiseAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr p := &processor{ - limitter: puller.NewBlurResourceLimmter(defaultMemBufferCapacity), tables: make(map[model.TableID]tablepipeline.TablePipeline), errCh: make(chan error, 1), changefeedID: changefeedID, @@ -720,7 +715,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs) table := tablepipeline.NewTablePipeline( ctx, - p.limitter, p.mounter, tableID, tableNameStr, diff --git a/cdc/puller/buffer.go b/cdc/puller/buffer.go deleted file mode 100644 index db9a64fb75b..00000000000 --- a/cdc/puller/buffer.go +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package puller - -import ( - "context" - "sync" - "sync/atomic" - "unsafe" - - "github.com/edwingeng/deque" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - cerror "github.com/pingcap/ticdc/pkg/errors" -) - -const ( - defaultBufferSize = 128000 -) - -// EventBuffer in a interface for communicating kv entries. -type EventBuffer interface { - // AddEntry adds an entry to the buffer, return ErrReachLimit if reach budget limit. - AddEntry(ctx context.Context, entry model.RegionFeedEvent) error - Get(ctx context.Context) (model.RegionFeedEvent, error) -} - -// ChanBuffer buffers kv entries -type ChanBuffer chan model.RegionFeedEvent - -var _ EventBuffer = makeChanBuffer() - -func makeChanBuffer() ChanBuffer { - return make(ChanBuffer, defaultBufferSize) -} - -// AddEntry adds an entry to the buffer -func (b ChanBuffer) AddEntry(ctx context.Context, entry model.RegionFeedEvent) error { - select { - case <-ctx.Done(): - return ctx.Err() - case b <- entry: - return nil - } -} - -// Get waits for an entry from the input channel but will stop with respect to the context -func (b ChanBuffer) Get(ctx context.Context) (model.RegionFeedEvent, error) { - select { - case <-ctx.Done(): - return model.RegionFeedEvent{}, ctx.Err() - case e := <-b: - return e, nil - } -} - -var _ EventBuffer = &memBuffer{} - -type memBuffer struct { - limitter *BlurResourceLimitter - - mu struct { - sync.Mutex - entries deque.Deque - } - signalCh chan struct{} -} - -// Passing nil will make a unlimited buffer. -func makeMemBuffer(limitter *BlurResourceLimitter) *memBuffer { - return &memBuffer{ - limitter: limitter, - mu: struct { - sync.Mutex - entries deque.Deque - }{ - entries: deque.NewDeque(), - }, - - signalCh: make(chan struct{}, 1), - } -} - -// AddEntry implements EventBuffer interface. -func (b *memBuffer) AddEntry(ctx context.Context, entry model.RegionFeedEvent) error { - b.mu.Lock() - if b.limitter != nil && b.limitter.OverBucget() { - b.mu.Unlock() - return cerror.ErrBufferReachLimit.GenWithStackByArgs() - } - - b.mu.entries.PushBack(entry) - if b.limitter != nil { - b.limitter.Add(int64(entrySize(entry))) - } - b.mu.Unlock() - - select { - case b.signalCh <- struct{}{}: - default: - } - - return nil -} - -// Get implements EventBuffer interface. -func (b *memBuffer) Get(ctx context.Context) (model.RegionFeedEvent, error) { - for { - b.mu.Lock() - if !b.mu.entries.Empty() { - e := b.mu.entries.PopFront().(model.RegionFeedEvent) - if b.limitter != nil { - b.limitter.Add(int64(-entrySize(e))) - } - b.mu.Unlock() - return e, nil - } - - b.mu.Unlock() - - select { - case <-ctx.Done(): - return model.RegionFeedEvent{}, ctx.Err() - case <-b.signalCh: - } - } -} - -// Size returns the memory size of memBuffer -func (b *memBuffer) Size() int64 { - b.mu.Lock() - defer b.mu.Unlock() - if b.limitter == nil { - return 0 - } - return atomic.LoadInt64(&b.limitter.used) -} - -var ( - sizeOfVal = unsafe.Sizeof(model.RawKVEntry{}) - sizeOfResolve = unsafe.Sizeof(model.ResolvedSpan{}) -) - -func entrySize(e model.RegionFeedEvent) int { - if e.Val != nil { - return int(sizeOfVal) + len(e.Val.Key) + len(e.Val.Value) - } else if e.Resolved != nil { - return int(sizeOfResolve) - } else { - log.Panic("unknow event type") - } - - return 0 -} - -// BlurResourceLimitter limit resource use. -type BlurResourceLimitter struct { - budget int64 - used int64 -} - -// NewBlurResourceLimmter create a BlurResourceLimitter. -func NewBlurResourceLimmter(budget int64) *BlurResourceLimitter { - return &BlurResourceLimitter{ - budget: budget, - } -} - -// Add used resource into limmter -func (rl *BlurResourceLimitter) Add(n int64) { - atomic.AddInt64(&rl.used, n) -} - -// OverBucget retun true if over budget. -func (rl *BlurResourceLimitter) OverBucget() bool { - return atomic.LoadInt64(&rl.used) >= atomic.LoadInt64(&rl.budget) -} diff --git a/cdc/puller/buffer_test.go b/cdc/puller/buffer_test.go deleted file mode 100644 index b1e71380893..00000000000 --- a/cdc/puller/buffer_test.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package puller - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/ticdc/cdc/model" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/regionspan" - "github.com/pingcap/ticdc/pkg/util/testleak" -) - -type bufferSuite struct{} - -var _ = check.Suite(&bufferSuite{}) - -func (bs *bufferSuite) TestCanAddAndReadEntriesInOrder(c *check.C) { - defer testleak.AfterTest(c)() - b := makeChanBuffer() - ctx := context.Background() - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - first, err := b.Get(ctx) - c.Assert(err, check.IsNil) - c.Assert(first.Val.CRTs, check.Equals, uint64(110)) - second, err := b.Get(ctx) - c.Assert(err, check.IsNil) - c.Assert(second.Resolved.ResolvedTs, check.Equals, uint64(111)) - }() - - err := b.AddEntry(ctx, model.RegionFeedEvent{ - Val: &model.RawKVEntry{CRTs: 110}, - }) - c.Assert(err, check.IsNil) - err = b.AddEntry(ctx, model.RegionFeedEvent{ - Resolved: &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{}, - ResolvedTs: 111, - }, - }) - c.Assert(err, check.IsNil) - - wg.Wait() -} - -func (bs *bufferSuite) TestWaitsCanBeCanceled(c *check.C) { - defer testleak.AfterTest(c)() - b := makeChanBuffer() - ctx := context.Background() - - timeout, cancel := context.WithTimeout(ctx, time.Millisecond) - defer cancel() - stopped := make(chan struct{}) - // sleep here to let context timeout first - time.Sleep(time.Millisecond) - go func() { - for { - err := b.AddEntry(timeout, model.RegionFeedEvent{ - Resolved: &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{}, - ResolvedTs: 111, - }, - }) - if err == context.DeadlineExceeded { - close(stopped) - return - } - c.Assert(err, check.Equals, nil) - } - }() - select { - case <-stopped: - case <-time.After(10 * time.Millisecond): - c.Fatal("AddEntry doesn't stop in time.") - } -} - -type memBufferSuite struct{} - -var _ = check.Suite(&memBufferSuite{}) - -func (bs *memBufferSuite) TestMemBuffer(c *check.C) { - defer testleak.AfterTest(c)() - limitter := NewBlurResourceLimmter(1024 * 1024) - bf := makeMemBuffer(limitter) - - var err error - var entries []model.RegionFeedEvent - for { - entry := model.RegionFeedEvent{ - Val: &model.RawKVEntry{ - Value: make([]byte, 1024), - }, - } - err = bf.AddEntry(context.Background(), entry) - if err != nil { - break - } - - entries = append(entries, entry) - } - - c.Assert(cerror.ErrBufferReachLimit.Equal(err), check.IsTrue) - num := float64(bf.mu.entries.Len()) - nearNum := 1024.0 - c.Assert(num >= nearNum*0.9, check.IsTrue) - c.Assert(num <= nearNum*1.1, check.IsTrue) - - // Check can get back the entries. - var getEntries []model.RegionFeedEvent - for len(getEntries) < len(entries) { - entry, err := bf.Get(context.Background()) - c.Assert(err, check.IsNil) - getEntries = append(getEntries, entry) - } - c.Assert(getEntries, check.DeepEquals, entries) -} From a9197fbb5b7c6f7b96ebc6a8ae00d748c0cf30e6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 4 Aug 2021 15:53:08 +0800 Subject: [PATCH 3/4] processor: cleanup some unnecessary error logs (#2443) (#2450) --- cdc/processor/processor.go | 4 +++- pkg/pipeline/pipeline.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index f4f141a0f53..2859d8f7340 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -500,7 +500,9 @@ func (p *processor) sendError(err error) { select { case p.errCh <- err: default: - log.Error("processor receives redundant error", zap.Error(err)) + if errors.Cause(err) != context.Canceled { + log.Error("processor receives redundant error", zap.Error(err)) + } } } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 12ab3fa9517..ead3f445b22 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -96,7 +96,9 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne err := runner.run(ctx) if err != nil { ctx.Throw(err) - log.Error("found error when running the node", zap.String("name", runner.getName()), zap.Error(err)) + if cerror.ErrTableProcessorStoppedSafely.NotEqual(err) { + log.Error("found error when running the node", zap.String("name", runner.getName()), zap.Error(err)) + } } } From d2277db69398b4722e7e7ebc38a4c0feb0166401 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 4 Aug 2021 23:17:08 +0800 Subject: [PATCH 4/4] owner: ignore duplicated DDL job in ddl puller (#2423) (#2457) --- cdc/owner/ddl_puller.go | 6 ++++++ cdc/owner/ddl_puller_test.go | 5 ++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 6018b5957eb..32e96556000 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -52,6 +52,7 @@ type ddlPullerImpl struct { mu sync.Mutex resolvedTS uint64 pendingDDLJobs []*timodel.Job + lastDDLJobID int64 cancel context.CancelFunc } @@ -117,9 +118,14 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error { log.Info("discard the ddl job", zap.Int64("jobID", job.ID), zap.String("query", job.Query)) return nil } + if job.ID == h.lastDDLJobID { + log.Warn("ignore duplicated DDL job", zap.Any("job", job)) + return nil + } h.mu.Lock() defer h.mu.Unlock() h.pendingDDLJobs = append(h.pendingDDLJobs, job) + h.lastDDLJobID = job.ID return nil } diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go index 371c0f5232a..acb9e301642 100644 --- a/cdc/owner/ddl_puller_test.go +++ b/cdc/owner/ddl_puller_test.go @@ -187,9 +187,8 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) { resolvedTs, ddl = p.PopFrontDDL() c.Assert(resolvedTs, check.Equals, uint64(25)) c.Assert(ddl.ID, check.Equals, int64(3)) - resolvedTs, ddl = p.PopFrontDDL() - c.Assert(resolvedTs, check.Equals, uint64(25)) - c.Assert(ddl.ID, check.Equals, int64(3)) + _, ddl = p.PopFrontDDL() + c.Assert(ddl, check.IsNil) waitResolvedTsGrowing(c, p, 30) resolvedTs, ddl = p.PopFrontDDL()