Skip to content

Commit

Permalink
modify for code review
Browse files Browse the repository at this point in the history
Signed-off-by: cormick <cormick1080@gmail.com>
  • Loading branch information
CormickKneey committed Nov 17, 2024
1 parent 74b3663 commit 2b4a44c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 20 deletions.
4 changes: 2 additions & 2 deletions manager/database/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func newMysql(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

// AsyncSyncPeers migration.
// Run migration.
if mysqlCfg.Migrate {
if err := migrate(db); err != nil {
return nil, err
}
}

// AsyncSyncPeers seed.
// Run seed.
if err := seed(db); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions manager/database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func newPostgres(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

// AsyncSyncPeers migration.
// Run migration.
if postgresCfg.Migrate {
if err := migrate(db); err != nil {
return nil, err
}
}

// AsyncSyncPeers seed.
// Run seed.
if err := seed(db); err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions manager/job/mocks/sync_peers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 13 additions & 9 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import (

// SyncPeers is an interface for sync peers.
type SyncPeers interface {
// AsyncSyncPeers execute action to sync peers, which is async.
AsyncSyncPeers(context.Context, SyncPeersArgs) error
// Run execute action to sync peers, which is async.
Run(context.Context, SyncPeersArgs) error

// Serve started sync peers server.
Serve()
Expand Down Expand Up @@ -70,15 +70,17 @@ type SyncPeersArgs struct {
// newSyncPeers returns a new SyncPeers.
func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) {
return &syncPeers{
config: cfg,
db: gdb,
job: job,
done: make(chan struct{}),
config: cfg,
db: gdb,
job: job,
done: make(chan struct{}),
workChan: make(chan SyncPeersArgs),
syncLocker: sync.Mutex{},
}, nil
}

// AsyncSyncPeers start to sync peers.
func (s *syncPeers) AsyncSyncPeers(ctx context.Context, args SyncPeersArgs) error {
// Run start to sync peers.
func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error {
if len(args.CandidateSchedulerClusters) == 0 {
if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil {
return fmt.Errorf("failed to get candidate scheduler clusters: %v", err)
Expand All @@ -95,10 +97,12 @@ func (s *syncPeers) Serve() {
for {
select {
case <-tick.C:
logger.Debugf("start to sync peerrs periodically")
if err := s.syncPeers(context.Background(), nil); err != nil {
logger.Errorf("sync peers failed periodically: %v", err)
}
case args := <-s.workChan:
logger.Debugf("start to sync peerrs for request")
err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters)
if err != nil {
logger.Errorf("sync peers failed for request: %v", err)
Expand All @@ -110,7 +114,7 @@ func (s *syncPeers) Serve() {
if err == nil {
state = machineryv1tasks.StateSuccess
}
if updateErr := s.db.WithContext(context.Background()).First(&job, args.TaskID).Updates(models.Job{
if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{
State: state,
}).Error; updateErr != nil {
logger.Errorf("update sync peers job result failed for request: %v", updateErr)
Expand Down
8 changes: 7 additions & 1 deletion manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ import (
)

func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
args, err := structure.StructToMap(json)
if err != nil {
return nil, err
}

candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil)
if err != nil {
return nil, err
Expand All @@ -48,7 +53,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP

taskID := fmt.Sprintf("manager_%v", uuid.New().String())

if err = s.job.SyncPeers.AsyncSyncPeers(ctx, job.SyncPeersArgs{
if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{
CandidateSchedulerClusters: candidateClusters,
TaskID: taskID,
}); err != nil {
Expand All @@ -59,6 +64,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP
job := models.Job{
TaskID: taskID,
BIO: json.BIO,
Args: args,
Type: json.Type,
State: machineryv1tasks.StateStarted,
UserID: json.UserID,
Expand Down

0 comments on commit 2b4a44c

Please sign in to comment.