Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4084
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Dec 28, 2021
1 parent 5a6fc5c commit 6c646c9
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 2 deletions.
11 changes: 10 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent a deadlock caused by checkpointTs
// falling back
n.flowController.Release(checkpointTs)

// the checkpointTs may fall back in some situations, such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down
57 changes: 57 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,60 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
}

// flushFlowController is a test flow controller that embeds
// mockFlowController and counts how many times Release is invoked.
type flushFlowController struct {
	mockFlowController
	// releaseCounter is incremented once per Release call.
	releaseCounter int
}

// Release records that it was called by bumping releaseCounter; the
// resolvedTs argument is ignored.
func (c *flushFlowController) Release(resolvedTs uint64) {
	c.releaseCounter++
}

// flushSink is a test sink that embeds mockSink and overrides
// FlushRowChangedEvents to simulate a checkpoint that falls back.
type flushSink struct {
	mockSink
}

// fallBackResolvedTs is the resolvedTs for which
// flushSink.FlushRowChangedEvents returns 0 instead of the requested value.
// It is used to simulate the situation where the ts returned from the sink
// manager falls back.
var fallBackResolvedTs = uint64(10)

// FlushRowChangedEvents echoes resolvedTs back as the flushed ts, except when
// resolvedTs equals fallBackResolvedTs, in which case it reports 0 to mimic a
// checkpoint that fell back. It never returns an error.
func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
	flushedTs := resolvedTs
	if resolvedTs == fallBackResolvedTs {
		flushedTs = 0
	}
	return flushedTs, nil
}

// TestFlushSinkReleaseFlowController tests that sinkNode.flushSink always
// calls flowController.Release to release the memory quota of the table, even
// when the checkpointTs returned by the sink falls back, so that no deadlock
// can occur as long as the flush itself does not fail.
func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) {
	defer testleak.AfterTest(c)()
	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
	cfg := config.GetDefaultReplicaConfig()
	cfg.EnableOldValue = false
	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
		ID: "changefeed-id-test-flushSink",
		Info: &model.ChangeFeedInfo{
			StartTs: oracle.GoTimeToTS(time.Now()),
			Config:  cfg,
		},
	})
	flowController := &flushFlowController{}
	sink := &flushSink{}
	// sNode is a sinkNode
	sNode := newSinkNode(1, sink, 0, 10, flowController)
	// flushSink is declared with a pipeline.NodeContext parameter, so build a
	// mock node context once and reuse it for Init and for every flushSink
	// call instead of passing a bare context.Background().
	nodeCtx := pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)
	c.Assert(sNode.Init(nodeCtx), check.IsNil)
	sNode.barrierTs = 10

	// a normal flush advances the checkpoint and releases the quota once
	err := sNode.flushSink(nodeCtx, uint64(8))
	c.Assert(err, check.IsNil)
	c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
	c.Assert(flowController.releaseCounter, check.Equals, 1)
	// the ts returned by the sink falls back in this call: the checkpoint
	// must stay unchanged, but Release must still be invoked
	err = sNode.flushSink(nodeCtx, fallBackResolvedTs)
	c.Assert(err, check.IsNil)
	c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
	c.Assert(flowController.releaseCounter, check.Equals, 2)
}
111 changes: 111 additions & 0 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"go.uber.org/zap"
)

// tableSink is the Sink implementation for a single table. It buffers row
// changed events and hands them to the shared sink manager when flushed.
type tableSink struct {
	// tableID is the table this sink serves; flush calls for any other
	// table ID trigger a panic.
	tableID model.TableID
	// manager owns the buffered backend sink, the checkpoint bookkeeping and
	// the metrics this table sink reports to.
	manager *Manager
	// buffer holds emitted rows that have not been flushed yet.
	buffer []*model.RowChangedEvent
	// redoManager receives a copy of every emitted row when redo log is
	// enabled.
	redoManager redo.LogManager
}

// Compile-time check that *tableSink satisfies the Sink interface.
var _ Sink = (*tableSink)(nil)

// EmitRowChangedEvents appends rows to the table's in-memory buffer and adds
// their count to the manager's total-rows metric. When the redo manager is
// enabled the rows are also forwarded to it, and its error (if any) is
// returned; otherwise the call always succeeds.
func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
	t.buffer = append(t.buffer, rows...)
	t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
	if !t.redoManager.Enabled() {
		return nil
	}
	return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...)
}

// EmitDDLEvent is a no-op that always returns nil: the table sink doesn't
// receive DDL events.
func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
	// the table sink doesn't receive the DDL event
	return nil
}

// FlushRowChangedEvents hands every buffered row whose CommitTs is not greater
// than resolvedTs to the manager's buffer sink and then flushes resolvedTs
// through flushResolvedTs, returning whatever that reports. The resolvedTs is
// required to be no more than the global resolvedTs, the table barrierTs and
// the table redo log watermarkTs.
//
// It panics (via log.Panic) when tableID differs from the sink's own table
// ID. If emitting the rows fails, the manager's current checkpoint ts for the
// table is returned together with the traced error.
func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
	if tableID != t.tableID {
		log.Panic("inconsistent table sink",
			zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID))
	}
	// Binary search for the first row past resolvedTs; this relies on the
	// buffer being ordered by CommitTs (assumed from the emit order — verify
	// against the caller).
	split := sort.Search(len(t.buffer), func(idx int) bool {
		return t.buffer[idx].CommitTs > resolvedTs
	})
	if split > 0 {
		ready, pending := t.buffer[:split], t.buffer[split:]
		// Move the not-yet-resolved rows into a fresh slice so the buffer no
		// longer shares its backing array with the rows being emitted.
		remaining := make([]*model.RowChangedEvent, 0, len(pending))
		t.buffer = append(remaining, pending...)

		if err := t.manager.bufSink.EmitRowChangedEvents(ctx, ready...); err != nil {
			return t.manager.getCheckpointTs(tableID), errors.Trace(err)
		}
	}
	return t.flushResolvedTs(ctx, resolvedTs)
}

// flushResolvedTs flushes redo logs up to resolvedTs and then flushes the
// backend sink up to the smaller of resolvedTs and the ts reported by the redo
// log flush. If the redo flush fails, the manager's current checkpoint ts for
// this table is returned together with the error.
func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) {
	redoTs, err := t.flushRedoLogs(ctx, resolvedTs)
	if err != nil {
		return t.manager.getCheckpointTs(t.tableID), err
	}
	// never flush the backend beyond what the redo log has reported
	flushTs := resolvedTs
	if redoTs < flushTs {
		flushTs = redoTs
	}
	return t.manager.flushBackendSink(ctx, t.tableID, flushTs)
}

// flushRedoLogs flushes redo logs and returns the redo log resolved ts, which
// means all events before that ts have been persisted to redo log storage.
// When the redo manager is disabled it does nothing and returns resolvedTs
// unchanged; when the flush fails it returns 0 and the error.
func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) {
	if !t.redoManager.Enabled() {
		return resolvedTs, nil
	}
	if err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs); err != nil {
		return 0, err
	}
	return t.redoManager.GetMinResolvedTs(), nil
}

// EmitCheckpointTs is a no-op that always returns nil: the table sink doesn't
// receive checkpoint events.
func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
	// the table sink doesn't receive the checkpoint event
	return nil
}

// Close asks the manager to destroy this table's sink and returns the
// manager's result. Once the method is called, no more events can be written
// to this table sink.
func (t *tableSink) Close(ctx context.Context) error {
	return t.manager.destroyTableSink(ctx, t.tableID)
}

// Barrier is not used in table sink; it is a no-op that always returns nil.
func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error {
	return nil
}
2 changes: 1 addition & 1 deletion pkg/pipeline/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp
return Message{}, nil
}

// MockNodeContext4Test creates a node context with a message and an output channel for tests.
func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
	return newNodeContext(ctx, msg, outputCh)
}

0 comments on commit 6c646c9

Please sign in to comment.