Skip to content

Commit

Permalink
lib/worker(engine): refactor BaseWorker (Part I) (#5520)
Browse files Browse the repository at this point in the history
ref #5521
  • Loading branch information
liuzix authored May 24, 2022
1 parent cca66c1 commit e9433f5
Show file tree
Hide file tree
Showing 20 changed files with 687 additions and 303 deletions.
5 changes: 0 additions & 5 deletions engine/dm/dump_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (d *dumpWorker) Workload() model.RescUnit {
return 0
}

func (d *dumpWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("dumpWorker.OnMasterFailover")
return nil
}

func (d *dumpWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("dumpWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/dm/load_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ func (l *loadWorker) Workload() model.RescUnit {
return 0
}

func (l *loadWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("loadWorker.OnMasterFailover")
return nil
}

func (l *loadWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("loadWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/dm/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (s *syncWorker) Workload() model.RescUnit {
return 0
}

func (s *syncWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("syncWorker.OnMasterFailover")
return nil
}

func (s *syncWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("syncWorker.OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/executor/cvsTask/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,6 @@ func (task *cvsTask) Workload() model.RescUnit {
return 1
}

// OnMasterFailover is called when the master is failed over.
func (task *cvsTask) OnMasterFailover(reason lib.MasterFailoverReason) error {
return nil
}

func (task *cvsTask) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
switch msg := message.(type) {
case *libModel.StatusChangeRequest:
Expand Down
11 changes: 0 additions & 11 deletions engine/jobmaster/cvsJob/cvsJobMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,22 +349,11 @@ func (jm *JobMaster) Workload() model.RescUnit {
return 2
}

// OnMasterFailover implements JobMasterImpl.OnMasterFailover
func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error {
return nil
}

// OnMasterMessage implements JobMasterImpl.OnMasterMessage
func (jm *JobMaster) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
return nil
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (jm *JobMaster) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
log.L().Info("cvs jobmaster: OnJobManagerFailover", zap.Any("reason", reason))
return nil
}

// OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage
func (jm *JobMaster) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("cvs jobmaster: OnJobManagerMessage", zap.Any("message", message))
Expand Down
12 changes: 0 additions & 12 deletions engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,6 @@ func (jm *JobMaster) Workload() model.RescUnit {
return 2
}

// OnMasterFailover implements JobMasterImpl.OnMasterFailover
func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error {
// No need to do anything here
return nil
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (jm *JobMaster) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
// No need to do anything here
return nil
}

// IsJobMasterImpl implements JobMasterImpl.IsJobMasterImpl
func (jm *JobMaster) IsJobMasterImpl() {
panic("unreachable")
Expand Down
2 changes: 0 additions & 2 deletions engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ func (t *testDMJobmasterSuite) TestDMJobmaster() {
require.NoError(t.T(), jm.OnWorkerStatusUpdated(workerHandle1, &libModel.WorkerStatus{ExtBytes: bytes1}))
require.NoError(t.T(), jm.OnJobManagerMessage("", ""))
require.NoError(t.T(), jm.OnMasterMessage("", ""))
require.NoError(t.T(), jm.OnJobManagerFailover(lib.MasterFailoverReason{}))
require.NoError(t.T(), jm.OnMasterFailover(lib.MasterFailoverReason{}))
require.Equal(t.T(), jm.Workload(), model.RescUnit(2))
require.NoError(t.T(), jm.OnWorkerStatusUpdated(workerHandle1, &libModel.WorkerStatus{ExtBytes: bytes1}))
require.EqualError(t.T(), jm.OnWorkerMessage(workerHandle1, "", dmpkg.MessageWithID{}), "request 0 not found")
Expand Down
5 changes: 0 additions & 5 deletions engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ func (w *exampleWorker) Status() libModel.WorkerStatus {
return libModel.WorkerStatus{Code: code}
}

func (w *exampleWorker) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("OnMasterFailover")
return nil
}

func (w *exampleWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("OnMasterMessage", zap.Any("message", message))
return nil
Expand Down
5 changes: 0 additions & 5 deletions engine/lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type JobMasterImpl interface {
MasterImpl

Workload() model.RescUnit
OnJobManagerFailover(reason MasterFailoverReason) error
OnJobManagerMessage(topic p2p.Topic, message interface{}) error
// IsJobMasterImpl is an empty function used to prevent accidental implementation
// of this interface.
Expand Down Expand Up @@ -287,10 +286,6 @@ func (j *jobMasterImplAsWorkerImpl) Workload() model.RescUnit {
return j.inner.Workload()
}

func (j *jobMasterImplAsWorkerImpl) OnMasterFailover(reason MasterFailoverReason) error {
return j.inner.OnJobManagerFailover(reason)
}

func (j *jobMasterImplAsWorkerImpl) OnMasterMessage(topic p2p.Topic, message interface{}) error {
return j.inner.OnJobManagerMessage(topic, message)
}
Expand Down
8 changes: 0 additions & 8 deletions engine/lib/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ func (m *testJobMasterImpl) Workload() model.RescUnit {
return args.Get(0).(model.RescUnit)
}

func (m *testJobMasterImpl) OnJobManagerFailover(reason MasterFailoverReason) error {
m.mu.Lock()
defer m.mu.Unlock()

args := m.Called(reason)
return args.Error(0)
}

func (m *testJobMasterImpl) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
12 changes: 0 additions & 12 deletions engine/lib/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ type businessStatus struct {
status map[libModel.WorkerID]*dummyWorkerStatus
}

// OnJobManagerFailover implements JobMasterImpl.OnJobManagerFailover
func (m *Master) OnJobManagerFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnJobManagerFailover", zap.Any("reason", reason))
return nil
}

// OnJobManagerMessage implements JobMasterImpl.OnJobManagerMessage
func (m *Master) OnJobManagerMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("FakeMaster: OnJobManagerMessage", zap.Any("message", message))
Expand Down Expand Up @@ -463,12 +457,6 @@ func (m *Master) CloseImpl(ctx context.Context) error {
return nil
}

// OnMasterFailover implements MasterImpl.OnMasterFailover
func (m *Master) OnMasterFailover(reason lib.MasterFailoverReason) error {
log.L().Info("FakeMaster: OnMasterFailover", zap.Stack("stack"))
return nil
}

// OnMasterMessage implements MasterImpl.OnMasterMessage
func (m *Master) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("FakeMaster: OnMasterMessage", zap.Any("message", message))
Expand Down
4 changes: 0 additions & 4 deletions engine/lib/fake/fake_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,6 @@ func (d *dummyWorker) Workload() model.RescUnit {
return model.RescUnit(10)
}

func (d *dummyWorker) OnMasterFailover(_ lib.MasterFailoverReason) error {
return nil
}

func (d *dummyWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
log.L().Info("fakeWorker: OnMasterMessage", zap.Any("message", message))
switch msg := message.(type) {
Expand Down
10 changes: 0 additions & 10 deletions engine/lib/mock_worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,6 @@ func (w *mockWorkerImpl) Status() libModel.WorkerStatus {
return args.Get(0).(libModel.WorkerStatus)
}

func (w *mockWorkerImpl) OnMasterFailover(reason MasterFailoverReason) error {
w.mu.Lock()
defer w.mu.Unlock()

w.failoverCount.Add(1)

args := w.Called(reason)
return args.Error(0)
}

func (w *mockWorkerImpl) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue) error {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions engine/lib/statusutil/master_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type MasterInfoProvider interface {
MasterID() libModel.MasterID
MasterNode() p2p.NodeID
Epoch() libModel.Epoch
RefreshMasterInfo(ctx context.Context) error
SyncRefreshMasterInfo(ctx context.Context) error
}

// MockMasterInfoProvider defines a mock provider that implements MasterInfoProvider
Expand Down Expand Up @@ -66,8 +66,8 @@ func (p *MockMasterInfoProvider) Epoch() libModel.Epoch {
return p.epoch
}

// RefreshMasterInfo implements MasterInfoProvider.RefreshMasterInfo
func (p *MockMasterInfoProvider) RefreshMasterInfo(ctx context.Context) error {
// SyncRefreshMasterInfo implements MasterInfoProvider.RefreshMasterInfo
func (p *MockMasterInfoProvider) SyncRefreshMasterInfo(ctx context.Context) error {
p.refreshCount.Add(1)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/lib/statusutil/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (w *Writer) sendStatusMessageWithRetry(
})
if err != nil {
if derrors.ErrExecutorNotFoundForMessage.Equal(err) {
if err := w.masterInfo.RefreshMasterInfo(ctx); err != nil {
if err := w.masterInfo.SyncRefreshMasterInfo(ctx); err != nil {
log.L().Warn("failed to refresh master info",
zap.String("worker-id", w.workerID),
zap.String("master-id", w.masterInfo.MasterID()),
Expand Down
Loading

0 comments on commit e9433f5

Please sign in to comment.