puller,processor: replace table ID with Span #7832

Merged · 8 commits · Dec 9, 2022
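The change is largely mechanical: everywhere a pipeline, actor, or log field was keyed by a `model.TableID`, it is now keyed by a `tablepb.Span`. A minimal sketch of the new pattern, using only the `spanz` helpers that appear in the diff below (an illustration, not the PR's own code):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/processor/tablepb"
	"github.com/pingcap/tiflow/pkg/spanz"
)

func main() {
	// Widen a table ID into its comparable key span. The int64 stays
	// recoverable as span.TableID, which is how call sites that are not
	// yet span-aware (sink node, redo manager) keep their parameters.
	span := spanz.TableIDToComparableSpan(1)
	fmt.Println(span.TableID) // 1

	// A plain map[model.TableID]tablepb.TablePipeline becomes a
	// span-keyed map, as TestQueryTableCount does below.
	tables := spanz.NewMap[tablepb.TablePipeline]()
	tables.ReplaceOrInsert(span, nil)
}
```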
20 changes: 13 additions & 7 deletions cdc/processor/manager_test.go
@@ -29,6 +29,7 @@ import (
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
)
@@ -43,7 +44,9 @@ type managerTester struct {
// NewManager4Test creates a new processor manager for test
func NewManager4Test(
t *testing.T,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepb.TablePipeline, error),
createTablePipeline func(
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error),
liveness *model.Liveness,
) *managerImpl {
captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"}
@@ -61,10 +64,12 @@ func NewManager4Test(
}

func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
s.manager = NewManager4Test(t, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepb.TablePipeline, error) {
s.manager = NewManager4Test(t, func(
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
span: span,
name: fmt.Sprintf("`test`.`table%d`", span.TableID),
state: tablepb.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
@@ -296,9 +301,10 @@ func TestQueryTableCount(t *testing.T) {
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness).(*managerImpl)
ctx := context.TODO()
// Add some tables to processor.
m.processors[model.ChangeFeedID{ID: "test"}] = &processor{
tables: map[model.TableID]tablepb.TablePipeline{1: nil, 2: nil},
}
tables := spanz.NewMap[tablepb.TablePipeline]()
tables.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), nil)
tables.ReplaceOrInsert(spanz.TableIDToComparableSpan(2), nil)
m.processors[model.ChangeFeedID{ID: "test"}] = &processor{tableSpans: tables}

done := make(chan error, 1)
tableCh := make(chan int, 1)
19 changes: 6 additions & 13 deletions cdc/processor/pipeline/puller.go
@@ -19,10 +19,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pipeline"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"golang.org/x/sync/errgroup"
@@ -32,34 +32,27 @@ type pullerNode struct {
tableName string // quoted schema and table, used in metrics only

plr puller.Puller
tableID model.TableID
span tablepb.Span
startTs model.Ts
changefeed model.ChangeFeedID
cancel context.CancelFunc
wg *errgroup.Group
}

func newPullerNode(
tableID model.TableID,
span tablepb.Span,
startTs model.Ts,
tableName string,
changefeed model.ChangeFeedID,
) *pullerNode {
return &pullerNode{
tableID: tableID,
span: span,
startTs: startTs,
tableName: tableName,
changefeed: changefeed,
}
}

func (n *pullerNode) tableSpan() []regionspan.Span {
// start table puller
spans := make([]regionspan.Span, 0, 4)
spans = append(spans, regionspan.GetTableSpan(n.tableID))
return spans
}

func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
up *upstream.Upstream, wg *errgroup.Group,
sorter *sorterNode, filterLoop bool,
@@ -79,10 +72,10 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
up.KVStorage,
up.PDClock,
n.startTs,
n.tableSpan(),
[]tablepb.Span{n.span},
kvCfg,
n.changefeed,
n.tableID,
n.span.TableID,
n.tableName,
filterLoop,
)
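Worth pausing on the shape change in `puller.go`: the puller now accepts a `[]tablepb.Span`, so a single pipeline could in principle be driven by several key spans, even though this PR always passes exactly one. The old `tableSpan()` helper, which allocated a `[]regionspan.Span` of capacity 4 only ever to hold one entry, disappears. A hedged sketch of the new call shape, with `pullerSpans` as an illustrative name only:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/processor/tablepb"
	"github.com/pingcap/tiflow/pkg/spanz"
)

// pullerSpans (a hypothetical helper, not in the PR) mirrors what
// startWithSorterNode now hands to the puller: the node's own span
// wrapped in a one-element slice.
func pullerSpans(span tablepb.Span) []tablepb.Span {
	return []tablepb.Span{span}
}

func main() {
	fmt.Println(pullerSpans(spanz.TableIDToComparableSpan(42)))
}
```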
38 changes: 19 additions & 19 deletions cdc/processor/pipeline/table_actor.go
@@ -86,7 +86,7 @@ type tableActor struct {
sinkStopped uberatomic.Bool

// TODO: try to reduce these config fields below in the future
tableID int64
span tablepb.Span
targetTs model.Ts
memoryQuota uint64
replicaInfo *model.TableReplicaInfo
@@ -112,7 +112,7 @@ func NewTableActor(
cdcCtx cdcContext.Context,
up *upstream.Upstream,
mg entry.MounterGroup,
tableID model.TableID,
span tablepb.Span,
tableName string,
replicaInfo *model.TableReplicaInfo,
sinkV1 sinkv1.Sink,
@@ -139,7 +139,7 @@ func NewTableActor(
cancel: cancel,

state: tablepb.TableStatePreparing,
tableID: tableID,
span: span,
tableName: tableName,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upstream: up,
@@ -175,7 +175,7 @@ func NewTableActor(
log.Info("table actor started",
zap.String("namespace", table.changefeedID.Namespace),
zap.String("changefeed", table.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Stringer("span", &span),
zap.String("tableName", tableName),
zap.Uint64("checkpointTs", replicaInfo.StartTs),
zap.Uint64("quota", table.memoryQuota),
@@ -212,7 +212,7 @@ func (t *tableActor) Poll(ctx context.Context, msgs []message.Message[pmessage.M
if err != nil {
log.Error("failed to process message, stop table actor ",
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.Any("message", msgs[i]),
zap.Error(err))
t.handleError(err)
@@ -224,7 +224,7 @@ func (t *tableActor) Poll(ctx context.Context, msgs []message.Message[pmessage.M
if err := t.handleDataMsg(ctx); err != nil {
log.Error("failed to process message, stop table actor ",
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID), zap.Error(err))
zap.Stringer("span", &t.span), zap.Error(err))
t.handleError(err)
break
}
@@ -235,7 +235,7 @@ func (t *tableActor) Poll(ctx context.Context, msgs []message.Message[pmessage.M
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID))
zap.Stringer("span", &t.span))
return false
}
return true
@@ -289,15 +289,15 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
log.Panic("start an already started table",
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.String("tableName", t.tableName))
}

splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
sorterNode := newSorterNode(t.tableName, t.tableID,
sorterNode := newSorterNode(t.tableName, t.span.TableID,
t.replicaInfo.StartTs, flowController,
t.mg, &t.state, t.changefeedID, t.redoManager.Enabled(),
t.upstream.PDClient,
@@ -310,12 +310,12 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
if err := startSorter(t, sortActorNodeContext); err != nil {
log.Error("sorter fails to start",
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.Error(err))
return err
}

pullerNode := newPullerNode(t.tableID, t.replicaInfo.StartTs, t.tableName, t.changefeedVars.ID)
pullerNode := newPullerNode(t.span, t.replicaInfo.StartTs, t.tableName, t.changefeedVars.ID)
pullerActorNodeContext := newContext(sdtTableContext,
t.tableName,
t.globalVars.TableActorSystem.Router(),
@@ -324,7 +324,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
if err := startPuller(t, pullerActorNodeContext); err != nil {
log.Error("puller fails to start",
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.Error(err))
return err
}
@@ -333,7 +333,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
}

actorSinkNode := newSinkNode(
t.tableID,
t.span.TableID,
t.tableSinkV1,
t.tableSinkV2,
t.replicaInfo.StartTs, t.targetTs, flowController, t.redoManager,
@@ -394,7 +394,7 @@ func (t *tableActor) stop() {
zap.String("namespace", t.changefeedID.Namespace),
zap.String("changefeed", t.changefeedID.ID),
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID))
zap.Stringer("span", &t.span))
}

// handleError stops the table actor at first and then reports the error to processor
@@ -414,7 +414,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if t.redoManager.Enabled() {
return t.redoManager.GetResolvedTs(t.tableID)
return t.redoManager.GetResolvedTs(t.span.TableID)
}
return t.sortNode.ResolvedTs()
}
@@ -436,7 +436,7 @@ func (t *tableActor) UpdateBarrierTs(ts model.Ts) {
log.Warn("send fails",
zap.Any("msg", msg),
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.Error(err))
}
}
@@ -506,7 +506,7 @@ func (t *tableActor) State() tablepb.TableState {

// ID returns the ID of source table and mark table
func (t *tableActor) ID() int64 {
return t.tableID
return t.span.TableID
}

// Name returns the quoted schema and table name
@@ -524,7 +524,7 @@ func (t *tableActor) Cancel() {
if err := t.router.Send(t.mb.ID(), message.ValueMessage(msg)); err != nil {
log.Warn("fails to send Stop message",
zap.String("tableName", t.tableName),
zap.Int64("tableID", t.tableID),
zap.Stringer("span", &t.span),
zap.Error(err))
}
}
@@ -556,7 +556,7 @@ var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
eventSorter, err := createSorter(ctx, t.tableName, t.tableID)
eventSorter, err := createSorter(ctx, t.tableName, t.span.TableID)
if err != nil {
return errors.Trace(err)
}
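A small but recurring detail in `table_actor.go`: every `zap.Int64("tableID", ...)` becomes `zap.Stringer("span", &t.span)`, taking the span's address. A minimal sketch of the pattern (assuming, as the diff implies, that `tablepb.Span` implements `fmt.Stringer` on its pointer receiver, as protobuf-generated types typically do):

```go
package main

import (
	"github.com/pingcap/log"
	"github.com/pingcap/tiflow/pkg/spanz"
	"go.uber.org/zap"
)

func main() {
	span := spanz.TableIDToComparableSpan(1)
	// One structured field now renders the whole key range, while the
	// int64 remains available as span.TableID for APIs not yet migrated.
	log.Info("table actor started",
		zap.Stringer("span", &span),
		zap.Int64("tableID", span.TableID))
}
```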
19 changes: 12 additions & 7 deletions cdc/processor/pipeline/table_actor_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@@ -48,7 +49,7 @@ func TestAsyncStopFailed(t *testing.T) {

tbl := &tableActor{
stopped: 0,
tableID: 1,
span: spanz.TableIDToComparableSpan(1),
router: tableActorRouter,
redoManager: redo.NewDisabledManager(),
cancel: func() {},
@@ -71,7 +72,7 @@ func TestTableActorInterface(t *testing.T) {

func TestTableActorInterface(t *testing.T) {
table := &tableActor{
tableID: 1,
span: spanz.TableIDToComparableSpan(1),
redoManager: redo.NewDisabledManager(),
tableName: "t1",
state: tablepb.TableStatePreparing,
@@ -100,9 +101,9 @@ func TestTableActorInterface(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
table.redoManager, _ = redo.NewMockManager(ctx)
table.redoManager.AddTable(table.tableID, 0)
table.redoManager.AddTable(table.span.TableID, 0)
require.Equal(t, model.Ts(0), table.ResolvedTs())
table.redoManager.UpdateResolvedTs(ctx, table.tableID, model.Ts(6))
table.redoManager.UpdateResolvedTs(ctx, table.span.TableID, model.Ts(6))
require.Eventually(t, func() bool { return table.ResolvedTs() == model.Ts(6) },
time.Second*5, time.Millisecond*500)
table.redoManager.Cleanup(ctx)
@@ -124,7 +125,7 @@ func TestTableActorCancel(t *testing.T) {
tbl := &tableActor{
state: tablepb.TableStatePreparing,
stopped: 0,
tableID: 1,
span: spanz.TableIDToComparableSpan(1),
redoManager: redo.NewDisabledManager(),
router: tableActorRouter,
cancel: func() {},
@@ -406,7 +407,9 @@ func TestNewTableActor(t *testing.T) {
startSorter = func(t *tableActor, ctx *actorNodeContext) error {
return nil
}
tbl, err := NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
tbl, err := NewTableActor(
cctx, upstream.NewUpstream4Test(&mockPD{}), nil,
spanz.TableIDToComparableSpan(1), "t1",
&model.TableReplicaInfo{
StartTs: 0,
}, mocksink.NewNormalMockSink(), nil, redo.NewDisabledManager(), 10)
@@ -422,7 +425,9 @@ func TestNewTableActor(t *testing.T) {
return errors.New("failed to start puller")
}

tbl, err = NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
tbl, err = NewTableActor(
cctx, upstream.NewUpstream4Test(&mockPD{}), nil,
spanz.TableIDToComparableSpan(1), "t1",
&model.TableReplicaInfo{
StartTs: 0,
}, mocksink.NewNormalMockSink(), nil, redo.NewDisabledManager(), 10)