From a9a2ff8f0ea303ae17eaf6c29141d859fa168150 Mon Sep 17 00:00:00 2001 From: cormick Date: Tue, 26 Nov 2024 22:18:10 +0800 Subject: [PATCH] modify for code review Signed-off-by: cormick --- .github/workflows/lint.yml | 4 +- client-rs | 2 +- go.mod | 1 - go.sum | 2 - manager/job/sync_peers.go | 38 +++++++++-- pkg/container/slice/slice.go | 44 +++++++++++++ pkg/container/slice/slice_test.go | 102 ++++++++++++++++++++++++++++++ 7 files changed, 180 insertions(+), 13 deletions(-) create mode 100644 pkg/container/slice/slice.go create mode 100644 pkg/container/slice/slice_test.go diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c5c4546f06c..9e45c51e079 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -23,8 +23,8 @@ jobs: - name: Golangci lint uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 with: - version: v1.54 - args: --verbose + version: v1.62 + args: --verbose --timeout=10m - name: Markdown lint uses: docker://avtodev/markdown-lint:v1@sha256:6aeedc2f49138ce7a1cd0adffc1b1c0321b841dc2102408967d9301c031949ee diff --git a/client-rs b/client-rs index 10bab190a28..ee21989120b 160000 --- a/client-rs +++ b/client-rs @@ -1 +1 @@ -Subproject commit 10bab190a28552cd05760b7bd75626990834877a +Subproject commit ee21989120b16db4e8456e7d163f9ba06aad98c9 diff --git a/go.mod b/go.mod index 4a6b842d63d..25ed66ef2ca 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 github.com/redis/go-redis/v9 v9.6.1 - github.com/samber/lo v1.47.0 github.com/schollz/progressbar/v3 v3.17.1 github.com/shirou/gopsutil/v3 v3.24.5 github.com/soheilhy/cmux v0.1.5 diff --git a/go.sum b/go.sum index 7b20edf87e9..abfdcfbebc2 100644 --- a/go.sum +++ b/go.sum @@ -1382,8 +1382,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= -github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 h1:AJNDS0kP60X8wwWFvbLPwDuojxubj9pbfK7pjHw0vKg= github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index c1047f014d5..a1a52649056 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -35,10 +35,10 @@ import ( internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/pkg/container/slice" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" resource "d7y.io/dragonfly/v2/scheduler/resource/standard" - "github.com/samber/lo" ) // SyncPeers is an interface for sync peers. @@ -206,7 +206,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu logger.Infof("create sync peers in queue %v, task: %#v", queue, task) asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task) if err != nil { - logger.Errorf("create sync peers in queue %v failed", queue, err) + logger.Errorf("create sync peers in queue %v failed: %v", queue, err) return nil, err } @@ -229,11 +229,35 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) { // Fetch existing peers from the database var existingPeers []models.Peer - if err := s.db.Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).Find(&existingPeers).Error; err != nil { - log.Error(err) + var count int64 + + if err := s.db.Model(&models.Peer{}). + Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID). + Count(&count). + Error; err != nil { + log.Error("failed to count existing peers: ", err) return } + log.Infof("total peers count: %d", count) + + pageSize := s.config.Job.SyncPeers.BatchSize + totalPages := (count + int64(pageSize-1)) / int64(pageSize) + + for page := 1; page <= int(totalPages); page++ { + var batchPeers []models.Peer + if err := s.db.Preload("SchedulerCluster"). + Scopes(models.Paginate(page, pageSize)). + Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID). + Find(&batchPeers). + Error; err != nil { + log.Error("Failed to fetch peers in batch: ", err) + return + } + + existingPeers = append(existingPeers, batchPeers...) + } + // Calculate differences using diffPeers function toUpsert, toDelete := diffPeers(existingPeers, results) @@ -263,12 +287,12 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) { // Convert current peers to a map for quick lookup - currentPeersMap := lo.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string { + currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string { return item.ID }) // Convert existing peers to a map for quick lookup - existingPeersMap := lo.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string { + existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string { return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal) }) @@ -283,7 +307,7 @@ func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUp } // Peers left in existingPeersMap are to be deleted - toDelete = lo.Values(existingPeersMap) + toDelete = slice.Values(existingPeersMap) return toUpsert, toDelete } diff --git a/pkg/container/slice/slice.go b/pkg/container/slice/slice.go new file mode 100644 index 00000000000..432b81f76eb --- /dev/null +++ b/pkg/container/slice/slice.go @@ -0,0 +1,44 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package slice + +func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V { + result := make(map[K]V, len(collection)) + + for i := range collection { + k := iteratee(collection[i]) + result[k] = collection[i] + } + + return result +} + +func Values[K comparable, V any](in ...map[K]V) []V { + size := 0 + for i := range in { + size += len(in[i]) + } + result := make([]V, 0, size) + + for i := range in { + for k := range in[i] { + result = append(result, in[i][k]) + } + } + + return result +} diff --git a/pkg/container/slice/slice_test.go b/pkg/container/slice/slice_test.go new file mode 100644 index 00000000000..8fd3846a446 --- /dev/null +++ b/pkg/container/slice/slice_test.go @@ -0,0 +1,102 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package slice + +import ( + "reflect" + "sort" + "testing" +) + +// TestKeyBy tests the KeyBy function +func TestKeyBy(t *testing.T) { + type Person struct { + ID int + Name string + } + + people := []Person{ + {ID: 1, Name: "Alice"}, + {ID: 2, Name: "Bob"}, + {ID: 3, Name: "Charlie"}, + } + + // Test case 1: Key by ID + keyByID := KeyBy(people, func(p Person) int { + return p.ID + }) + expectedKeyByID := map[int]Person{ + 1: {ID: 1, Name: "Alice"}, + 2: {ID: 2, Name: "Bob"}, + 3: {ID: 3, Name: "Charlie"}, + } + if !reflect.DeepEqual(keyByID, expectedKeyByID) { + t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID) + } + + // Test case 2: Key by Name + keyByName := KeyBy(people, func(p Person) string { + return p.Name + }) + expectedKeyByName := map[string]Person{ + "Alice": {ID: 1, Name: "Alice"}, + "Bob": {ID: 2, Name: "Bob"}, + "Charlie": {ID: 3, Name: "Charlie"}, + } + if !reflect.DeepEqual(keyByName, expectedKeyByName) { + t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName) + } +} + +// TestValues tests the Values function +func TestValues(t *testing.T) { + map1 := map[int]string{ + 1: "one", + 2: "two", + } + map2 := map[int]string{ + 3: "three", + 4: "four", + } + + // Test case 1: Values from one map + values1 := Values(map1) + expectedValues1 := []string{"one", "two"} + + sort.Strings(values1) + sort.Strings(expectedValues1) + if !reflect.DeepEqual(values1, expectedValues1) { + t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1) + } + + // Test case 2: Values from multiple maps + values2 := Values(map1, map2) + expectedValues2 := []string{"one", "two", "three", "four"} + + sort.Strings(values2) + sort.Strings(expectedValues2) + if !reflect.DeepEqual(values2, expectedValues2) { + t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2) + } + + // Test case 3: Values from empty maps + values3 := Values(map[int]string{}, map[int]string{}) + expectedValues3 := []string{} + if !reflect.DeepEqual(values3, expectedValues3) { + t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3) + } +}