diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c5c4546f06c..9e45c51e079 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -23,8 +23,8 @@ jobs: - name: Golangci lint uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 with: - version: v1.54 - args: --verbose + version: v1.62 + args: --verbose --timeout=10m - name: Markdown lint uses: docker://avtodev/markdown-lint:v1@sha256:6aeedc2f49138ce7a1cd0adffc1b1c0321b841dc2102408967d9301c031949ee diff --git a/.golangci.yml b/.golangci.yml index 2680b7f03d0..877253d2c70 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,11 +24,9 @@ linters: enable: - gci - gofmt - - golint - misspell - govet - goconst - - deadcode - gocyclo - staticcheck - errcheck diff --git a/client-rs b/client-rs index 10bab190a28..ee21989120b 160000 --- a/client-rs +++ b/client-rs @@ -1 +1 @@ -Subproject commit 10bab190a28552cd05760b7bd75626990834877a +Subproject commit ee21989120b16db4e8456e7d163f9ba06aad98c9 diff --git a/manager/config/config.go b/manager/config/config.go index d8e2dd1d149..4760ff459cd 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -338,6 +338,9 @@ type SyncPeersConfig struct { // Timeout is the timeout for syncing peers information from the single scheduler. Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` + + // BatchSize is the batch size when operating gorm. + BatchSize int `yaml:"batchSize" mapstructure:"batchSize"` } type PreheatTLSClientConfig struct { @@ -447,8 +450,9 @@ func New() *Config { TLS: PreheatTLSClientConfig{}, }, SyncPeers: SyncPeersConfig{ - Interval: DefaultJobSyncPeersInterval, - Timeout: DefaultJobSyncPeersTimeout, + Interval: DefaultJobSyncPeersInterval, + Timeout: DefaultJobSyncPeersTimeout, + BatchSize: DefaultJobSyncPeersBatchSize, }, }, ObjectStorage: ObjectStorageConfig{ diff --git a/manager/config/constants.go b/manager/config/constants.go index e8b1082e3d5..79f9a2d9ec6 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -107,6 +107,9 @@ const ( // DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster. DefaultClusterJobRateLimit = 10 + + // DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler. + DefaultJobSyncPeersBatchSize = 500 ) const ( diff --git a/manager/handlers/job.go b/manager/handlers/job.go index f7f17ae582a..4172948eae1 100644 --- a/manager/handlers/job.go +++ b/manager/handlers/job.go @@ -62,6 +62,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) { return } + ctx.JSON(http.StatusOK, job) + case job.SyncPeersJob: + var json types.CreateSyncPeersJobRequest + if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + job, err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json) + if err != nil { + ctx.Error(err) // nolint: errcheck + return + } + ctx.JSON(http.StatusOK, job) case job.GetTaskJob: var json types.CreateGetTaskJobRequest diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go index 5cf9824e5fb..49f2ace24c1 100644 --- a/manager/job/mocks/sync_peers_mock.go +++ b/manager/job/mocks/sync_peers_mock.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + job "d7y.io/dragonfly/v2/manager/job" gomock "go.uber.org/mock/gomock" ) @@ -41,17 +42,17 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder { } // Run mocks base method. -func (m *MockSyncPeers) Run(arg0 context.Context) error { +func (m *MockSyncPeers) Run(arg0 context.Context, arg1 job.SyncPeersArgs) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", arg0) + ret := m.ctrl.Call(m, "Run", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Run indicates an expected call of Run. -func (mr *MockSyncPeersMockRecorder) Run(arg0 any) *gomock.Call { +func (mr *MockSyncPeersMockRecorder) Run(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0, arg1) } // Serve mocks base method. diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index 177a68752b2..9862a8b5a40 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -21,17 +21,20 @@ package job import ( "context" "fmt" + "sync" "time" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" + "gorm.io/gorm/clause" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/pkg/container/slice" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" resource "d7y.io/dragonfly/v2/scheduler/resource/standard" @@ -39,8 +42,8 @@ import ( // SyncPeers is an interface for sync peers. type SyncPeers interface { - // Run sync peers. - Run(context.Context) error + // Run execute action to sync peers, which is async. + Run(context.Context, SyncPeersArgs) error // Serve started sync peers server. Serve() @@ -55,24 +58,92 @@ type syncPeers struct { job *internaljob.Job db *gorm.DB done chan struct{} + + syncLocker sync.Mutex + workChan chan SyncPeersArgs +} + +type SyncPeersArgs struct { + CandidateSchedulerClusters []models.SchedulerCluster + TaskID string } // newSyncPeers returns a new SyncPeers. func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) { return &syncPeers{ - config: cfg, - db: gdb, - job: job, - done: make(chan struct{}), + config: cfg, + db: gdb, + job: job, + done: make(chan struct{}), + workChan: make(chan SyncPeersArgs, 10), + syncLocker: sync.Mutex{}, }, nil } -// Run sync peers. -func (s *syncPeers) Run(ctx context.Context) error { - // Find all of the scheduler clusters that has active schedulers. - var candidateSchedulerClusters []models.SchedulerCluster - if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil { - return err +// Run start to sync peers. +func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error { + if len(args.CandidateSchedulerClusters) == 0 { + if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil { + return fmt.Errorf("failed to get candidate scheduler clusters: %v", err) + } + } + + s.workChan <- args + return nil +} + +// Serve started sync peers server. +func (s *syncPeers) Serve() { + ticker := time.NewTicker(s.config.Job.SyncPeers.Interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + logger.Debugf("start to sync peers periodically") + if err := s.syncPeers(context.Background(), nil); err != nil { + logger.Errorf("sync peers failed periodically: %v", err) + } + case args := <-s.workChan: + logger.Debugf("start to sync peers for request") + err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters) + if err != nil { + logger.Errorf("sync peers failed for request: %v", err) + } + + if args.TaskID != "" { + job := models.Job{} + state := machineryv1tasks.StateFailure + if err == nil { + state = machineryv1tasks.StateSuccess + } + if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{ + State: state, + }).Error; updateErr != nil { + logger.Errorf("update sync peers job result failed for request: %v", updateErr) + } + } + case <-s.done: + return + } + } +} + +// Stop sync peers server. +func (s *syncPeers) Stop() { + close(s.done) +} + +// syncPeers is the real working function in synchronous mode. +func (s *syncPeers) syncPeers(ctx context.Context, candidateSchedulerClusters []models.SchedulerCluster) error { + if !s.syncLocker.TryLock() { + return fmt.Errorf("another sync peers is already running") + } + defer s.syncLocker.Unlock() + + if len(candidateSchedulerClusters) == 0 { + if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil { + return err + } } // Find all of the schedulers that has active scheduler cluster. @@ -111,26 +182,6 @@ func (s *syncPeers) Run(ctx context.Context) error { return nil } -// Serve started sync peers server. -func (s *syncPeers) Serve() { - tick := time.NewTicker(s.config.Job.SyncPeers.Interval) - for { - select { - case <-tick.C: - if err := s.Run(context.Background()); err != nil { - logger.Errorf("sync peers failed: %v", err) - } - case <-s.done: - return - } - } -} - -// Stop sync peers server. -func (s *syncPeers) Stop() { - close(s.done) -} - // createSyncPeers creates sync peers. func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) { var span trace.Span @@ -154,7 +205,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu logger.Infof("create sync peers in queue %v, task: %#v", queue, task) asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task) if err != nil { - logger.Errorf("create sync peers in queue %v failed", queue, err) + logger.Errorf("create sync peers in queue %v failed: %v", queue, err) return nil, err } @@ -175,84 +226,111 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu // Merge sync peer results with the data in the peer table. func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) { - // Convert sync peer results from slice to map. - syncPeers := make(map[string]*resource.Host) - for _, result := range results { - syncPeers[result.ID] = result - } + // Fetch existing peers from the database + var existingPeers []models.Peer + var count int64 - rows, err := s.db.Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).Rows() - if err != nil { - log.Error(err) + if err := s.db.Model(&models.Peer{}). + Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID). + Count(&count). + Error; err != nil { + log.Error("failed to count existing peers: ", err) return } - defer rows.Close() - for rows.Next() { - peer := models.Peer{} - if err := s.db.ScanRows(rows, &peer); err != nil { - log.Error(err) - continue + log.Infof("total peers count: %d", count) + + pageSize := s.config.Job.SyncPeers.BatchSize + totalPages := (count + int64(pageSize-1)) / int64(pageSize) + + for page := 1; page <= int(totalPages); page++ { + var batchPeers []models.Peer + if err := s.db.Preload("SchedulerCluster"). + Scopes(models.Paginate(page, pageSize)). + Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID). + Find(&batchPeers). + Error; err != nil { + log.Error("Failed to fetch peers in batch: ", err) + return } - // If the peer exists in the sync peer results, update the peer data in the database with - // the sync peer results and delete the sync peer from the sync peers map. - isSeedPeer := types.ParseHostType(peer.Type) != types.HostTypeNormal - id := idgen.HostIDV2(peer.IP, peer.Hostname, isSeedPeer) - if syncPeer, ok := syncPeers[id]; ok { - if err := s.db.WithContext(ctx).First(&models.Peer{}, peer.ID).Updates(models.Peer{ - Type: syncPeer.Type.Name(), - IDC: syncPeer.Network.IDC, - Location: syncPeer.Network.Location, - Port: syncPeer.Port, - DownloadPort: syncPeer.DownloadPort, - ObjectStoragePort: syncPeer.ObjectStoragePort, - State: models.PeerStateActive, - OS: syncPeer.OS, - Platform: syncPeer.Platform, - PlatformFamily: syncPeer.PlatformFamily, - PlatformVersion: syncPeer.PlatformVersion, - KernelVersion: syncPeer.KernelVersion, - GitVersion: syncPeer.Build.GitVersion, - GitCommit: syncPeer.Build.GitCommit, - BuildPlatform: syncPeer.Build.Platform, - }).Error; err != nil { - log.Error(err) - } + existingPeers = append(existingPeers, batchPeers...) + } - // Delete the sync peer from the sync peers map. - delete(syncPeers, id) - } else { - // If the peer does not exist in the sync peer results, delete the peer in the database. - if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil { - log.Error(err) - } + // Calculate differences using diffPeers function + toUpsert, toDelete := diffPeers(existingPeers, results) + + // Perform batch upsert + if len(toUpsert) > 0 { + // Construct the upsert query + if err := s.db.WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }). + CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize). + Error; err != nil { + log.Error(err) } } - // Insert the sync peers that do not exist in the database into the peer table. - for _, syncPeer := range syncPeers { - if err := s.db.WithContext(ctx).Create(&models.Peer{ - Hostname: syncPeer.Hostname, - Type: syncPeer.Type.Name(), - IDC: syncPeer.Network.IDC, - Location: syncPeer.Network.Location, - IP: syncPeer.IP, - Port: syncPeer.Port, - DownloadPort: syncPeer.DownloadPort, - ObjectStoragePort: syncPeer.ObjectStoragePort, - State: models.PeerStateActive, - OS: syncPeer.OS, - Platform: syncPeer.Platform, - PlatformFamily: syncPeer.PlatformFamily, - PlatformVersion: syncPeer.PlatformVersion, - KernelVersion: syncPeer.KernelVersion, - GitVersion: syncPeer.Build.GitVersion, - GitCommit: syncPeer.Build.GitCommit, - BuildPlatform: syncPeer.Build.Platform, - SchedulerClusterID: uint(syncPeer.SchedulerClusterID), - }).Error; err != nil { + // Perform batch delete + if len(toDelete) > 0 { + if err := s.db.WithContext(ctx). + Delete(&toDelete). + Error; err != nil { log.Error(err) } } } + +func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) { + // Convert current peers to a map for quick lookup + currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string { + return item.ID + }) + + // Convert existing peers to a map for quick lookup + existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string { + return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal) + }) + + // Calculate differences + for id, currentPeer := range currentPeersMap { + if _, ok := existingPeersMap[id]; ok { + // Remove from existingPeersMap to mark it as processed + delete(existingPeersMap, id) + } + // Add all current peers to upsert list + toUpsert = append(toUpsert, convertToModelPeer(*currentPeer)) + } + + // Peers left in existingPeersMap are to be deleted + toDelete = slice.Values(existingPeersMap) + + return toUpsert, toDelete +} + +// Helper function to convert resource.Host to models.Peer +func convertToModelPeer(peer resource.Host) models.Peer { + return models.Peer{ + Hostname: peer.Hostname, + Type: peer.Type.Name(), + IDC: peer.Network.IDC, + Location: peer.Network.Location, + IP: peer.IP, + Port: peer.Port, + DownloadPort: peer.DownloadPort, + ObjectStoragePort: peer.ObjectStoragePort, + State: models.PeerStateActive, + OS: peer.OS, + Platform: peer.Platform, + PlatformFamily: peer.PlatformFamily, + PlatformVersion: peer.PlatformVersion, + KernelVersion: peer.KernelVersion, + GitVersion: peer.Build.GitVersion, + GitCommit: peer.Build.GitCommit, + BuildPlatform: peer.Build.Platform, + SchedulerClusterID: uint(peer.SchedulerClusterID), + } +} diff --git a/manager/job/sync_peers_test.go b/manager/job/sync_peers_test.go new file mode 100644 index 00000000000..caf7891fb72 --- /dev/null +++ b/manager/job/sync_peers_test.go @@ -0,0 +1,118 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package job + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/pkg/idgen" + "d7y.io/dragonfly/v2/pkg/types" + resource "d7y.io/dragonfly/v2/scheduler/resource/standard" +) + +func Test_diffPeers(t *testing.T) { + type args struct { + existingPeers []models.Peer + currentPeers []*resource.Host + } + tests := []struct { + name string + args args + wantToUpsert []models.Peer + wantToDelete []models.Peer + }{ + { + name: "append", + args: args{ + existingPeers: []models.Peer{ + // delete for not existing + generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed), + // delete for original HostTypeNormal + generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal), + // delete for type changed + generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal), + // update for port changed + generateModePeer("127.0.0.1", "foo1", 80, 443, types.HostTypeSuperSeed), + // update for type changed + generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeStrongSeed), + }, + currentPeers: []*resource.Host{ + resource.NewHost( + idgen.HostIDV2("127.0.0.1", "foo1", true), + "127.0.0.1", + "foo1", + 80, + 80, + types.HostTypeSuperSeed), + resource.NewHost( + idgen.HostIDV2("127.0.0.2", "foo2", true), + "127.0.0.2", + "foo2", + 80, + 80, + types.HostTypeSuperSeed), + resource.NewHost( + idgen.HostIDV2("127.0.0.3", "foo3", true), + "127.0.0.3", + "foo3", + 80, + 80, + types.HostTypeSuperSeed), // append only + }, + }, + wantToUpsert: []models.Peer{ + generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed), + generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed), + generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed), + }, + wantToDelete: []models.Peer{ + generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal), + generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal), + generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers) + // sort the result to compare + sort.Slice(gotToUpdate, func(i, j int) bool { + return gotToUpdate[i].IP < gotToUpdate[j].IP + }) + sort.Slice(gotToDelete, func(i, j int) bool { + return gotToDelete[i].IP < gotToDelete[j].IP + }) + assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) + assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) + }) + } +} + +func generateModePeer(ip, hostname string, port, downloadPort int32, typ types.HostType) models.Peer { + return models.Peer{ + Hostname: hostname, + Type: typ.Name(), + IP: ip, + Port: port, + State: models.PeerStateActive, + DownloadPort: downloadPort, + } +} diff --git a/manager/service/job.go b/manager/service/job.go index f786eaaadff..2fdbbbf3dea 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -22,9 +22,11 @@ import ( "fmt" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/job" "d7y.io/dragonfly/v2/manager/metrics" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" @@ -34,6 +36,49 @@ import ( "d7y.io/dragonfly/v2/pkg/structure" ) +func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) { + args, err := structure.StructToMap(json) + if err != nil { + return nil, err + } + + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil) + if err != nil { + return nil, err + } + + var candidateClusters []models.SchedulerCluster + for _, scheduler := range candidateSchedulers { + candidateClusters = append(candidateClusters, scheduler.SchedulerCluster) + } + + taskID := fmt.Sprintf("manager_%v", uuid.New().String()) + + if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{ + CandidateSchedulerClusters: candidateClusters, + TaskID: taskID, + }); err != nil { + return nil, err + } + + // job here is a local one controlled by the manager self. + job := models.Job{ + TaskID: taskID, + BIO: json.BIO, + Args: args, + Type: json.Type, + State: machineryv1tasks.StateStarted, + UserID: json.UserID, + SchedulerClusters: candidateClusters, + } + + if err = s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + return &job, nil +} + func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) { if json.Args.Scope == "" { json.Args.Scope = types.SinglePeerScope diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index b0eb8cbdf97..638b46b3d2f 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -340,6 +340,21 @@ func (mr *MockServiceMockRecorder) CreateSeedPeerCluster(arg0, arg1 any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSeedPeerCluster", reflect.TypeOf((*MockService)(nil).CreateSeedPeerCluster), arg0, arg1) } +// CreateSyncPeersJob mocks base method. +func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateSyncPeersJob", ctx, json) + ret0, _ := ret[0].(*models.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateSyncPeersJob indicates an expected call of CreateSyncPeersJob. +func (mr *MockServiceMockRecorder) CreateSyncPeersJob(ctx, json any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSyncPeersJob", reflect.TypeOf((*MockService)(nil).CreateSyncPeersJob), ctx, json) +} + // CreateV1Preheat mocks base method. func (m *MockService) CreateV1Preheat(arg0 context.Context, arg1 types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) { m.ctrl.T.Helper() diff --git a/manager/service/service.go b/manager/service/service.go index 746361e0369..b849b006445 100644 --- a/manager/service/service.go +++ b/manager/service/service.go @@ -115,6 +115,7 @@ type Service interface { GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error) CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error) + CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error) CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error) DestroyJob(context.Context, uint) error diff --git a/manager/types/job.go b/manager/types/job.go index 3036e5981f9..e0e4f6e13dc 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -103,6 +103,14 @@ type PreheatArgs struct { Timeout time.Duration `json:"timeout" binding:"omitempty"` } +type CreateSyncPeersJobRequest struct { + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + Timeout time.Duration `json:"timeout" binding:"omitempty"` +} + type CreateGetTaskJobRequest struct { BIO string `json:"bio" binding:"omitempty"` Type string `json:"type" binding:"required"` diff --git a/pkg/container/slice/slice.go b/pkg/container/slice/slice.go new file mode 100644 index 00000000000..432b81f76eb --- /dev/null +++ b/pkg/container/slice/slice.go @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package slice + +func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V { + result := make(map[K]V, len(collection)) + + for i := range collection { + k := iteratee(collection[i]) + result[k] = collection[i] + } + + return result +} + +func Values[K comparable, V any](in ...map[K]V) []V { + size := 0 + for i := range in { + size += len(in[i]) + } + result := make([]V, 0, size) + + for i := range in { + for k := range in[i] { + result = append(result, in[i][k]) + } + } + + return result +} diff --git a/pkg/container/slice/slice_test.go b/pkg/container/slice/slice_test.go new file mode 100644 index 00000000000..8fd3846a446 --- /dev/null +++ b/pkg/container/slice/slice_test.go @@ -0,0 +1,102 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package slice + +import ( + "reflect" + "sort" + "testing" +) + +// TestKeyBy tests the KeyBy function +func TestKeyBy(t *testing.T) { + type Person struct { + ID int + Name string + } + + people := []Person{ + {ID: 1, Name: "Alice"}, + {ID: 2, Name: "Bob"}, + {ID: 3, Name: "Charlie"}, + } + + // Test case 1: Key by ID + keyByID := KeyBy(people, func(p Person) int { + return p.ID + }) + expectedKeyByID := map[int]Person{ + 1: {ID: 1, Name: "Alice"}, + 2: {ID: 2, Name: "Bob"}, + 3: {ID: 3, Name: "Charlie"}, + } + if !reflect.DeepEqual(keyByID, expectedKeyByID) { + t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID) + } + + // Test case 2: Key by Name + keyByName := KeyBy(people, func(p Person) string { + return p.Name + }) + expectedKeyByName := map[string]Person{ + "Alice": {ID: 1, Name: "Alice"}, + "Bob": {ID: 2, Name: "Bob"}, + "Charlie": {ID: 3, Name: "Charlie"}, + } + if !reflect.DeepEqual(keyByName, expectedKeyByName) { + t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName) + } +} + +// TestValues tests the Values function +func TestValues(t *testing.T) { + map1 := map[int]string{ + 1: "one", + 2: "two", + } + map2 := map[int]string{ + 3: "three", + 4: "four", + } + + // Test case 1: Values from one map + values1 := Values(map1) + expectedValues1 := []string{"one", "two"} + + sort.Strings(values1) + sort.Strings(expectedValues1) + if !reflect.DeepEqual(values1, expectedValues1) { + t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1) + } + + // Test case 2: Values from multiple maps + values2 := Values(map1, map2) + expectedValues2 := []string{"one", "two", "three", "four"} + + sort.Strings(values2) + sort.Strings(expectedValues2) + if !reflect.DeepEqual(values2, expectedValues2) { + t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2) + } + + // Test case 3: Values from empty maps + values3 := Values(map[int]string{}, map[int]string{}) + expectedValues3 := []string{} + if !reflect.DeepEqual(values3, expectedValues3) { + t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3) + } +} diff --git a/scheduler/resource/standard/host.go b/scheduler/resource/standard/host.go index 74e9157538b..09f02239165 100644 --- a/scheduler/resource/standard/host.go +++ b/scheduler/resource/standard/host.go @@ -387,7 +387,7 @@ func NewHost( h := &Host{ ID: id, - Type: types.HostType(typ), + Type: typ, IP: ip, Hostname: hostname, Port: port,