docs: redundancy strategies openapi #4510

Closed · wants to merge 5 commits
15 changes: 15 additions & 0 deletions openapi/Swarm.yaml
@@ -168,6 +168,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
responses:
"200":
description: Retrieved content specified by reference
@@ -316,6 +321,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
responses:
"200":
description: Ok
@@ -341,6 +351,11 @@ paths:
tags:
- BZZ
parameters:
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
- in: path
name: reference
schema:
11 changes: 11 additions & 0 deletions openapi/SwarmCommon.yaml
@@ -945,6 +945,17 @@ components:
Add redundancy to the data being uploaded so that downloaders can download it with better UX.
0 value is default and does not add any redundancy to the file.
SwarmRedundancyStrategyParameter:
in: header
name: swarm-redundancy-parameter
schema:
type: integer
enum: [0, 1, 2, 3, 4]
required: false
description: >
Force different retrieve strategies on redundant data.
The mumbers stand for NONE, DATA, PROX and RACE, respectively.
Collaborator:
Would it be good to mention which is the default if the header is not specified?

Collaborator @ldeffenb (Dec 16, 2023):

5 values, but only 4 words? Respectively doesn't seem to align?

Also "mumbers" should be "numbers".

ContentTypePreserved:
in: header
name: Content-Type
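
For orientation, a minimal client-side sketch (not part of this PR) of passing the new header on a /bzz download as declared in Swarm.yaml above. The node address localhost:1633 and the placeholder reference are assumptions; the strategy value follows the 0=NONE, 1=DATA, 2=PROX, 3=RACE description (the meaning of value 4 is what the review comments above ask about).

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// placeholder Swarm reference of previously uploaded content
	reference := "..."

	// bee API endpoint; localhost:1633 is an assumption for a local node
	req, err := http.NewRequest(http.MethodGet, "http://localhost:1633/bzz/"+reference+"/", nil)
	if err != nil {
		panic(err)
	}
	// 0=NONE, 1=DATA, 2=PROX, 3=RACE per the parameter description;
	// here we ask for the aggressive RACE strategy
	req.Header.Set("swarm-redundancy-strategy", "3")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, len(body), "bytes")
}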
4 changes: 4 additions & 0 deletions pkg/file/pipeline/builder/builder.go
@@ -36,9 +36,11 @@ func NewPipelineBuilder(ctx context.Context, s storage.Putter, encrypt bool, rLe
func newPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
pipeline := newShortPipelineFunc(ctx, s)
tw := hashtrie.NewHashTrieWriter(
ctx,
swarm.HashSize,
redundancy.New(rLevel, false, pipeline),
pipeline,
s,
)
lsw := store.NewStoreWriter(ctx, s, tw)
b := bmt.NewBmtWriter(lsw)
@@ -61,9 +63,11 @@ func newShortPipelineFunc(ctx context.Context, s storage.Putter) func() pipeline
// with the unencrypted span is preserved.
func newEncryptionPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
tw := hashtrie.NewHashTrieWriter(
ctx,
swarm.HashSize+encryption.KeyLength,
redundancy.New(rLevel, true, newShortPipelineFunc(ctx, s)),
newShortEncryptionPipelineFunc(ctx, s),
s,
)
lsw := store.NewStoreWriter(ctx, s, tw)
b := bmt.NewBmtWriter(lsw)
38 changes: 33 additions & 5 deletions pkg/file/pipeline/hashtrie/hashtrie.go
@@ -5,11 +5,15 @@
package hashtrie

import (
"context"
"encoding/binary"
"errors"
"fmt"

"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/replicas"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

@@ -21,20 +25,29 @@ var (
const maxLevel = 8

type hashTrieWriter struct {
ctx context.Context // context for put function of dispersed replica chunks
refSize int
cursors []int // level cursors, key is level. level 0 is data level holds how many chunks were processed. Intermediate higher levels will always have LOWER cursor values.
buffer []byte // keeps intermediate level data
full bool // indicates whether the trie is full. currently we support (128^7)*4096 = 2305843009213693952 bytes
pipelineFn pipeline.PipelineFunc
rParams redundancy.IParams
parityChunkFn redundancy.ParityChunkCallback
chunkCounters []uint8 // counts the chunk references in intermediate chunks. key is the chunk level.
effectiveChunkCounters []uint8 // counts the effective chunk references in intermediate chunks. key is the chunk level.
maxChildrenChunks uint8 // maximum number of chunk references in intermediate chunks.
chunkCounters []uint8 // counts the chunk references in intermediate chunks. key is the chunk level.
effectiveChunkCounters []uint8 // counts the effective chunk references in intermediate chunks. key is the chunk level.
maxChildrenChunks uint8 // maximum number of chunk references in intermediate chunks.
replicaPutter storage.Putter // putter to save dispersed replicas of the root chunk
}

func NewHashTrieWriter(refLen int, rParams redundancy.IParams, pipelineFn pipeline.PipelineFunc) pipeline.ChainWriter {
func NewHashTrieWriter(
ctx context.Context,
refLen int,
rParams redundancy.IParams,
pipelineFn pipeline.PipelineFunc,
replicaPutter storage.Putter,
) pipeline.ChainWriter {
h := &hashTrieWriter{
ctx: ctx,
refSize: refLen,
cursors: make([]int, 9),
buffer: make([]byte, swarm.ChunkWithSpanSize*9*2), // double size as temp workaround for weak calculation of needed buffer space
@@ -43,6 +56,7 @@ func NewHashTrieWriter(refLen int, rParams redundancy.IParams, pipelineFn pipeli
chunkCounters: make([]uint8, 9),
effectiveChunkCounters: make([]uint8, 9),
maxChildrenChunks: uint8(rParams.MaxShards() + rParams.Parities(rParams.MaxShards())),
replicaPutter: replicaPutter,
}
h.parityChunkFn = func(level int, span, address []byte) error {
return h.writeToIntermediateLevel(level, true, span, address, []byte{})
@@ -245,5 +259,19 @@ func (h *hashTrieWriter) Sum() ([]byte, error) {

// return the hash in the highest level, that's all we need
data := h.buffer[0:h.cursors[maxLevel]]
return data[swarm.SpanSize:], nil
rootHash := data[swarm.SpanSize:]

// save disperse replicas of the root chunk
if h.rParams.Level() != redundancy.NONE {
rootData, err := h.rParams.GetRootData()
if err != nil {
return nil, err
}
putter := replicas.NewPutter(h.replicaPutter)
err = putter.Put(h.ctx, swarm.NewChunk(swarm.NewAddress(rootHash), rootData))
if err != nil {
return nil, fmt.Errorf("hashtrie: cannot put dispersed replica %s", err.Error())
}
}
return rootHash, nil
}
29 changes: 23 additions & 6 deletions pkg/file/pipeline/hashtrie/hashtrie_test.go
@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/file/pipeline/mock"
"github.com/ethersphere/bee/pkg/file/pipeline/store"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/replicas"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/pkg/swarm"
@@ -52,6 +53,7 @@ func newErasureHashTrieWriter(
rLevel redundancy.Level,
encryptChunks bool,
intermediateChunkPipeline, parityChunkPipeline pipeline.ChainWriter,
replicaPutter storage.Putter,
) (redundancy.IParams, pipeline.ChainWriter) {
pf := func() pipeline.ChainWriter {
lsw := store.NewStoreWriter(ctx, s, intermediateChunkPipeline)
@@ -75,7 +77,7 @@ func newErasureHashTrieWriter(
}

r := redundancy.New(rLevel, encryptChunks, ppf)
ht := hashtrie.NewHashTrieWriter(hashSize, r, pf)
ht := hashtrie.NewHashTrieWriter(ctx, hashSize, r, pf, replicaPutter)
return r, ht
}

@@ -143,7 +145,7 @@ func TestLevels(t *testing.T) {
return bmt.NewBmtWriter(lsw)
}

ht := hashtrie.NewHashTrieWriter(hashSize, redundancy.New(0, false, pf), pf)
ht := hashtrie.NewHashTrieWriter(ctx, hashSize, redundancy.New(0, false, pf), pf, s)

for i := 0; i < tc.writes; i++ {
a := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: span}
@@ -196,7 +198,7 @@ func TestLevels_TrieFull(t *testing.T) {
Params: *r,
}

ht = hashtrie.NewHashTrieWriter(hashSize, rMock, pf)
ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s)
)

// to create a level wrap we need to do branching^(level-1) writes
@@ -237,7 +239,7 @@ func TestRegression(t *testing.T) {
lsw := store.NewStoreWriter(ctx, s, nil)
return bmt.NewBmtWriter(lsw)
}
ht = hashtrie.NewHashTrieWriter(hashSize, redundancy.New(0, false, pf), pf)
ht = hashtrie.NewHashTrieWriter(ctx, hashSize, redundancy.New(0, false, pf), pf, s)
)
binary.LittleEndian.PutUint64(span, 4096)

@@ -265,6 +267,16 @@ func TestRegression(t *testing.T) {
}
}

type replicaPutter struct {
storage.Putter
replicaCount uint8
}

func (r *replicaPutter) Put(ctx context.Context, chunk swarm.Chunk) error {
r.replicaCount++
return r.Putter.Put(ctx, chunk)
}

// TestRedundancy using erasure coding library and checks carrierChunk function and modified span in intermediate chunk
func TestRedundancy(t *testing.T) {
t.Parallel()
@@ -303,12 +315,14 @@ func TestRedundancy(t *testing.T) {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
subCtx := replicas.SetLevel(ctx, tc.level)

s := inmemchunkstore.New()
intermediateChunkCounter := mock.NewChainWriter()
parityChunkCounter := mock.NewChainWriter()
replicaChunkCounter := &replicaPutter{Putter: s}

r, ht := newErasureHashTrieWriter(ctx, s, tc.level, tc.encryption, intermediateChunkCounter, parityChunkCounter)
r, ht := newErasureHashTrieWriter(subCtx, s, tc.level, tc.encryption, intermediateChunkCounter, parityChunkCounter, replicaChunkCounter)

// write data to the hashTrie
var key []byte
@@ -336,7 +350,7 @@ func TestRedundancy(t *testing.T) {
t.Errorf("effective chunks should be %d. Got: %d", tc.writes, intermediateChunkCounter.ChainWriteCalls())
}

rootch, err := s.Get(ctx, swarm.NewAddress(ref[:swarm.HashSize]))
rootch, err := s.Get(subCtx, swarm.NewAddress(ref[:swarm.HashSize]))
if err != nil {
t.Fatal(err)
}
@@ -362,6 +376,9 @@ func TestRedundancy(t *testing.T) {
if expectedParities != parity {
t.Fatalf("want parity %d got %d", expectedParities, parity)
}
if tc.level.GetReplicaCount()+1 != int(replicaChunkCounter.replicaCount) { // +1 is the original chunk
t.Fatalf("unexpected number of replicas: want %d. Got: %d", tc.level.GetReplicaCount(), int(replicaChunkCounter.replicaCount))
}
})
}
}
414 changes: 150 additions & 264 deletions pkg/file/redundancy/getter/getter.go

Large diffs are not rendered by default.

416 changes: 270 additions & 146 deletions pkg/file/redundancy/getter/getter_test.go

Large diffs are not rendered by default.

90 changes: 90 additions & 0 deletions pkg/file/redundancy/getter/strategies.go
@@ -0,0 +1,90 @@
package getter

import (
"context"
"fmt"
"time"
)

var (
StrategyTimeout = 500 * time.Millisecond // timeout for each strategy
)

type Strategy = int

const (
NONE Strategy = iota // no prefetching and no decoding
DATA // just retrieve data shards no decoding
PROX // proximity driven selective fetching
RACE // aggressive fetching racing all chunks
strategyCnt
)

func (g *getter) prefetch(ctx context.Context, strategy int, strict bool) {
if strict && strategy == NONE {
return
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

run := func(s Strategy) error {
if s == PROX { // NOT IMPLEMENTED
return fmt.Errorf("strategy %d not implemented", s)
}

var stop <-chan time.Time
if s < RACE {
timer := time.NewTimer(StrategyTimeout)
defer timer.Stop()
stop = timer.C
}

prefetch(ctx, g, s)

select {
// successfully retrieved shardCnt number of chunks
case <-g.ready:
case <-stop:
return fmt.Errorf("prefetching with strategy %d timed out", s)
case <-ctx.Done():
return nil
}
// call the erasure decoder
// if decoding is successful terminate the prefetch loop
return g.recover(ctx) // context to cancel when shardCnt chunks are retrieved
}
var err error
for s := strategy; s == strategy || (err != nil && !strict && s < strategyCnt); s++ {
err = run(s)
}
}

// prefetch launches the retrieval of chunks based on the strategy
func prefetch(ctx context.Context, g *getter, s Strategy) {
var m []int
switch s {
case NONE:
return
case DATA:
// only retrieve data shards
m = g.missing()
case PROX:
// proximity driven selective fetching
// NOT IMPLEMENTED
case RACE:
// retrieve all chunks at once enabling race among chunks
m = g.missing()
for i := g.shardCnt; i < len(g.addrs); i++ {
m = append(m, i)
}
}
for _, i := range m {
i := i
g.wg.Add(1)
go func() {
g.fetch(ctx, i)
g.wg.Done()
}()
}
}
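
To make the fallback semantics of the loop above concrete, here is a standalone toy sketch (illustration only; the run stub is made up): starting from the requested strategy, a failed attempt escalates to the next strategy unless strict is set, stopping once a strategy succeeds or the strongest one has been tried.

package main

import (
	"errors"
	"fmt"
)

const (
	NONE = iota // no prefetching and no decoding
	DATA        // just retrieve data shards, no decoding
	PROX        // proximity driven selective fetching
	RACE        // aggressive fetching racing all chunks
	strategyCnt
)

func main() {
	strict := false
	strategy := DATA

	// stand-in for the real run closure: DATA "times out", PROX is not
	// implemented, RACE succeeds
	run := func(s int) error {
		switch s {
		case PROX:
			return fmt.Errorf("strategy %d not implemented", s)
		case DATA:
			return errors.New("prefetching with strategy 1 timed out")
		default:
			return nil
		}
	}

	// same loop shape as in prefetch above: try the requested strategy first,
	// then escalate while it keeps failing (and strict is false)
	var err error
	for s := strategy; s == strategy || (err != nil && !strict && s < strategyCnt); s++ {
		err = run(s)
		fmt.Printf("strategy %d -> err = %v\n", s, err)
	}
}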
31 changes: 31 additions & 0 deletions pkg/file/redundancy/level.go
@@ -82,6 +82,21 @@ func (l Level) GetMaxEncShards() int {
return (swarm.Branches - p) / 2
}

// GetReplicaCount returns back the dispersed replica number
func (l Level) GetReplicaCount() int {
return replicaCounts[int(l)]
}

// GetReplicaIndexBase returns back the dispersed replica index base of the level
func (l Level) GetReplicaIndexBase() int {
return replicaIndexBases[int(l)-1]
}

// Decrement returns a weaker redundancy level compare to the current one
func (l Level) Decrement() Level {
return Level(uint8(l) - 1)
}

// TABLE INITS

var mediumEt = newErasureTable(
@@ -135,3 +150,19 @@ var encParanoidEt = newErasureTable(
55, 51, 48, 44, 39, 35, 30, 24,
},
)

// DISPERSED REPLICAS INIT

// GetReplicaCounts returns back the ascending dispersed replica counts for all redundancy levels
func GetReplicaCounts() [5]int {
c := replicaCounts
return c
}

// the actual number of replicas needed to keep the error rate below 1/10^6
// for the five levels of redundancy are 0, 2, 4, 5, 19
// we use an approximation as the successive powers of 2
var replicaCounts = [5]int{0, 2, 4, 8, 16}

// index bases needed to keep track how many addresses were mined for a level.
var replicaIndexBases = [5]int{0, 2, 6, 14}
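
As a quick illustration of the approximation mentioned in the comment above, this sketch compares the exact per-level replica counts quoted there (0, 2, 4, 5, 19) with the powers-of-two values actually used in replicaCounts.

package main

import "fmt"

func main() {
	exact := [5]int{0, 2, 4, 5, 19}  // counts quoted for an error rate below 1/10^6
	approx := [5]int{0, 2, 4, 8, 16} // replicaCounts table above
	for level := 0; level < 5; level++ {
		fmt.Printf("level %d: exact %2d, approximated as %2d\n", level, exact[level], approx[level])
	}
}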
15 changes: 15 additions & 0 deletions pkg/file/redundancy/redundancy.go
@@ -22,6 +22,7 @@ type IParams interface {
ChunkWrite(int, []byte, ParityChunkCallback) error
ElevateCarrierChunk(int, ParityChunkCallback) error
Encode(int, ParityChunkCallback) error
GetRootData() ([]byte, error)
}

type ErasureEncoder interface {
@@ -202,3 +203,17 @@ func (p *Params) ElevateCarrierChunk(chunkLevel int, callback ParityChunkCallbac
// not necessary to update current level since we will not work with it anymore
return p.chunkWrite(chunkLevel+1, p.buffer[chunkLevel][p.cursor[chunkLevel]-1], callback)
}

// GetRootData returns the topmost chunk in the tree.
// throws and error if the encoding has not been finished in the BMT
// OR redundancy is not used in the BMT
func (p *Params) GetRootData() ([]byte, error) {
if p.level == NONE {
return nil, fmt.Errorf("redundancy: no redundancy level is used for the file in order to cache root data")
}
lastBuffer := p.buffer[maxLevel-1]
if len(lastBuffer[0]) != swarm.ChunkWithSpanSize {
return nil, fmt.Errorf("redundancy: hashtrie sum has not finished in order to cache root data")
}
return lastBuffer[0], nil
}
1 change: 0 additions & 1 deletion pkg/replicas/export_test.go
@@ -7,7 +7,6 @@ package replicas
import "github.com/ethersphere/bee/pkg/storage"

var (
Counts = counts
Signer = signer
)

8 changes: 4 additions & 4 deletions pkg/replicas/getter.go
@@ -88,12 +88,12 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e
}
}()
// counters
n := 0 // counts the replica addresses tried
target := 2 // the number of replicas attempted to download in this batch
total := counts[g.level] // total number of replicas allowed (and makes sense) to retrieve
n := 0 // counts the replica addresses tried
target := 2 // the number of replicas attempted to download in this batch
total := g.level.GetReplicaCount()

//
rr := newReplicator(addr, uint8(g.level))
rr := newReplicator(addr, g.level)
next := rr.c
var wait <-chan time.Time // nil channel to disable case
// addresses used are doubling each period of search expansion
6 changes: 3 additions & 3 deletions pkg/replicas/getter_test.go
@@ -126,7 +126,7 @@ func TestGetter(t *testing.T) {

var tests []test
for _, f := range failures {
for level, c := range replicas.Counts {
for level, c := range redundancy.GetReplicaCounts() {
for j := 0; j <= c*2+1; j++ {
tests = append(tests, test{
name: fmt.Sprintf("%s level %d count %d found %d", f.name, level, c, j),
@@ -257,10 +257,10 @@ func TestGetter(t *testing.T) {
})

t.Run("latency", func(t *testing.T) {

counts := redundancy.GetReplicaCounts()
for i, latency := range latencies {
multiplier := latency / replicas.RetryInterval
if multiplier > 0 && i < replicas.Counts[multiplier-1] {
if multiplier > 0 && i < counts[multiplier-1] {
t.Fatalf("incorrect latency for retrieving replica %d: %v", i, err)
}
}
6 changes: 3 additions & 3 deletions pkg/replicas/putter.go
@@ -29,14 +29,14 @@ func NewPutter(p storage.Putter) storage.Putter {

// Put makes the getter satisfy the storage.Getter interface
func (p *putter) Put(ctx context.Context, ch swarm.Chunk) (err error) {
rlevel := getLevelFromContext(ctx)
rlevel := GetLevelFromContext(ctx)
errs := []error{p.putter.Put(ctx, ch)}
if rlevel == 0 {
return errs[0]
}

rr := newReplicator(ch.Address(), uint8(rlevel))
errc := make(chan error, counts[rlevel])
rr := newReplicator(ch.Address(), rlevel)
errc := make(chan error, rlevel.GetReplicaCount())
wg := sync.WaitGroup{}
for r := range rr.c {
r := r
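
A minimal end-to-end usage sketch (not part of this PR) of the putter path, assuming the package APIs shown in these diffs: the redundancy level is carried in the context via replicas.SetLevel, and replicas.NewPutter wraps a plain storage.Putter so that each Put also writes the dispersed replicas. The chunk content, the in-memory store, and the cac helper for building a content-addressed chunk are illustrative assumptions from the wider bee codebase.

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/cac"
	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
)

func main() {
	store := inmemchunkstore.New()
	putter := replicas.NewPutter(store) // wraps the store with replica writes

	ch, err := cac.New([]byte("hello swarm")) // content-addressed chunk to disperse
	if err != nil {
		panic(err)
	}

	// PARANOID maps to 16 replicas per the replicaCounts table in level.go
	ctx := replicas.SetLevel(context.Background(), redundancy.PARANOID)
	if err := putter.Put(ctx, ch); err != nil {
		panic(err)
	}
	fmt.Println("stored", ch.Address(), "plus its dispersed replicas")
}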
2 changes: 1 addition & 1 deletion pkg/replicas/putter_test.go
@@ -107,7 +107,7 @@ func TestPutter(t *testing.T) {
}
})
t.Run("attempts", func(t *testing.T) {
count := replicas.Counts[tc.level]
count := tc.level.GetReplicaCount()
if len(addrs) != count {
t.Fatalf("incorrect number of attempts. want %v, got %v", count, len(addrs))
}
72 changes: 34 additions & 38 deletions pkg/replicas/replicas.go
@@ -27,25 +27,18 @@ var (
redundancyLevel redundancyLevelType
// RetryInterval is the duration between successive additional requests
RetryInterval = 300 * time.Millisecond
//
// counts of replicas used for levels of increasing security
// the actual number of replicas needed to keep the error rate below 1/10^6
// for the five levels of redundancy are 0, 2, 4, 5, 19
// we use an approximation as the successive powers of 2
counts = [5]int{0, 2, 4, 8, 16}
sums = [5]int{0, 2, 6, 14, 30}
privKey, _ = crypto.DecodeSecp256k1PrivateKey(append([]byte{1}, make([]byte, 31)...))
signer = crypto.NewDefaultSigner(privKey)
owner, _ = hex.DecodeString("dc5b20847f43d67928f49cd4f85d696b5a7617b5")
privKey, _ = crypto.DecodeSecp256k1PrivateKey(append([]byte{1}, make([]byte, 31)...))
signer = crypto.NewDefaultSigner(privKey)
owner, _ = hex.DecodeString("dc5b20847f43d67928f49cd4f85d696b5a7617b5")
)

// SetLevel sets the redundancy level in the context
func SetLevel(ctx context.Context, level redundancy.Level) context.Context {
return context.WithValue(ctx, redundancyLevel, level)
}

// getLevelFromContext is a helper function to extract the redundancy level from the context
func getLevelFromContext(ctx context.Context) redundancy.Level {
// GetLevelFromContext is a helper function to extract the redundancy level from the context
func GetLevelFromContext(ctx context.Context) redundancy.Level {
rlevel := redundancy.PARANOID
if val := ctx.Value(redundancyLevel); val != nil {
rlevel = val.(redundancy.Level)
@@ -55,21 +48,21 @@ func getLevelFromContext(ctx context.Context) redundancy.Level {

// replicator running the find for replicas
type replicator struct {
addr []byte // chunk address
queue [16]*replica // to sort addresses according to di
exist [30]bool // maps the 16 distinct nibbles on all levels
sizes [5]int // number of distinct neighnourhoods redcorded for each depth
c chan *replica
depth uint8
addr []byte // chunk address
queue [16]*replica // to sort addresses according to di
exist [30]bool // maps the 16 distinct nibbles on all levels
sizes [5]int // number of distinct neighnourhoods redcorded for each depth
c chan *replica
rLevel redundancy.Level
}

// newReplicator replicator constructor
func newReplicator(addr swarm.Address, depth uint8) *replicator {
func newReplicator(addr swarm.Address, rLevel redundancy.Level) *replicator {
rr := &replicator{
addr: addr.Bytes(),
sizes: counts,
c: make(chan *replica, 16),
depth: depth,
addr: addr.Bytes(),
sizes: redundancy.GetReplicaCounts(),
c: make(chan *replica, 16),
rLevel: rLevel,
}
go rr.replicas()
return rr
@@ -93,24 +86,18 @@ func (rr *replicator) replicate(i uint8) (sp *replica) {
return &replica{h.Sum(nil), id}
}

// nh returns the lookup key for the neighbourhood of depth d
// to be used as index to the replicators exist array
func (r *replica) nh(d uint8) (nh int) {
return sums[d-1] + int(r.addr[0]>>(8-d))
}

// replicas enumerates replica parameters (SOC ID) pushing it in a channel given as argument
// the order of replicas is so that addresses are always maximally dispersed
// in successive sets of addresses.
// I.e., the binary tree representing the new addresses prefix bits up to depth is balanced
func (rr *replicator) replicas() {
defer close(rr.c)
n := 0
for i := uint8(0); n < counts[rr.depth] && i < 255; i++ {
for i := uint8(0); n < rr.rLevel.GetReplicaCount() && i < 255; i++ {
// create soc replica (ID and address using constant owner)
// the soc is added to neighbourhoods of depths in the closed interval [from...to]
r := rr.replicate(i)
d, m := rr.add(r, rr.depth)
d, m := rr.add(r, rr.rLevel)
if d == 0 {
continue
}
@@ -125,21 +112,30 @@ func (rr *replicator) replicas() {
}

// add inserts the soc replica into a replicator so that addresses are balanced
func (rr *replicator) add(r *replica, d uint8) (depth int, rank int) {
if d == 0 {
func (rr *replicator) add(r *replica, rLevel redundancy.Level) (depth int, rank int) {
if rLevel == redundancy.NONE {
return 0, 0
}
nh := r.nh(d)
nh := nh(rLevel, r.addr)
if rr.exist[nh] {
return 0, 0
}
rr.exist[nh] = true
l, o := rr.add(r, d-1)
l, o := rr.add(r, rLevel.Decrement())
if l == 0 {
o = rr.sizes[d-1]
rr.sizes[d-1]++
o = rr.sizes[uint8(rLevel.Decrement())]
rr.sizes[uint8(rLevel.Decrement())]++
rr.queue[o] = r
l = int(d)
l = rLevel.GetReplicaCount()
}
return l, o
}

// UTILS

// nh returns the lookup key based on the redundancy level
// to be used as index to the replicators exist array
func nh(rLevel redundancy.Level, addr []byte) int {
d := uint8(rLevel)
return rLevel.GetReplicaIndexBase() + int(addr[0]>>(8-d))
}
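
A self-contained sketch of the index computation in nh above: the per-level base from replicaIndexBases plus the top bits of the address's first byte (as many bits as the numeric level), so each level gets its own slice of the 30-entry exist array. The helper below mirrors the logic for illustration only.

package main

import "fmt"

// mirrors replicaIndexBases from pkg/file/redundancy/level.go
var replicaIndexBases = [5]int{0, 2, 6, 14}

// nhIndex mirrors nh above: the base of the level plus the top bits of the
// chunk address's first byte
func nhIndex(level uint8, firstAddrByte byte) int {
	return replicaIndexBases[level-1] + int(firstAddrByte>>(8-level))
}

func main() {
	// at level 4 (PARANOID) the top nibble selects one of 16 slots,
	// offset by base 14, so indices cover 14..29 of the exist array
	for _, b := range []byte{0x00, 0x7f, 0xf0} {
		fmt.Printf("level 4, addr[0] = 0x%02x -> index %d\n", b, nhIndex(4, b))
	}
}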

Unchanged files with check annotations

pkg/file/joiner/joiner.go
  line 1 (GitHub Actions / Lint): # github.com/ethersphere/bee/pkg/file/joiner
  line 158 (GitHub Actions / Lint, Init, Test (flaky), Test (ubuntu-latest), Test (macos-latest)): not enough arguments in call to getter.New
    getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
  line 309 (GitHub Actions / Lint, Init, Test (flaky), Test (ubuntu-latest), Test (macos-latest)): not enough arguments in call to getter.New
    getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)

pkg/traversal/traversal.go
  line 16 (GitHub Actions / Lint): could not import github.com/ethersphere/bee/pkg/file/joiner (-: # github.com/ethersphere/bee/pkg/file/joiner

pkg/salud/salud.go
  line 165 (GitHub Actions / Lint): s.reserve.IsWithinStorageRadius undefined (type reserve has no field or method IsWithinStorageRadius) (typecheck)
  line 225 (GitHub Actions / Lint): s.reserve.StorageRadius undefined (type reserve has no field or method StorageRadius) (typecheck)
  line 227 (GitHub Actions / Lint): s.reserve.StorageRadius undefined (type reserve has no field or method StorageRadius) (typecheck)

cmd/bee/cmd/db.go
  line 23 (GitHub Actions / Lint): "github.com/ethersphere/bee/pkg/storage" imported and not used (typecheck)