From 6c646c962fe5fb29c38e79e6515e280ac53e703d Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 28 Dec 2021 12:55:50 +0800 Subject: [PATCH] This is an automated cherry-pick of #4084 Signed-off-by: ti-chi-bot --- cdc/processor/pipeline/sink.go | 11 ++- cdc/processor/pipeline/sink_test.go | 57 ++++++++++++++ cdc/sink/table_sink.go | 111 ++++++++++++++++++++++++++++ pkg/pipeline/test.go | 2 +- 4 files changed, 179 insertions(+), 2 deletions(-) create mode 100644 cdc/sink/table_sink.go diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index b436df2a050..85a961e2496 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -140,12 +140,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if err != nil { return errors.Trace(err) } + + // we must call flowController.Release immediately after we call + // FlushRowChangedEvents to prevent deadlock cause by checkpointTs + // fall back + n.flowController.Release(checkpointTs) + + // the checkpointTs may fall back in some situation such as: + // 1. This table is newly added to the processor + // 2. There is one table in the processor that has a smaller + // checkpointTs than this one if checkpointTs <= n.checkpointTs { return nil } atomic.StoreUint64(&n.checkpointTs, checkpointTs) - n.flowController.Release(checkpointTs) return nil } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 0dccd87e5e0..c544d41a0d6 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -561,3 +561,60 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2) c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0) } + +type flushFlowController struct { + mockFlowController + releaseCounter int +} + +func (c *flushFlowController) Release(resolvedTs uint64) { + c.releaseCounter++ +} + +type flushSink struct { + mockSink +} + +// use to simulate the situation that resolvedTs return from sink manager +// fall back +var fallBackResolvedTs = uint64(10) + +func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { + if resolvedTs == fallBackResolvedTs { + return 0, nil + } + return resolvedTs, nil +} + +// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always +// call flowController.Release to release the memory quota of the table to avoid +// deadlock if there is no error occur +func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + cfg := config.GetDefaultReplicaConfig() + cfg.EnableOldValue = false + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-flushSink", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: cfg, + }, + }) + flowController := &flushFlowController{} + sink := &flushSink{} + // sNode is a sinkNode + sNode := newSinkNode(1, sink, 0, 10, flowController) + c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) + sNode.barrierTs = 10 + + err := sNode.flushSink(context.Background(), uint64(8)) + c.Assert(err, check.IsNil) + c.Assert(sNode.checkpointTs, check.Equals, uint64(8)) + c.Assert(flowController.releaseCounter, check.Equals, 1) + // resolvedTs will fall back in this call + err = sNode.flushSink(context.Background(), uint64(10)) + c.Assert(err, check.IsNil) + c.Assert(sNode.checkpointTs, check.Equals, uint64(8)) + c.Assert(flowController.releaseCounter, check.Equals, 2) +} diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go new file mode 100644 index 00000000000..15adf52f5bc --- /dev/null +++ b/cdc/sink/table_sink.go @@ -0,0 +1,111 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sort" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" + "go.uber.org/zap" +) + +type tableSink struct { + tableID model.TableID + manager *Manager + buffer []*model.RowChangedEvent + redoManager redo.LogManager +} + +var _ Sink = (*tableSink)(nil) + +func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + t.buffer = append(t.buffer, rows...) + t.manager.metricsTableSinkTotalRows.Add(float64(len(rows))) + if t.redoManager.Enabled() { + return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...) + } + return nil +} + +func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + // the table sink doesn't receive the DDL event + return nil +} + +// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs +// is required to be no more than global resolvedTs, table barrierTs and table +// redo log watermarkTs. +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + if tableID != t.tableID { + log.Panic("inconsistent table sink", + zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID)) + } + i := sort.Search(len(t.buffer), func(i int) bool { + return t.buffer[i].CommitTs > resolvedTs + }) + if i == 0 { + return t.flushResolvedTs(ctx, resolvedTs) + } + resolvedRows := t.buffer[:i] + t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) + + err := t.manager.bufSink.EmitRowChangedEvents(ctx, resolvedRows...) + if err != nil { + return t.manager.getCheckpointTs(tableID), errors.Trace(err) + } + return t.flushResolvedTs(ctx, resolvedTs) +} + +func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) { + redoTs, err := t.flushRedoLogs(ctx, resolvedTs) + if err != nil { + return t.manager.getCheckpointTs(t.tableID), err + } + if redoTs < resolvedTs { + resolvedTs = redoTs + } + return t.manager.flushBackendSink(ctx, t.tableID, resolvedTs) +} + +// flushRedoLogs flush redo logs and returns redo log resolved ts which means +// all events before the ts have been persisted to redo log storage. +func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { + if t.redoManager.Enabled() { + err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) + if err != nil { + return 0, err + } + return t.redoManager.GetMinResolvedTs(), nil + } + return resolvedTs, nil +} + +func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + // the table sink doesn't receive the checkpoint event + return nil +} + +// Close once the method is called, no more events can be written to this table sink +func (t *tableSink) Close(ctx context.Context) error { + return t.manager.destroyTableSink(ctx, t.tableID) +} + +// Barrier is not used in table sink +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { + return nil +} diff --git a/pkg/pipeline/test.go b/pkg/pipeline/test.go index bfc25ac69b8..e0d3cd7f68c 100644 --- a/pkg/pipeline/test.go +++ b/pkg/pipeline/test.go @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp return Message{}, nil } -// MockNodeContext4Test creates a node context with a message and a output channel for tests. +// MockNodeContext4Test creates a node context with a message and an output channel for tests. func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext { return newNodeContext(ctx, msg, outputCh) }