docs: redundancy strategies openapi #4510

Closed · wants to merge 5 commits
openapi/Swarm.yaml: 15 additions & 0 deletions

@@ -168,6 +168,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
responses:
"200":
description: Retrieved content specified by reference
@@ -316,6 +321,11 @@ paths:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
name: swarm-cache
required: false
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
responses:
"200":
description: Ok
@@ -341,6 +351,11 @@ paths:
tags:
- BZZ
parameters:
- in: header
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
name: swarm-redundancy-strategy
required: false
- in: path
name: reference
schema:
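Since the same header is added to all three download endpoints above, one client-side example covers them. Below is a minimal sketch in Go of a download request that forces a retrieval strategy; the node address (localhost:1633 is bee's default API port) and the reference are placeholders, not values from this PR.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder reference; substitute a real Swarm reference.
	ref := "0000000000000000000000000000000000000000000000000000000000000000"

	req, err := http.NewRequest(http.MethodGet, "http://localhost:1633/bzz/"+ref, nil)
	if err != nil {
		panic(err)
	}
	// Strategy value 1 per the enum defined in SwarmCommon.yaml below.
	req.Header.Set("swarm-redundancy-strategy", "1")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Status, len(body))
}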
openapi/SwarmCommon.yaml: 11 additions & 0 deletions

@@ -945,6 +945,17 @@ components:
Add redundancy to the data being uploaded so that downloaders can download it with better UX.
0 value is default and does not add any redundancy to the file.

SwarmRedundancyStrategyParameter:
in: header
name: swarm-redundancy-parameter
schema:
type: integer
enum: [0, 1, 2, 3, 4]
required: false
description: >
Force different retrieve strategies on redundant data.
The mumbers stand for NONE, DATA, PROX and RACE, respectively.
Review comment (Collaborator):
Would it be good to mention which is the default if the header is not specified?

Review comment (@ldeffenb, Collaborator, Dec 16, 2023):
5 values, but only 4 words? "Respectively" doesn't seem to align?
Also "mumbers" should be "numbers".

ContentTypePreserved:
in: header
name: Content-Type
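For orientation, the four named strategies in the description above would map onto integers roughly as in the sketch below. This illustrates only the documented mapping, not the actual bee source, and as the review notes, the enum allows a fifth value (4) that the description never names. The package and type names here are hypothetical.

package redundancystrategy // hypothetical package, for illustration only

// Strategy selects how a node retrieves redundantly encoded data.
type Strategy int

const (
	NONE Strategy = iota // 0
	DATA                 // 1
	PROX                 // 2
	RACE                 // 3
)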
pkg/file/pipeline/builder/builder.go: 4 additions & 0 deletions

@@ -36,9 +36,11 @@ func NewPipelineBuilder(ctx context.Context, s storage.Putter, encrypt bool, rLe
func newPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
pipeline := newShortPipelineFunc(ctx, s)
tw := hashtrie.NewHashTrieWriter(
ctx,
swarm.HashSize,
redundancy.New(rLevel, false, pipeline),
pipeline,
s,
)
lsw := store.NewStoreWriter(ctx, s, tw)
b := bmt.NewBmtWriter(lsw)
@@ -61,9 +63,11 @@ func newShortPipelineFunc(ctx context.Context, s storage.Putter) func() pipeline
// with the unencrypted span is preserved.
func newEncryptionPipeline(ctx context.Context, s storage.Putter, rLevel redundancy.Level) pipeline.Interface {
tw := hashtrie.NewHashTrieWriter(
ctx,
swarm.HashSize+encryption.KeyLength,
redundancy.New(rLevel, true, newShortPipelineFunc(ctx, s)),
newShortEncryptionPipelineFunc(ctx, s),
s,
)
lsw := store.NewStoreWriter(ctx, s, tw)
b := bmt.NewBmtWriter(lsw)
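A sketch of how a caller uses the amended constructor: the context is now threaded through so the hash trie writer can store dispersed replicas of the root chunk (see hashtrie.go below). The inmemchunkstore and redundancy imports mirror the tests further down; redundancy.PARANOID is an assumed level name and illustrative only.

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/pipeline/builder"
	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
)

func main() {
	ctx := context.Background()
	store := inmemchunkstore.New()

	// Unencrypted pipeline with erasure coding enabled.
	p := builder.NewPipelineBuilder(ctx, store, false, redundancy.PARANOID)

	if _, err := p.Write([]byte("hello swarm")); err != nil {
		panic(err)
	}
	// Sum now also stores dispersed replicas of the root chunk
	// when the redundancy level is not NONE.
	root, err := p.Sum()
	if err != nil {
		panic(err)
	}
	fmt.Printf("root: %x\n", root)
}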
pkg/file/pipeline/hashtrie/hashtrie.go: 33 additions & 5 deletions

@@ -5,11 +5,15 @@
package hashtrie

import (
"context"
"encoding/binary"
"errors"
"fmt"

"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/replicas"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

@@ -21,20 +25,29 @@ var (
const maxLevel = 8

type hashTrieWriter struct {
ctx context.Context // context for put function of dispersed replica chunks
refSize int
cursors []int // level cursors, key is level. level 0 is data level holds how many chunks were processed. Intermediate higher levels will always have LOWER cursor values.
buffer []byte // keeps intermediate level data
full bool // indicates whether the trie is full. currently we support (128^7)*4096 = 2305843009213693952 bytes
pipelineFn pipeline.PipelineFunc
rParams redundancy.IParams
parityChunkFn redundancy.ParityChunkCallback
chunkCounters []uint8 // counts the chunk references in intermediate chunks. key is the chunk level.
effectiveChunkCounters []uint8 // counts the effective chunk references in intermediate chunks. key is the chunk level.
maxChildrenChunks uint8 // maximum number of chunk references in intermediate chunks.
chunkCounters []uint8 // counts the chunk references in intermediate chunks. key is the chunk level.
effectiveChunkCounters []uint8 // counts the effective chunk references in intermediate chunks. key is the chunk level.
maxChildrenChunks uint8 // maximum number of chunk references in intermediate chunks.
replicaPutter storage.Putter // putter to save dispersed replicas of the root chunk
}

func NewHashTrieWriter(refLen int, rParams redundancy.IParams, pipelineFn pipeline.PipelineFunc) pipeline.ChainWriter {
func NewHashTrieWriter(
ctx context.Context,
refLen int,
rParams redundancy.IParams,
pipelineFn pipeline.PipelineFunc,
replicaPutter storage.Putter,
) pipeline.ChainWriter {
h := &hashTrieWriter{
ctx: ctx,
refSize: refLen,
cursors: make([]int, 9),
buffer: make([]byte, swarm.ChunkWithSpanSize*9*2), // double size as temp workaround for weak calculation of needed buffer space
@@ -43,6 +56,7 @@ func NewHashTrieWriter(refLen int, rParams redundancy.IParams, pipelineFn pipeli
chunkCounters: make([]uint8, 9),
effectiveChunkCounters: make([]uint8, 9),
maxChildrenChunks: uint8(rParams.MaxShards() + rParams.Parities(rParams.MaxShards())),
replicaPutter: replicaPutter,
}
h.parityChunkFn = func(level int, span, address []byte) error {
return h.writeToIntermediateLevel(level, true, span, address, []byte{})
@@ -245,5 +259,19 @@ func (h *hashTrieWriter) Sum() ([]byte, error) {

// return the hash in the highest level, that's all we need
data := h.buffer[0:h.cursors[maxLevel]]
return data[swarm.SpanSize:], nil
rootHash := data[swarm.SpanSize:]

// save dispersed replicas of the root chunk
if h.rParams.Level() != redundancy.NONE {
rootData, err := h.rParams.GetRootData()
if err != nil {
return nil, err
}
putter := replicas.NewPutter(h.replicaPutter)
err = putter.Put(h.ctx, swarm.NewChunk(swarm.NewAddress(rootHash), rootData))
if err != nil {
return nil, fmt.Errorf("hashtrie: cannot put dispersed replica %s", err.Error())
}
}
return rootHash, nil
}
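The replica count itself is not an argument anywhere in this diff: as TestRedundancy below shows, the redundancy level is attached to the context with replicas.SetLevel, and the putter derives the number of dispersed copies from it. A minimal sketch of that flow, under the same assumptions as above (redundancy.INSANE is an assumed level name; the chunk is a stand-in):

package main

import (
	"context"

	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	store := inmemchunkstore.New()

	// Carry the redundancy level in the context, as the test below does.
	ctx := replicas.SetLevel(context.Background(), redundancy.INSANE)

	// Stand-in for the root chunk produced by Sum() above.
	root := swarm.NewChunk(
		swarm.NewAddress(make([]byte, swarm.HashSize)),
		make([]byte, swarm.ChunkSize),
	)

	// Put stores the original chunk plus its dispersed replicas; the
	// test below counts GetReplicaCount()+1 puts for this reason.
	if err := replicas.NewPutter(store).Put(ctx, root); err != nil {
		panic(err)
	}
}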
pkg/file/pipeline/hashtrie/hashtrie_test.go: 23 additions & 6 deletions

@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/file/pipeline/mock"
"github.com/ethersphere/bee/pkg/file/pipeline/store"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/replicas"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/pkg/swarm"
@@ -52,6 +53,7 @@ func newErasureHashTrieWriter(
rLevel redundancy.Level,
encryptChunks bool,
intermediateChunkPipeline, parityChunkPipeline pipeline.ChainWriter,
replicaPutter storage.Putter,
) (redundancy.IParams, pipeline.ChainWriter) {
pf := func() pipeline.ChainWriter {
lsw := store.NewStoreWriter(ctx, s, intermediateChunkPipeline)
@@ -75,7 +77,7 @@
}

r := redundancy.New(rLevel, encryptChunks, ppf)
ht := hashtrie.NewHashTrieWriter(hashSize, r, pf)
ht := hashtrie.NewHashTrieWriter(ctx, hashSize, r, pf, replicaPutter)
return r, ht
}

Expand Down Expand Up @@ -143,7 +145,7 @@ func TestLevels(t *testing.T) {
return bmt.NewBmtWriter(lsw)
}

ht := hashtrie.NewHashTrieWriter(hashSize, redundancy.New(0, false, pf), pf)
ht := hashtrie.NewHashTrieWriter(ctx, hashSize, redundancy.New(0, false, pf), pf, s)

for i := 0; i < tc.writes; i++ {
a := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: span}
@@ -196,7 +198,7 @@ func TestLevels_TrieFull(t *testing.T) {
Params: *r,
}

ht = hashtrie.NewHashTrieWriter(hashSize, rMock, pf)
ht = hashtrie.NewHashTrieWriter(ctx, hashSize, rMock, pf, s)
)

// to create a level wrap we need to do branching^(level-1) writes
Expand Down Expand Up @@ -237,7 +239,7 @@ func TestRegression(t *testing.T) {
lsw := store.NewStoreWriter(ctx, s, nil)
return bmt.NewBmtWriter(lsw)
}
ht = hashtrie.NewHashTrieWriter(hashSize, redundancy.New(0, false, pf), pf)
ht = hashtrie.NewHashTrieWriter(ctx, hashSize, redundancy.New(0, false, pf), pf, s)
)
binary.LittleEndian.PutUint64(span, 4096)

@@ -265,6 +267,16 @@ }
}
}

type replicaPutter struct {
storage.Putter
replicaCount uint8
}

func (r *replicaPutter) Put(ctx context.Context, chunk swarm.Chunk) error {
r.replicaCount++
return r.Putter.Put(ctx, chunk)
}

// TestRedundancy using erasure coding library and checks carrierChunk function and modified span in intermediate chunk
func TestRedundancy(t *testing.T) {
t.Parallel()
@@ -303,12 +315,14 @@ func TestRedundancy(t *testing.T) {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
subCtx := replicas.SetLevel(ctx, tc.level)

s := inmemchunkstore.New()
intermediateChunkCounter := mock.NewChainWriter()
parityChunkCounter := mock.NewChainWriter()
replicaChunkCounter := &replicaPutter{Putter: s}

r, ht := newErasureHashTrieWriter(ctx, s, tc.level, tc.encryption, intermediateChunkCounter, parityChunkCounter)
r, ht := newErasureHashTrieWriter(subCtx, s, tc.level, tc.encryption, intermediateChunkCounter, parityChunkCounter, replicaChunkCounter)

// write data to the hashTrie
var key []byte
@@ -336,7 +350,7 @@ func TestRedundancy(t *testing.T) {
t.Errorf("effective chunks should be %d. Got: %d", tc.writes, intermediateChunkCounter.ChainWriteCalls())
}

rootch, err := s.Get(ctx, swarm.NewAddress(ref[:swarm.HashSize]))
rootch, err := s.Get(subCtx, swarm.NewAddress(ref[:swarm.HashSize]))
if err != nil {
t.Fatal(err)
}
@@ -362,6 +376,9 @@ func TestRedundancy(t *testing.T) {
if expectedParities != parity {
t.Fatalf("want parity %d got %d", expectedParities, parity)
}
if tc.level.GetReplicaCount()+1 != int(replicaChunkCounter.replicaCount) { // +1 is the original chunk
t.Fatalf("unexpected number of replicas: want %d. Got: %d", tc.level.GetReplicaCount(), int(replicaChunkCounter.replicaCount))
}
})
}
}