tp: implement replicationManager #5606

Merged
14 changes: 7 additions & 7 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -85,16 +85,16 @@ type captureManager struct {
func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager {
return &captureManager{
OwnerRev: rev,
Captures: make(map[string]*CaptureStatus),
Captures: make(map[model.CaptureID]*CaptureStatus),
heartbeatTick: heartbeatTick,
}
}

func (c *captureManager) captureTableSets() map[model.CaptureID]*CaptureStatus {
func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus {
return c.Captures
}

func (c *captureManager) checkAllCaptureInitialized() bool {
func (c *captureManager) CheckAllCaptureInitialized() bool {
for _, captureStatus := range c.Captures {
if captureStatus.State == CaptureStateUninitialize {
return false
@@ -103,7 +103,7 @@ func (c *captureManager) checkAllCaptureInitialized() bool {
return true
}

func (c *captureManager) tick() []*schedulepb.Message {
func (c *captureManager) Tick() []*schedulepb.Message {
c.tickCounter++
if c.tickCounter < c.heartbeatTick {
return nil
@@ -120,10 +120,10 @@ func (c *captureManager) tick() []*schedulepb.Message {
return msgs
}

func (c *captureManager) poll(
func (c *captureManager) Poll(
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
msgs []*schedulepb.Message,
) ([]*schedulepb.Message, bool) {
) []*schedulepb.Message {
outMsgs := c.onAliveCaptureUpdate(aliveCaptures)
for _, msg := range msgs {
if msg.MsgType == schedulepb.MsgHeartbeatResponse {
@@ -135,7 +135,7 @@ func (c *captureManager) poll(
msg.GetHeartbeatResponse(), msg.Header.ProcessorEpoch)
}
}
return outMsgs, c.checkAllCaptureInitialized()
return outMsgs
}

func (c *captureManager) onAliveCaptureUpdate(
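Note: with this change `poll` becomes `Poll` and no longer returns the initialization flag; callers now pair it with `CheckAllCaptureInitialized`, as the coordinator further down does. A minimal caller-side sketch of that flow, assuming the `tp` package context from the diff above (`pollCaptures` is a hypothetical helper, not part of the PR):

```go
// pollCaptures drives the capture manager for one scheduling round.
// Hypothetical helper illustrating the updated API; not part of the PR.
func pollCaptures(
	cm *captureManager,
	aliveCaptures map[model.CaptureID]*model.CaptureInfo,
	recvMsgs []*schedulepb.Message,
) (outMsgs []*schedulepb.Message, allInitialized bool) {
	// Tick emits periodic heartbeats; Poll ingests heartbeat responses.
	outMsgs = cm.Tick()
	outMsgs = append(outMsgs, cm.Poll(aliveCaptures, recvMsgs)...)
	// The "all captures initialized" signal is now a separate query
	// instead of a second return value from Poll.
	return outMsgs, cm.CheckAllCaptureInitialized()
}
```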
41 changes: 26 additions & 15 deletions cdc/scheduler/internal/tp/capture_manager_test.go
@@ -57,34 +57,45 @@ func TestCaptureManagerPoll(t *testing.T) {
cm := newCaptureManager(rev, 2)

// Initial poll for alive captures.
msgs, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)
msgs := cm.Poll(ms, nil)
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
}, msgs)
require.False(t, cm.CheckAllCaptureInitialized())

// Poll one response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "1",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.False(t, hasInit)
require.Empty(t, msgs)
require.False(t, cm.CheckAllCaptureInitialized())

// Poll another response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "2",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.True(t, hasInit, "%v %v", cm.Captures["1"], cm.Captures["2"])
require.Empty(t, msgs)
require.True(t, cm.CheckAllCaptureInitialized(), "%v %v", cm.Captures["1"], cm.Captures["2"])

// Poll unknown capture response
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "unknown",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.Empty(t, msgs)
require.True(t, cm.CheckAllCaptureInitialized())
}

func TestCaptureManagerTick(t *testing.T) {
@@ -94,22 +94,22 @@ func TestCaptureManagerTick(t *testing.T) {
cm := newCaptureManager(rev, 2)

// No heartbeat if there is no capture.
msgs := cm.tick()
msgs := cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.Empty(t, msgs)

ms := map[model.CaptureID]*model.CaptureInfo{
"1": {},
"2": {},
}
_, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)
cm.Poll(ms, nil)
require.False(t, cm.CheckAllCaptureInitialized())

// Heartbeat even if captures are uninitialized.
msgs = cm.tick()
msgs = cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
@@ -119,10 +119,10 @@
for _, s := range []CaptureState{CaptureStateInitialized, CaptureStateStopping} {
cm.Captures["1"].State = s
cm.Captures["2"].State = s
require.True(t, cm.checkAllCaptureInitialized())
msgs = cm.tick()
require.True(t, cm.CheckAllCaptureInitialized())
msgs = cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
94 changes: 87 additions & 7 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -15,10 +15,16 @@ package tp

import (
"context"
"log"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)

type scheduler interface {
@@ -34,12 +40,38 @@ type scheduler interface {
var _ internal.Scheduler = (*coordinator)(nil)

type coordinator struct {
version string
revision schedulepb.OwnerRevision
trans transport
scheduler []scheduler
replicationM *replicationManager
captureM *captureManager
}

// NewCoordinator returns a two-phase scheduler.
func NewCoordinator(
ctx context.Context,
changeFeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := newTranport(ctx, changeFeedID, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
revision := schedulepb.OwnerRevision{Revision: ownerRevision}
return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
trans: trans,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
captureM: newCaptureManager(revision, cfg.HeartbeatTick),
}, nil
}

func (c *coordinator) Tick(
ctx context.Context,
// Latest global checkpoint of the changefeed
@@ -68,33 +100,81 @@ func (c *coordinator) poll(
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) error {
recvMsgs, err := c.trans.Recv(ctx)
recvMsgs, err := c.recvMsgs(ctx)
if err != nil {
return errors.Trace(err)
}
sentMsgs, hasInit := c.captureM.poll(aliveCaptures, recvMsgs)
if !hasInit {

sentMsgs := c.captureM.Tick()
msgs := c.captureM.Poll(aliveCaptures, recvMsgs)
sentMsgs = append(sentMsgs, msgs...)
if !c.captureM.CheckAllCaptureInitialized() {
// Skip polling the replication manager, as not all captures are initialized.
err := c.trans.Send(ctx, sentMsgs)
return errors.Trace(err)
}

captureTables := c.captureM.captureTableSets()
// Handling received messages to advance replication set.
msgs, err = c.replicationM.HandleMessage(recvMsgs)
if err != nil {
return errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)

// Generate schedule tasks based on the current status.
captureTables := c.captureM.CaptureTableSets()
allTasks := make([]*scheduleTask, 0)
for _, sched := range c.scheduler {
tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, captureTables)
allTasks = append(allTasks, tasks...)
}
msgs, err := c.replicationM.poll(
ctx, checkpointTs, currentTables, aliveCaptures, recvMsgs, allTasks)

// Handling generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
if err != nil {
return errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)
err = c.trans.Send(ctx, sentMsgs)

// Send new messages.
err = c.sendMsgs(ctx, sentMsgs)
if err != nil {
return errors.Trace(err)
}

// checkpoint calculation
return nil
}

func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) {
recvMsgs, err := c.trans.Recv(ctx)
if err != nil {
return nil, errors.Trace(err)
}

n := 0
for _, val := range recvMsgs {
// Filter stale messages.
if val.Header.OwnerRevision == c.revision {
recvMsgs[n] = val
n++
}
}
return recvMsgs[:n], nil
}

func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error {
for i := range msgs {
m := msgs[i]
m.Header = &schedulepb.Message_Header{
Version: c.version,
OwnerRevision: c.revision,
}
// Correctness check.
if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown {
log.Panic("invalid message no destination or unknown message type",
zap.Any("message", m))
}
}
return c.trans.Send(ctx, msgs)
}
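For context, a hedged sketch of wiring the new scheduler into an owner. Only the `NewCoordinator` signature and the `HeartbeatTick`/`MaxTaskConcurrency` config fields come from the diff above; the helper name and the literal config values are assumptions, not taken from the PR:

```go
// newTpSchedulerSketch constructs the two-phase scheduler added in this PR.
// Hypothetical helper; the config values below are illustrative placeholders.
func newTpSchedulerSketch(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	checkpointTs model.Ts,
	messageServer *p2p.MessageServer,
	messageRouter p2p.MessageRouter,
	ownerRevision int64,
) (internal.Scheduler, error) {
	cfg := &config.SchedulerConfig{
		HeartbeatTick:      2,  // heartbeat every 2 ticks, matching the tests above
		MaxTaskConcurrency: 10, // assumed cap on concurrent schedule tasks
	}
	return NewCoordinator(
		ctx, changefeedID, checkpointTs,
		messageServer, messageRouter, ownerRevision, cfg)
}
```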
76 changes: 76 additions & 0 deletions cdc/scheduler/internal/tp/coordinator_test.go
@@ -12,3 +12,79 @@
// limitations under the License.

package tp

import (
"context"
"testing"

"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/stretchr/testify/require"
)

type mockTrans struct {
send func(ctx context.Context, msgs []*schedulepb.Message) error
recv func(ctx context.Context) ([]*schedulepb.Message, error)
}

func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error {
return m.send(ctx, msgs)
}
func (m *mockTrans) Recv(ctx context.Context) ([]*schedulepb.Message, error) {
return m.recv(ctx)
}

func TestCoordinatorSendMsgs(t *testing.T) {
t.Parallel()
ctx := context.Background()
trans := &mockTrans{}
cood := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
}
trans.send = func(ctx context.Context, msgs []*schedulepb.Message) error {
require.EqualValues(t, []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
Version: cood.version,
OwnerRevision: cood.revision,
},
To: "1", MsgType: schedulepb.MsgDispatchTableRequest,
}}, msgs)
return nil
}
cood.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})
}

func TestCoordinatorRecvMsgs(t *testing.T) {
t.Parallel()

ctx := context.Background()
trans := &mockTrans{}
cood := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
}
trans.recv = func(ctx context.Context) ([]*schedulepb.Message, error) {
return []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
OwnerRevision: cood.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
}, {
Header: &schedulepb.Message_Header{
OwnerRevision: schedulepb.OwnerRevision{Revision: 4},
},
From: "2", MsgType: schedulepb.MsgDispatchTableResponse,
}}, nil
}
msgs, err := cood.recvMsgs(ctx)
require.Nil(t, err)
require.EqualValues(t, []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
OwnerRevision: cood.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
}}, msgs)
}