Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler(cdc): add ProcessorEpoch #4768

Merged
merged 11 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions cdc/model/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func DispatchTableTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableMessage is the message body for dispatching a table.
type DispatchTableMessage struct {
OwnerRev int64 `json:"owner-rev"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
OwnerRev int64 `json:"owner-rev"`
Epoch ProcessorEpoch `json:"epoch"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
}

// DispatchTableResponseTopic returns a message topic for the result of
Expand All @@ -44,7 +45,8 @@ func DispatchTableResponseTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableResponseMessage is the message body for the result of dispatching a table.
type DispatchTableResponseMessage struct {
ID TableID `json:"id"`
ID TableID `json:"id"`
Epoch ProcessorEpoch `json:"epoch"`
}

// AnnounceTopic returns a message topic for announcing an ownership change.
Expand All @@ -64,14 +66,23 @@ func SyncTopic(changefeedID ChangeFeedID) p2p.Topic {
return fmt.Sprintf("send-status-resp/%s", changefeedID)
}

// ProcessorEpoch designates a continuous period of the processor working normally.
type ProcessorEpoch = string

// SyncMessage is the message body for syncing the current states of a processor.
// MsgPack serialization has been implemented to minimize the size of the message.
type SyncMessage struct {
// Sends the processor's version for compatibility check
ProcessorVersion string
Running []TableID
Adding []TableID
Removing []TableID

// Epoch is reset to a unique value when the processor has
// encountered an internal error or other events so that
// it has to re-sync its states with the Owner.
Epoch ProcessorEpoch

Running []TableID
Adding []TableID
Removing []TableID
}

// Marshal serializes the message into MsgPack format.
Expand Down
8 changes: 5 additions & 3 deletions cdc/model/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ func makeVeryLargeSyncMessage() *SyncMessage {
func TestMarshalDispatchTableMessage(t *testing.T) {
msg := &DispatchTableMessage{
OwnerRev: 1,
Epoch: "test-epoch",
ID: TableID(1),
IsDelete: true,
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"owner-rev":1,"id":1,"is-delete":true}`, string(bytes))
require.Equal(t, `{"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}`, string(bytes))
}

func TestMarshalDispatchTableResponseMessage(t *testing.T) {
msg := &DispatchTableResponseMessage{
ID: TableID(1),
ID: TableID(1),
Epoch: "test-epoch",
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"id":1}`, string(bytes))
require.Equal(t, `{"id":1,"epoch":"test-epoch"}`, string(bytes))
}

func TestMarshalAnnounceMessage(t *testing.T) {
Expand Down
29 changes: 27 additions & 2 deletions cdc/owner/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,27 @@ func (s *schedulerV2) DispatchTable(
tableID model.TableID,
captureID model.CaptureID,
isDelete bool,
epoch model.ProcessorEpoch,
) (done bool, err error) {
topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
Epoch: epoch,
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: DispatchTable",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -155,13 +168,24 @@ func (s *schedulerV2) Announce(
ctx context.Context,
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
) (done bool, err error) {
topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: Announce",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -239,7 +263,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
func(sender string, messageI interface{}) error {
message := messageI.(*model.DispatchTableResponseMessage)
s.stats.RecordDispatchResponse()
s.OnAgentFinishedTableOperation(sender, message.ID)
s.OnAgentFinishedTableOperation(sender, message.ID, message.Epoch)
return nil
})
if err != nil {
Expand All @@ -256,6 +280,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
s.stats.RecordSync()
s.OnAgentSyncTaskStatuses(
sender,
message.Epoch,
message.Running,
message.Adding,
message.Removing)
Expand Down
97 changes: 77 additions & 20 deletions cdc/processor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
stdContext "context"
"time"

"go.uber.org/zap/zapcore"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -161,31 +163,70 @@ func (a *agentImpl) Tick(ctx context.Context) error {
func (a *agentImpl) FinishTableOperation(
ctx context.Context,
tableID model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
epoch model.ProcessorEpoch,
) (done bool, err error) {
message := &model.DispatchTableResponseMessage{ID: tableID, Epoch: epoch}
defer func() {
if err != nil {
return
}
log.Info("SchedulerAgent: FinishTableOperation", zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx, a.ownerCaptureID,
model.DispatchTableResponseTopic(a.changeFeed),
&model.DispatchTableResponseMessage{ID: tableID})
message)
if err != nil {
return false, errors.Trace(err)
}
return done, nil
}

func (a *agentImpl) SyncTaskStatuses(
ctx context.Context,
running, adding, removing []model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
ctx context.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID,
) (done bool, err error) {
if !a.Barrier(ctx) {
// The Sync message needs to be strongly ordered w.r.t. other messages.
return false, nil
}

message := &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: epoch,
Running: running,
Adding: adding,
Removing: removing,
}

defer func() {
if err != nil {
return
}
if log.GetLevel() == zapcore.DebugLevel {
// The message can be REALLY large, so we do not print it
// unless the log level is debug.
log.Debug("SchedulerAgent: SyncTaskStatuses",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
return
}
log.Info("SchedulerAgent: SyncTaskStatuses",
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.SyncTopic(a.changeFeed),
&model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Running: running,
Adding: adding,
Removing: removing,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -196,15 +237,30 @@ func (a *agentImpl) SendCheckpoint(
ctx context.Context,
checkpointTs model.Ts,
resolvedTs model.Ts,
) (bool, error) {
done, err := a.trySendMessage(
) (done bool, err error) {
message := &model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
}

defer func() {
if err != nil {
return
}
// This log is very often, so we only print it if the
// log level is debug.
log.Debug("SchedulerAgent: SendCheckpoint",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.CheckpointTopic(a.changeFeed),
&model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -339,7 +395,8 @@ func (a *agentImpl) registerPeerMessageHandlers() (ret error) {
ownerCapture,
message.OwnerRev,
message.ID,
message.IsDelete)
message.IsDelete,
message.Epoch)
return nil
})
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cdc/processor/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestAgentBasics(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand All @@ -211,6 +212,7 @@ func TestAgentBasics(t *testing.T) {

_, err = suite.ownerMessageClient.SendMessage(suite.ctx, model.DispatchTableTopic("cf-1"), &model.DispatchTableMessage{
OwnerRev: 1,
Epoch: agent.CurrentEpoch(),
ID: 1,
IsDelete: false,
})
Expand Down Expand Up @@ -263,7 +265,8 @@ func TestAgentBasics(t *testing.T) {
return false
case msg := <-suite.dispatchResponseCh:
require.Equal(t, &model.DispatchTableResponseMessage{
ID: 1,
ID: 1,
Epoch: agent.CurrentEpoch(),
}, msg)
return true
default:
Expand Down Expand Up @@ -317,6 +320,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand Down Expand Up @@ -371,6 +375,7 @@ func TestAgentTolerateClientClosed(t *testing.T) {
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: agent.CurrentEpoch(),
Running: nil,
Adding: nil,
Removing: nil,
Expand Down
Loading