Skip to content

Commit

Permalink
scheduler(cdc): add ProcessorEpoch (#4768)
Browse files Browse the repository at this point in the history
close #4769
  • Loading branch information
liuzix authored Mar 7, 2022
1 parent ddd140a commit 0578db3
Show file tree
Hide file tree
Showing 12 changed files with 491 additions and 124 deletions.
25 changes: 18 additions & 7 deletions cdc/model/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func DispatchTableTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableMessage is the message body for dispatching a table.
type DispatchTableMessage struct {
OwnerRev int64 `json:"owner-rev"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
OwnerRev int64 `json:"owner-rev"`
Epoch ProcessorEpoch `json:"epoch"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
}

// DispatchTableResponseTopic returns a message topic for the result of
Expand All @@ -44,7 +45,8 @@ func DispatchTableResponseTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableResponseMessage is the message body for the result of dispatching a table.
type DispatchTableResponseMessage struct {
ID TableID `json:"id"`
ID TableID `json:"id"`
Epoch ProcessorEpoch `json:"epoch"`
}

// AnnounceTopic returns a message topic for announcing an ownership change.
Expand All @@ -64,14 +66,23 @@ func SyncTopic(changefeedID ChangeFeedID) p2p.Topic {
return fmt.Sprintf("send-status-resp/%s", changefeedID)
}

// ProcessorEpoch designates a continuous period of the processor working normally.
// It is declared as an alias of string, so plain string values can be used
// wherever a ProcessorEpoch is expected.
type ProcessorEpoch = string

// SyncMessage is the message body for syncing the current states of a processor.
// MsgPack serialization has been implemented to minimize the size of the message.
type SyncMessage struct {
// Sends the processor's version for compatibility check
ProcessorVersion string
Running []TableID
Adding []TableID
Removing []TableID

// Epoch is reset to a unique value whenever the processor
// encounters an internal error or another event that forces
// it to re-sync its states with the Owner.
Epoch ProcessorEpoch

Running []TableID
Adding []TableID
Removing []TableID
}

// Marshal serializes the message into MsgPack format.
Expand Down
8 changes: 5 additions & 3 deletions cdc/model/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ func makeVeryLargeSyncMessage() *SyncMessage {
func TestMarshalDispatchTableMessage(t *testing.T) {
msg := &DispatchTableMessage{
OwnerRev: 1,
Epoch: "test-epoch",
ID: TableID(1),
IsDelete: true,
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"owner-rev":1,"id":1,"is-delete":true}`, string(bytes))
require.Equal(t, `{"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}`, string(bytes))
}

func TestMarshalDispatchTableResponseMessage(t *testing.T) {
msg := &DispatchTableResponseMessage{
ID: TableID(1),
ID: TableID(1),
Epoch: "test-epoch",
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"id":1}`, string(bytes))
require.Equal(t, `{"id":1,"epoch":"test-epoch"}`, string(bytes))
}

func TestMarshalAnnounceMessage(t *testing.T) {
Expand Down
29 changes: 27 additions & 2 deletions cdc/owner/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,27 @@ func (s *schedulerV2) DispatchTable(
tableID model.TableID,
captureID model.CaptureID,
isDelete bool,
epoch model.ProcessorEpoch,
) (done bool, err error) {
topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
Epoch: epoch,
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: DispatchTable",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -155,13 +168,24 @@ func (s *schedulerV2) Announce(
ctx context.Context,
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
) (done bool, err error) {
topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: Announce",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -239,7 +263,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
func(sender string, messageI interface{}) error {
message := messageI.(*model.DispatchTableResponseMessage)
s.stats.RecordDispatchResponse()
s.OnAgentFinishedTableOperation(sender, message.ID)
s.OnAgentFinishedTableOperation(sender, message.ID, message.Epoch)
return nil
})
if err != nil {
Expand All @@ -256,6 +280,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
s.stats.RecordSync()
s.OnAgentSyncTaskStatuses(
sender,
message.Epoch,
message.Running,
message.Adding,
message.Removing)
Expand Down
97 changes: 77 additions & 20 deletions cdc/processor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
stdContext "context"
"time"

"go.uber.org/zap/zapcore"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -161,31 +163,70 @@ func (a *agentImpl) Tick(ctx context.Context) error {
func (a *agentImpl) FinishTableOperation(
ctx context.Context,
tableID model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
epoch model.ProcessorEpoch,
) (done bool, err error) {
message := &model.DispatchTableResponseMessage{ID: tableID, Epoch: epoch}
defer func() {
if err != nil {
return
}
log.Info("SchedulerAgent: FinishTableOperation", zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx, a.ownerCaptureID,
model.DispatchTableResponseTopic(a.changeFeed),
&model.DispatchTableResponseMessage{ID: tableID})
message)
if err != nil {
return false, errors.Trace(err)
}
return done, nil
}

func (a *agentImpl) SyncTaskStatuses(
ctx context.Context,
running, adding, removing []model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
ctx context.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID,
) (done bool, err error) {
if !a.Barrier(ctx) {
// The Sync message needs to be strongly ordered w.r.t. other messages.
return false, nil
}

message := &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: epoch,
Running: running,
Adding: adding,
Removing: removing,
}

defer func() {
if err != nil {
return
}
if log.GetLevel() == zapcore.DebugLevel {
// The message can be REALLY large, so we do not print it
// unless the log level is debug.
log.Debug("SchedulerAgent: SyncTaskStatuses",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
return
}
log.Info("SchedulerAgent: SyncTaskStatuses",
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.SyncTopic(a.changeFeed),
&model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Running: running,
Adding: adding,
Removing: removing,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -196,15 +237,30 @@ func (a *agentImpl) SendCheckpoint(
ctx context.Context,
checkpointTs model.Ts,
resolvedTs model.Ts,
) (bool, error) {
done, err := a.trySendMessage(
) (done bool, err error) {
message := &model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
}

defer func() {
if err != nil {
return
}
// This message is logged very frequently, so we only print it
// if the log level is debug.
log.Debug("SchedulerAgent: SendCheckpoint",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.CheckpointTopic(a.changeFeed),
&model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -339,7 +395,8 @@ func (a *agentImpl) registerPeerMessageHandlers() (ret error) {
ownerCapture,
message.OwnerRev,
message.ID,
message.IsDelete)
message.IsDelete,
message.Epoch)
return nil
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cdc/processor/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestAgentBasics(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand All @@ -211,6 +212,7 @@ func TestAgentBasics(t *testing.T) {

_, err = suite.ownerMessageClient.SendMessage(suite.ctx, model.DispatchTableTopic("cf-1"), &model.DispatchTableMessage{
OwnerRev: 1,
Epoch: agent.CurrentEpoch(),
ID: 1,
IsDelete: false,
})
Expand Down Expand Up @@ -263,7 +265,8 @@ func TestAgentBasics(t *testing.T) {
return false
case msg := <-suite.dispatchResponseCh:
require.Equal(t, &model.DispatchTableResponseMessage{
ID: 1,
ID: 1,
Epoch: agent.CurrentEpoch(),
}, msg)
return true
default:
Expand Down Expand Up @@ -317,6 +320,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand Down Expand Up @@ -371,6 +375,7 @@ func TestAgentTolerateClientClosed(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand Down
Loading

0 comments on commit 0578db3

Please sign in to comment.