Skip to content

Commit

Permalink
modify for code review
Browse files Browse the repository at this point in the history
Signed-off-by: cormick <cormick1080@gmail.com>
  • Loading branch information
CormickKneey committed Nov 27, 2024
1 parent 9b91490 commit a9a2ff8
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 13 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ jobs:
- name: Golangci lint
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8
with:
version: v1.54
args: --verbose
version: v1.62
args: --verbose --timeout=10m

- name: Markdown lint
uses: docker://avtodev/markdown-lint:v1@sha256:6aeedc2f49138ce7a1cd0adffc1b1c0321b841dc2102408967d9301c031949ee
Expand Down
2 changes: 1 addition & 1 deletion client-rs
Submodule client-rs updated 32 files
+1 −1 .github/workflows/ci.yml
+54 −91 Cargo.lock
+13 −14 Cargo.toml
+1 −1 dragonfly-client-backend/Cargo.toml
+0 −274 dragonfly-client-backend/src/hdfs.rs
+40 −73 dragonfly-client-backend/src/http.rs
+19 −57 dragonfly-client-backend/src/lib.rs
+50 −59 dragonfly-client-backend/src/object_storage.rs
+2 −2 dragonfly-client-config/src/dfdaemon.rs
+0 −71 dragonfly-client-init/src/container_runtime/containerd.rs
+0 −51 dragonfly-client-init/src/container_runtime/crio.rs
+24 −19 dragonfly-client-storage/src/content.rs
+31 −29 dragonfly-client-storage/src/lib.rs
+83 −80 dragonfly-client-storage/src/metadata.rs
+20 −18 dragonfly-client-storage/src/storage_engine/rocksdb.rs
+105 −29 dragonfly-client-util/src/http/mod.rs
+2 −2 dragonfly-client/Cargo.toml
+6 −21 dragonfly-client/src/announcer/mod.rs
+7 −28 dragonfly-client/src/bin/dfdaemon/main.rs
+16 −42 dragonfly-client/src/bin/dfget/main.rs
+26 −32 dragonfly-client/src/grpc/dfdaemon_download.rs
+31 −33 dragonfly-client/src/grpc/dfdaemon_upload.rs
+1 −1 dragonfly-client/src/grpc/mod.rs
+11 −11 dragonfly-client/src/grpc/scheduler.rs
+107 −207 dragonfly-client/src/metrics/mod.rs
+31 −13 dragonfly-client/src/proxy/header.rs
+164 −185 dragonfly-client/src/proxy/mod.rs
+21 −14 dragonfly-client/src/resource/persistent_cache_task.rs
+31 −46 dragonfly-client/src/resource/piece.rs
+13 −10 dragonfly-client/src/resource/piece_collector.rs
+75 −68 dragonfly-client/src/resource/task.rs
+2 −3 dragonfly-client/src/tracing/mod.rs
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.6.1
github.com/samber/lo v1.47.0
github.com/schollz/progressbar/v3 v3.17.1
github.com/shirou/gopsutil/v3 v3.24.5
github.com/soheilhy/cmux v0.1.5
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1382,8 +1382,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 h1:AJNDS0kP60X8wwWFvbLPwDuojxubj9pbfK7pjHw0vKg=
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
Expand Down
38 changes: 31 additions & 7 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/pkg/container/slice"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"github.com/samber/lo"
)

// SyncPeers is an interface for sync peers.
Expand Down Expand Up @@ -206,7 +206,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
if err != nil {
logger.Errorf("create sync peers in queue %v failed", queue, err)
logger.Errorf("create sync peers in queue %v failed: %v", queue, err)
return nil, err
}

Expand All @@ -229,11 +229,35 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) {
// Fetch existing peers from the database
var existingPeers []models.Peer
if err := s.db.Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).Find(&existingPeers).Error; err != nil {
log.Error(err)
var count int64

if err := s.db.Model(&models.Peer{}).
Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
Count(&count).
Error; err != nil {
log.Error("failed to count existing peers: ", err)
return
}

log.Infof("total peers count: %d", count)

pageSize := s.config.Job.SyncPeers.BatchSize
totalPages := (count + int64(pageSize-1)) / int64(pageSize)

for page := 1; page <= int(totalPages); page++ {
var batchPeers []models.Peer
if err := s.db.Preload("SchedulerCluster").
Scopes(models.Paginate(page, pageSize)).
Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
Find(&batchPeers).
Error; err != nil {
log.Error("Failed to fetch peers in batch: ", err)
return
}

existingPeers = append(existingPeers, batchPeers...)
}

// Calculate differences using diffPeers function
toUpsert, toDelete := diffPeers(existingPeers, results)

Expand Down Expand Up @@ -263,12 +287,12 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler,

func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) {
// Convert current peers to a map for quick lookup
currentPeersMap := lo.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
return item.ID
})

// Convert existing peers to a map for quick lookup
existingPeersMap := lo.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal)
})

Expand All @@ -283,7 +307,7 @@ func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUp
}

// Peers left in existingPeersMap are to be deleted
toDelete = lo.Values(existingPeersMap)
toDelete = slice.Values(existingPeersMap)

return toUpsert, toDelete
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/container/slice/slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package slice

func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V {
result := make(map[K]V, len(collection))

for i := range collection {
k := iteratee(collection[i])
result[k] = collection[i]
}

return result
}

func Values[K comparable, V any](in ...map[K]V) []V {
size := 0
for i := range in {
size += len(in[i])
}
result := make([]V, 0, size)

for i := range in {
for k := range in[i] {
result = append(result, in[i][k])
}
}

return result
}
102 changes: 102 additions & 0 deletions pkg/container/slice/slice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package slice

import (
"reflect"
"sort"
"testing"
)

// TestKeyBy tests the KeyBy function
func TestKeyBy(t *testing.T) {
type Person struct {
ID int
Name string
}

people := []Person{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 3, Name: "Charlie"},
}

// Test case 1: Key by ID
keyByID := KeyBy(people, func(p Person) int {
return p.ID
})
expectedKeyByID := map[int]Person{
1: {ID: 1, Name: "Alice"},
2: {ID: 2, Name: "Bob"},
3: {ID: 3, Name: "Charlie"},
}
if !reflect.DeepEqual(keyByID, expectedKeyByID) {
t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID)
}

// Test case 2: Key by Name
keyByName := KeyBy(people, func(p Person) string {
return p.Name
})
expectedKeyByName := map[string]Person{
"Alice": {ID: 1, Name: "Alice"},
"Bob": {ID: 2, Name: "Bob"},
"Charlie": {ID: 3, Name: "Charlie"},
}
if !reflect.DeepEqual(keyByName, expectedKeyByName) {
t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName)
}
}

// TestValues tests the Values function
func TestValues(t *testing.T) {
map1 := map[int]string{
1: "one",
2: "two",
}
map2 := map[int]string{
3: "three",
4: "four",
}

// Test case 1: Values from one map
values1 := Values(map1)
expectedValues1 := []string{"one", "two"}

sort.Strings(values1)
sort.Strings(expectedValues1)
if !reflect.DeepEqual(values1, expectedValues1) {
t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1)
}

// Test case 2: Values from multiple maps
values2 := Values(map1, map2)
expectedValues2 := []string{"one", "two", "three", "four"}

sort.Strings(values2)
sort.Strings(expectedValues2)
if !reflect.DeepEqual(values2, expectedValues2) {
t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2)
}

// Test case 3: Values from empty maps
values3 := Values(map[int]string{}, map[int]string{})
expectedValues3 := []string{}
if !reflect.DeepEqual(values3, expectedValues3) {
t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3)
}
}

0 comments on commit a9a2ff8

Please sign in to comment.