Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/worker(engine): refactor BaseWorker (Part I) #5520

Merged
merged 8 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions engine/dm/dump_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (d *dumpWorker) Workload() model.RescUnit {
return 0
}

func (d *dumpWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("dumpWorker.OnMasterFailover")
return nil
}

func (d *dumpWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("dumpWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/dm/load_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ func (l *loadWorker) Workload() model.RescUnit {
return 0
}

func (l *loadWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("loadWorker.OnMasterFailover")
return nil
}

func (l *loadWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("loadWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/dm/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (s *syncWorker) Workload() model.RescUnit {
return 0
}

func (s *syncWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("syncWorker.OnMasterFailover")
return nil
}

func (s *syncWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("syncWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/executor/cvsTask/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ func (task *cvsTask) Workload() model.RescUnit {
return 1
}

// OnMasterFailover is called when the master is failed over.
func (task *cvsTask) OnMasterFailover(reason lib.MasterFailoverReason) error {
return nil
}

func (task *cvsTask) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
switch msg := message.(type) {
case *libModel.StatusChangeRequest:
Expand Down
11 changes: 0 additions & 11 deletions engine/jobmaster/cvsJob/cvsJobMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,22 +349,11 @@ func (jm *JobMaster) Workload() model.RescUnit {
return 2
}

// OnMasterFailover implements JobMasterImpl.OnMasterFailover
func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error {
return nil
}

// OnMasterMessage implements JobMasterImpl.OnMasterMessage
func (jm *JobMaster) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
return nil
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (jm *JobMaster) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
log.L().Info("cvs jobmaster: OnJobManagerFailover", zap.Any("reason", reason))
return nil
}

// OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage
func (jm *JobMaster) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("cvs jobmaster: OnJobManagerMessage", zap.Any("message", message))
Expand Down
12 changes: 0 additions & 12 deletions engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,6 @@ func (jm *JobMaster) Workload() model.RescUnit {
return 2
}

// OnMasterFailover implements JobMasterImpl.OnMasterFailover
func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error {
// No need to do anything here
return nil
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (jm *JobMaster) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
// No need to do anything here
return nil
}

// IsJobMasterImpl implements JobMasterImpl.IsJobMasterImpl
func (jm *JobMaster) IsJobMasterImpl() {
panic("unreachable")
Expand Down
2 changes: 0 additions & 2 deletions engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ func (t *testDMJobmasterSuite) TestDMJobmaster() {
require.NoError(t.T(), jm.OnWorkerStatusUpdated(workerHandle1, &libModel.WorkerStatus{ExtBytes: bytes1}))
require.NoError(t.T(), jm.OnJobManagerMessage("", ""))
require.NoError(t.T(), jm.OnMasterMessage("", ""))
require.NoError(t.T(), jm.OnJobManagerFailover(lib.MasterFailoverReason{}))
require.NoError(t.T(), jm.OnMasterFailover(lib.MasterFailoverReason{}))
require.Equal(t.T(), jm.Workload(), model.RescUnit(2))
require.NoError(t.T(), jm.OnWorkerStatusUpdated(workerHandle1, &libModel.WorkerStatus{ExtBytes: bytes1}))
require.EqualError(t.T(), jm.OnWorkerMessage(workerHandle1, "", dmpkg.MessageWithID{}), "request 0 not found")
Expand Down
5 changes: 0 additions & 5 deletions engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ func (w *exampleWorker) Status() libModel.WorkerStatus {
return libModel.WorkerStatus{Code: code}
}

func (w *exampleWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("OnMasterFailover")
return nil
}

func (w *exampleWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type JobMasterImpl interface {
MasterImpl

Workload() model.RescUnit
OnJobManagerFailover(reason MasterFailoverReason) error
OnJobManagerMessage(topic p2p.Topic, message interface{}) error
// IsJobMasterImpl is an empty function used to prevent accidental implementation
// of this interface.
Expand Down Expand Up @@ -287,10 +286,6 @@ func (j *jobMasterImplAsWorkerImpl) Workload() model.RescUnit {
return j.inner.Workload()
}

func (j *jobMasterImplAsWorkerImpl) OnMasterFailover(reason MasterFailoverReason) error {
return j.inner.OnJobManagerFailover(reason)
}

func (j *jobMasterImplAsWorkerImpl) OnMasterMessage(topic p2p.Topic, message interface{}) error {
return j.inner.OnJobManagerMessage(topic, message)
}
Expand Down
8 changes: 0 additions & 8 deletions engine/lib/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ func (m *testJobMasterImpl) Workload() model.RescUnit {
return args.Get(0).(model.RescUnit)
}

func (m *testJobMasterImpl) OnJobManagerFailover(reason MasterFailoverReason) error {
m.mu.Lock()
defer m.mu.Unlock()

args := m.Called(reason)
return args.Error(0)
}

func (m *testJobMasterImpl) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
12 changes: 0 additions & 12 deletions engine/lib/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ type businessStatus struct {
status map[libModel.WorkerID]*dummyWorkerStatus
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (m *Master) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnJobManagerFailover", zap.Any("reason", reason))
return nil
}

// OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage
func (m *Master) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("FakeMaster: OnJobManagerMessage", zap.Any("message", message))
Expand Down Expand Up @@ -463,12 +457,6 @@ func (m *Master) CloseImpl(ctx context.Context) error {
return nil
}

// OnMasterFailover implements MasterImpl.OnMasterFailover
func (m *Master) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnMasterFailover", zap.Stack("stack"))
return nil
}

// OnMasterMessage implements MasterImpl.OnMasterMessage
func (m *Master) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("FakeMaster: OnMasterMessage", zap.Any("message", message))
Expand Down
4 changes: 0 additions & 4 deletions engine/lib/fake/fake_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,6 @@ func (d *dummyWorker) Workload() model.RescUnit {
return model.RescUnit(10)
}

func (d *dummyWorker) OnMasterFailover(_ lib.MasterFailoverReason) error {
return nil
}

func (d *dummyWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("fakeWorker: OnMasterMessage", zap.Any("message", message))
switch msg := message.(type) {
Expand Down
10 changes: 0 additions & 10 deletions engine/lib/mock_worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,6 @@ func (w *mockWorkerImpl) Status() libModel.WorkerStatus {
return args.Get(0).(libModel.WorkerStatus)
}

func (w *mockWorkerImpl) OnMasterFailover(reason MasterFailoverReason) error {
w.mu.Lock()
defer w.mu.Unlock()

w.failoverCount.Add(1)

args := w.Called(reason)
return args.Error(0)
}

func (w *mockWorkerImpl) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions engine/lib/statusutil/master_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type MasterInfoProvider interface {
MasterID() libModel.MasterID
MasterNode() p2p.NodeID
Epoch() libModel.Epoch
RefreshMasterInfo(ctx context.Context) error
SyncRefreshMasterInfo(ctx context.Context) error
}

// MockMasterInfoProvider defines a mock provider that implements MasterInfoProvider
Expand Down Expand Up @@ -66,8 +66,8 @@ func (p *MockMasterInfoProvider) Epoch() libModel.Epoch {
return p.epoch
}

// RefreshMasterInfo implements MasterInfoProvider.RefreshMasterInfo
func (p *MockMasterInfoProvider) RefreshMasterInfo(ctx context.Context) error {
// SyncRefreshMasterInfo implements MasterInfoProvider.RefreshMasterInfo
func (p *MockMasterInfoProvider) SyncRefreshMasterInfo(ctx context.Context) error {
p.refreshCount.Add(1)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/lib/statusutil/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (w *Writer) sendStatusMessageWithRetry(
})
if err != nil {
if derrors.ErrExecutorNotFoundForMessage.Equal(err) {
if err := w.masterInfo.RefreshMasterInfo(ctx); err != nil {
if err := w.masterInfo.SyncRefreshMasterInfo(ctx); err != nil {
log.L().Warn("failed to refresh master info",
zap.String("worker-id", w.workerID),
zap.String("master-id", w.masterInfo.MasterID()),
Expand Down
Loading