feat(sample): swip21 changes (#4848)
istae authored Oct 8, 2024
1 parent e4e4520 commit d52e3fc
Showing 11 changed files with 426 additions and 53 deletions.
15 changes: 7 additions & 8 deletions pkg/node/node.go
@@ -248,6 +248,12 @@ func NewBee(
}
}(b)

if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
}

reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity

stateStore, stateStoreMetrics, err := InitStateStore(logger, o.DataDir, o.StatestoreCacheCapacity)
if err != nil {
return nil, err
@@ -353,14 +359,6 @@ func NewBee(
var batchStore postage.Storer = new(postage.NoOpBatchStore)
var evictFn func([]byte) error

var reserveCapacity int

if o.ReserveCapacityDoubling >= 0 && o.ReserveCapacityDoubling <= 1 {
reserveCapacity = 1 << (22 + o.ReserveCapacityDoubling)
} else {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
}

if chainEnabled {
batchStore, err = batchstore.New(
stateStore,
@@ -735,6 +733,7 @@ func NewBee(
lo.ReserveWakeUpDuration = reserveWakeUpDuration
lo.ReserveMinEvictCount = reserveMinEvictCount
lo.RadiusSetter = kad
lo.ReserveCapacityDoubling = o.ReserveCapacityDoubling
}

localStore, err := storer.New(ctx, path, lo)
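For context on the node.go hunk above: the doubling factor must be 0 or 1, and the reserve capacity is now derived by left-shifting storer.DefaultReserveCapacity instead of hard-coding 1 << (22 + doubling). A minimal sketch of the resulting sizes, assuming DefaultReserveCapacity keeps the 1<<22 value implied by the removed expression:

	package main

	import "fmt"

	// Assumed to match the 1 << (22 + doubling) expression removed above;
	// the real constant lives in the storer package.
	const defaultReserveCapacity = 1 << 22

	func main() {
		for doubling := 0; doubling <= 1; doubling++ {
			capacity := (1 << doubling) * defaultReserveCapacity
			fmt.Printf("doubling=%d -> reserve capacity %d chunks\n", doubling, capacity)
		}
		// doubling=0 -> reserve capacity 4194304 chunks
		// doubling=1 -> reserve capacity 8388608 chunks
	}
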
10 changes: 6 additions & 4 deletions pkg/salud/salud.go
@@ -42,7 +42,7 @@ type peerStatus interface {

type reserve interface {
storer.RadiusChecker
ReserveSize() int
ReserveCapacityDoubling() int
}

type service struct {
@@ -200,7 +200,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-1) {
if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
@@ -221,10 +221,12 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

networkRadiusEstimation := s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())

selfHealth := true
if nHoodRadius == networkRadius && s.reserve.StorageRadius() != networkRadius {
if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
selfHealth = false
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.StorageRadius(), "network_radius", networkRadius)
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
}

s.isSelfHealthy.Store(selfHealth)
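A note on the salud.go changes above: a peer's radius may now trail the most common (network) radius by up to 2 bins instead of 1, and the node judges its own radius by adding its capacity-doubling factor to its storage radius, since each doubling lets it store one bin shallower than the network. An illustrative sketch of that estimate (the helper name is hypothetical, not the package's API):

	// Mirrors s.reserve.StorageRadius() + uint8(s.reserve.ReserveCapacityDoubling())
	// from the self-health check above.
	func networkRadiusEstimate(storageRadius uint8, capacityDoubling int) uint8 {
		return storageRadius + uint8(capacityDoubling)
	}

With a storage radius of 6 and a doubling of 2 the estimate is 8, which is what TestSelfHealthyCapacityDoubling in the test file below relies on to report the node healthy.
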
47 changes: 40 additions & 7 deletions pkg/salud/salud_test.go
@@ -17,6 +17,7 @@ import (
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
topMock "github.com/ethersphere/bee/v2/pkg/topology/mock"
"github.com/ethersphere/bee/v2/pkg/util/testutil"
)

type peer struct {
@@ -37,11 +38,11 @@ func TestSalud(t *testing.T) {
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

// healthy since radius >= most common radius - 1
// healthy since radius >= most common radius - 2
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},

// radius too low
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 6, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},

// dur too long
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
@@ -116,6 +117,7 @@ func TestSelfUnhealthyRadius(t *testing.T) {
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -127,10 +129,44 @@ func TestSelfUnhealthyRadius(t *testing.T) {
if service.IsHealthy() {
t.Fatalf("self should NOT be healthy")
}
}

if err := service.Close(); err != nil {
func TestSelfHealthyCapacityDoubling(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healthy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
}

statusM := &statusMock{make(map[string]peer)}
addrs := make([]swarm.Address, 0, len(peers))
for _, p := range peers {
addrs = append(addrs, p.addr)
statusM.peers[p.addr.ByteString()] = p
}

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

reserve := mockstorer.NewReserve(
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
})
if err != nil {
t.Fatal(err)
}

if !service.IsHealthy() {
t.Fatalf("self should be healthy")
}
}

func TestSubToRadius(t *testing.T) {
Expand Down Expand Up @@ -182,6 +218,7 @@ func TestUnsub(t *testing.T) {
topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
unsub()
@@ -191,10 +228,6 @@ func TestUnsub(t *testing.T) {
t.Fatal("should not have received an address")
case <-time.After(time.Second):
}

if err := service.Close(); err != nil {
t.Fatal(err)
}
}

type statusMock struct {
4 changes: 2 additions & 2 deletions pkg/storer/cachestore.go
@@ -40,8 +40,8 @@ func (db *DB) cacheWorker(ctx context.Context) {
}

evict := size - capc
if evict < db.opts.cacheMinEvictCount { // evict at least a min count
evict = db.opts.cacheMinEvictCount
if evict < db.reserveOptions.cacheMinEvictCount { // evict at least a min count
evict = db.reserveOptions.cacheMinEvictCount
}

dur := captureDuration(time.Now())
1 change: 0 additions & 1 deletion pkg/storer/internal/reserve/reserve.go
@@ -502,7 +502,6 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo
PrefixAtStart: true,
}, func(res storage.Result) (bool, error) {
item := res.Entry.(*ChunkBinItem)

stop, err := cb(item)
if stop || err != nil {
return true, err
15 changes: 13 additions & 2 deletions pkg/storer/mock/mockreserve.go
@@ -78,6 +78,12 @@ func WithReserveSize(s int) Option {
})
}

func WithCapacityDoubling(s int) Option {
return optionFunc(func(p *ReserveStore) {
p.capacityDoubling = s
})
}

func WithPutHook(f func(swarm.Chunk) error) Option {
return optionFunc(func(p *ReserveStore) {
p.putHook = f
@@ -106,8 +112,9 @@ type ReserveStore struct {
cursorsErr error
epoch uint64

radius uint8
reservesize int
radius uint8
reservesize int
capacityDoubling int

subResponses []chunksResponse
putHook func(swarm.Chunk) error
@@ -171,6 +178,10 @@ func (s *ReserveStore) ReserveSize() int {
return s.reservesize
}

func (s *ReserveStore) ReserveCapacityDoubling() int {
return s.capacityDoubling
}

func (s *ReserveStore) ReserveLastBinIDs() (curs []uint64, epoch uint64, err error) {
return s.cursors, s.epoch, s.cursorsErr
}
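The new mock option and getter are exercised by salud_test.go above; a minimal usage sketch:

	reserve := mockstorer.NewReserve(
		mockstorer.WithRadius(6),
		mockstorer.WithCapacityDoubling(2),
	)
	_ = reserve.ReserveCapacityDoubling() // returns 2
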
106 changes: 100 additions & 6 deletions pkg/storer/reserve.go
@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"math/bits"
"slices"
"sync"
"sync/atomic"
@@ -55,7 +56,7 @@ func (db *DB) startReserveWorkers(
go db.reserveWorker(ctx)

select {
case <-time.After(db.opts.reserveWarmupDuration):
case <-time.After(db.reserveOptions.warmupDuration):
case <-db.quit:
return
}
@@ -84,7 +85,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
radius := db.StorageRadius()

evictBatches := make(map[string]bool)

err := db.reserve.IterateChunksItems(0, func(ci *reserve.ChunkBinItem) (bool, error) {
if ci.Bin >= radius {
count++
@@ -121,7 +121,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
overCapTrigger, overCapUnsub := db.events.Subscribe(reserveOverCapacity)
defer overCapUnsub()

thresholdTicker := time.NewTicker(db.opts.reserveWakeupDuration)
thresholdTicker := time.NewTicker(db.reserveOptions.wakeupDuration)
defer thresholdTicker.Stop()

_, _ = db.countWithinRadius(ctx)
@@ -159,7 +159,7 @@ func (db *DB) reserveWorker(ctx context.Context) {
continue
}

if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.opts.minimumRadius {
if count < threshold(db.reserve.Capacity()) && db.syncer.SyncRate() == 0 && radius > db.reserveOptions.minimumRadius {
radius--
if err := db.reserve.SetRadius(radius); err != nil {
db.logger.Error(err, "reserve set radius")
@@ -362,8 +362,8 @@ func (db *DB) unreserve(ctx context.Context) (err error) {
}

evict := target - totalEvicted
if evict < int(db.opts.reserveMinEvictCount) { // evict at least a min count
evict = int(db.opts.reserveMinEvictCount)
if evict < int(db.reserveOptions.minEvictCount) { // evict at least a min count
evict = int(db.reserveOptions.minEvictCount)
}

binEvicted, err := db.evictBatch(ctx, b, evict, radius)
@@ -412,6 +412,10 @@ func (db *DB) StorageRadius() uint8 {
return db.reserve.Radius()
}

func (db *DB) ReserveCapacityDoubling() int {
return db.reserveOptions.capacityDoubling
}

func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0
@@ -493,6 +497,96 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
}, errC
}

type NeighborhoodStat struct {
Address swarm.Address
ChunkCount int
}

func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) {

radius := db.StorageRadius()

networkRadius := radius + uint8(db.reserveOptions.capacityDoubling)

prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling)
neighs := make([]*NeighborhoodStat, len(prefixes))
for i, n := range prefixes {
neighs[i] = &NeighborhoodStat{Address: n}
}

err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
for _, n := range neighs {
if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
n.ChunkCount++
break
}
}
return false, nil
})
if err != nil {
return nil, err
}

return neighs, err
}

func neighborhoodPrefixes(base swarm.Address, radius int, suffixLength int) []swarm.Address {
bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
bitSuffixes := make([]uint8, bitCombinationsCount)

for i := 0; i < bitCombinationsCount; i++ {
bitSuffixes[i] = uint8(i)
}

binPrefixes := make([]swarm.Address, bitCombinationsCount)

// copy base address
for i := range binPrefixes {
binPrefixes[i] = base.Clone()
}

for j := range binPrefixes {
pseudoAddrBytes := binPrefixes[j].Bytes()

// set pseudo suffix
bitSuffixPos := suffixLength - 1
for l := radius + 0; l < radius+suffixLength+1; l++ {
index, pos := l/8, l%8

if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
} else {
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}

bitSuffixPos--
}

// clear rest of the bits
for l := radius + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
index, pos := l/8, l%8
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
}

return binPrefixes
}

// Clears the bit at pos in n.
func clearBit(n, pos uint8) uint8 {
mask := ^(uint8(1) << pos)
return n & mask
}

// Sets the bit at pos in the integer n.
func setBit(n, pos uint8) uint8 {
return n | 1<<pos
}

func hasBit(n, pos uint8) bool {
return n&(1<<pos) > 0
}

// expiredBatchItem is a storage.Item implementation for expired batches.
type expiredBatchItem struct {
BatchID []byte
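A worked example of neighborhoodPrefixes above: for storage radius r and capacity doubling d it returns 2^d addresses that keep the first r bits of the base overlay, splice in every possible d-bit suffix, and zero the remaining bits; each address names one sub-neighborhood at network radius r+d, and NeighborhoodsStat counts the reserve chunks that fall within each. A single-byte sketch of the same idea (illustrative only, not the production bit manipulation):

	package main

	import "fmt"

	func main() {
		base := byte(0b10110101) // hypothetical first byte of the node's overlay
		r, d := 2, 2             // storage radius and capacity doubling

		keep := byte(0xFF) << (8 - r) // mask preserving the first r bits of base
		for suffix := 0; suffix < 1<<d; suffix++ {
			prefix := base&keep | byte(suffix)<<(8-r-d) // splice suffix, zero the rest
			fmt.Printf("%08b\n", prefix)
		}
		// Prints 10000000, 10010000, 10100000 and 10110000: the four
		// sub-neighborhood prefixes at network radius r+d = 4.
	}
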
