feat(redundancy/getter): RS decoder rewrite (#4507)
zelig authored Dec 17, 2023
1 parent 653e0de commit 49f6f2f
Showing 8 changed files with 923 additions and 541 deletions.
144 changes: 98 additions & 46 deletions pkg/file/joiner/joiner.go
@@ -11,6 +11,7 @@ import (
 	"io"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethersphere/bee/pkg/bmt"
 	"github.com/ethersphere/bee/pkg/encryption"
@@ -33,20 +34,74 @@ type joiner struct {
 	rootParity   int
 	maxBranching int // maximum branching in an intermediate chunk
 
-	ctx    context.Context
-	getter storage.Getter
-	putter storage.Putter // required to save recovered data
-
+	ctx      context.Context
+	decoders *decoderCache
 	chunkToSpan func(data []byte) (redundancy.Level, int64) // returns parity and span value from chunkData
 }
 
+// decoderCache is a cache of decoders for intermediate chunks
+type decoderCache struct {
+	fetcher        storage.Getter            // network retrieval interface to fetch chunks
+	putter         storage.Putter            // interface to local storage to save reconstructed chunks
+	mu             sync.Mutex                // mutex to protect cache
+	cache          map[string]storage.Getter // map from chunk address to RS getter
+	strategy       getter.Strategy           // strategy to use for retrieval
+	strict         bool                      // strict mode
+	fetcherTimeout time.Duration             // timeout for each fetch
+}
+
+// NewDecoderCache creates a new decoder cache
+func NewDecoderCache(g storage.Getter, p storage.Putter, strategy getter.Strategy, strict bool, fetcherTimeout time.Duration) *decoderCache {
+	return &decoderCache{
+		fetcher:        g,
+		putter:         p,
+		cache:          make(map[string]storage.Getter),
+		strategy:       strategy,
+		strict:         strict,
+		fetcherTimeout: fetcherTimeout,
+	}
+}
+
+func fingerprint(addrs []swarm.Address) string {
+	h := swarm.NewHasher()
+	for _, addr := range addrs {
+		_, _ = h.Write(addr.Bytes())
+	}
+	return string(h.Sum(nil))
+}
+
+// GetOrCreate returns a decoder for the given chunk address
+func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {
+	if len(addrs) == shardCnt {
+		return g.fetcher
+	}
+	key := fingerprint(addrs)
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	d, ok := g.cache[key]
+	if ok {
+		if d == nil {
+			return g.fetcher
+		}
+		return d
+	}
+	remove := func() {
+		g.mu.Lock()
+		defer g.mu.Unlock()
+		g.cache[key] = nil
+	}
+	d = getter.New(addrs, shardCnt, g.fetcher, g.putter, g.strategy, g.strict, g.fetcherTimeout, remove)
+	g.cache[key] = d
+	return d
+}
+
 // New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
-func New(ctx context.Context, getter storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
+func New(ctx context.Context, g storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
 	// retrieve the root chunk to read the total data length to be retrieved
 	rLevel := replicas.GetLevelFromContext(ctx)
-	rootChunkGetter := store.New(getter)
+	rootChunkGetter := store.New(g)
 	if rLevel != redundancy.NONE {
-		rootChunkGetter = store.New(replicas.NewGetter(getter, rLevel))
+		rootChunkGetter = store.New(replicas.NewGetter(g, rLevel))
 	}
 	rootChunk, err := rootChunkGetter.Get(ctx, address)
 	if err != nil {
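
The remove callback in GetOrCreate above does not delete the key; it overwrites it with nil, so a later lookup for the same sibling set falls through to the plain fetcher instead of building a second decoder, while concurrent readers of a live decoder share one instance. A minimal sketch of the same bookkeeping outside bee's types (Fetcher, cache and newDecoder are stand-ins invented for this illustration):

package sketch

import "sync"

type Fetcher interface{ Fetch(key string) ([]byte, error) }

type cache struct {
    mu      sync.Mutex
    entries map[string]Fetcher
    plain   Fetcher // fallback network fetcher
}

func newCache(plain Fetcher) *cache {
    return &cache{entries: make(map[string]Fetcher), plain: plain}
}

// getOrCreate mirrors decoderCache.GetOrCreate: reuse a live decoder,
// fall back to plain retrieval once an entry was tombstoned with nil.
func (c *cache) getOrCreate(key string, newDecoder func(remove func()) Fetcher) Fetcher {
    c.mu.Lock()
    defer c.mu.Unlock()
    if d, ok := c.entries[key]; ok {
        if d == nil { // tombstone: decoder already finished, use plain fetches
            return c.plain
        }
        return d
    }
    remove := func() { // handed to the decoder, called when it is done
        c.mu.Lock()
        defer c.mu.Unlock()
        c.entries[key] = nil
    }
    d := newDecoder(remove)
    c.entries[key] = d
    return d
}
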
@@ -56,17 +111,21 @@ func New(ctx context.Context, getter storage.Getter, putter storage.Putter, addr
 	chunkData := rootChunk.Data()
 	rootData := chunkData[swarm.SpanSize:]
 	refLength := len(address.Bytes())
-	encryption := refLength != swarm.HashSize
+	encryption := refLength == encryption.ReferenceSize
 	rLevel, span := chunkToSpan(chunkData)
 	rootParity := 0
 	maxBranching := swarm.ChunkSize / refLength
 	spanFn := func(data []byte) (redundancy.Level, int64) {
 		return 0, int64(bmt.LengthFromSpan(data[:swarm.SpanSize]))
 	}
+	var strategy getter.Strategy
+	var strict bool
+	var fetcherTimeout time.Duration
 	// override stuff if root chunk has redundancy
 	if rLevel != redundancy.NONE {
 		_, parities := file.ReferenceCount(uint64(span), rLevel, encryption)
 		rootParity = parities
+		strategy, strict, fetcherTimeout = getter.GetParamsFromContext(ctx)
 		spanFn = chunkToSpan
 		if encryption {
 			maxBranching = rLevel.GetMaxEncShards()
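
The three var declarations keep their zero values unless the root chunk carries redundancy; only then are strategy, strict and fetcherTimeout read from the request context. The branching arithmetic around them is worth spelling out; a small sketch with the constants inlined rather than taken from the swarm and encryption packages:

package sketch

// Values written out for the illustration; in the diff they come from
// swarm.ChunkSize, swarm.HashSize and encryption.ReferenceSize.
const (
    chunkSize  = 4096 // payload bytes per chunk
    hashSize   = 32   // plain reference: one swarm hash
    encRefSize = 64   // encrypted reference: hash plus decryption key
)

// maxBranching is the default fanout of an intermediate chunk:
// 4096/32 = 128 children plain, 4096/64 = 64 children encrypted.
// When the root carries redundancy, some reference slots hold parity
// chunks instead, so the effective fanout is smaller and is taken from
// the redundancy level (rLevel.GetMaxEncShards in the diff for the
// encrypted case).
func maxBranching(refLength int) int { return chunkSize / refLength }
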
@@ -79,8 +138,7 @@ func New(ctx context.Context, getter storage.Getter, putter storage.Putter, addr
 		addr:       rootChunk.Address(),
 		refLength:  refLength,
 		ctx:        ctx,
-		getter:     getter,
-		putter:     putter,
+		decoders:   NewDecoderCache(g, putter, strategy, strict, fetcherTimeout),
 		span:       span,
 		rootData:   rootData,
 		rootParity: rootParity,
@@ -148,16 +206,16 @@ func (j *joiner) readAtOffset(
 		atomic.AddInt64(bytesRead, int64(n))
 		return
 	}
-
 	pSize, err := file.ChunkPayloadSize(data)
 	if err != nil {
 		eg.Go(func() error {
 			return err
 		})
 		return
 	}
-	sAddresses, pAddresses := file.ChunkAddresses(data[:pSize], parity, j.refLength)
-	getter := store.New(getter.New(sAddresses, pAddresses, j.getter, j.putter))
+
+	addrs, shardCnt := file.ChunkAddresses(data[:pSize], parity, j.refLength)
+	g := store.New(j.decoders.GetOrCreate(addrs, shardCnt))
 	for cursor := 0; cursor < len(data); cursor += j.refLength {
 		if bytesToRead == 0 {
 			break
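
Note the fast path this preserves: for a chunk without parities, file.ChunkAddresses yields exactly shardCnt addresses, GetOrCreate hands back the raw fetcher, and the read behaves as it did before the rewrite. As a one-line predicate (name invented for the illustration):

package sketch

// needsDecoder mirrors the first check in GetOrCreate: an RS decoder is
// only set up when the sibling list carries parity references beyond
// the data shards.
func needsDecoder(addrCnt, shardCnt int) bool { return addrCnt != shardCnt }
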
@@ -171,7 +229,7 @@ func (j *joiner) readAtOffset(
 		}
 
 		// if we are here it means that we are within the bounds of the data we need to read
-		address := swarm.NewAddress(data[cursor : cursor+j.refLength])
+		addr := swarm.NewAddress(data[cursor : cursor+j.refLength])
 
 		subtrieSpan := sec
 		subtrieSpanLimit := sec
@@ -188,14 +246,14 @@ func (j *joiner) readAtOffset(
 
 		func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) {
 			eg.Go(func() error {
-				ch, err := getter.Get(j.ctx, address)
+				ch, err := g.Get(j.ctx, addr)
 				if err != nil {
 					return err
 				}
 
 				chunkData := ch.Data()[8:]
 				subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
-				_, subtrieParity := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
+				_, subtrieParity := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength == encryption.ReferenceSize)
 
 				if subtrieSpan > subtrieSpanLimit {
 					return ErrMalformedTrie
@@ -204,7 +262,7 @@ func (j *joiner) readAtOffset(
 				j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg)
 				return nil
 			})
-		}(address, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)
+		}(addr, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)
 
 		bufferOffset += currentReadSize
 		bytesToRead -= currentReadSize
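
The subtrieSpan > subtrieSpanLimit guard above is the trie integrity check: a child chunk that claims more data than the span its parent assigned to it is rejected with ErrMalformedTrie rather than recursed into. A stripped-down sketch of that fan-out-and-validate shape, assuming only x/sync/errgroup (the child type and descend callback are invented for the illustration):

package sketch

import (
    "context"
    "errors"

    "golang.org/x/sync/errgroup"
)

var errMalformedTrie = errors.New("malformed trie")

type child struct {
    span  int64 // span the child claims for its subtrie
    limit int64 // span the parent assigned to this slot
}

// fanOut mirrors the shape of readAtOffset's loop: one goroutine per
// child, each validating the claimed span before descending further.
func fanOut(ctx context.Context, children []child, descend func(context.Context, child) error) error {
    eg, ectx := errgroup.WithContext(ctx)
    for _, c := range children {
        c := c // capture loop variable (pre-Go 1.22 semantics)
        eg.Go(func() error {
            if c.span > c.limit {
                return errMalformedTrie
            }
            return descend(ectx, c)
        })
    }
    return eg.Wait()
}
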
@@ -307,45 +365,39 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
 	if err != nil {
 		return err
 	}
-	sAddresses, pAddresses := file.ChunkAddresses(data[:eSize], parity, j.refLength)
-	getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
-	for cursor := 0; cursor < len(data); cursor += j.refLength {
-		ref := data[cursor : cursor+j.refLength]
-		var reportAddr swarm.Address
-		address := swarm.NewAddress(ref)
-		if len(ref) == encryption.ReferenceSize {
-			reportAddr = swarm.NewAddress(ref[:swarm.HashSize])
-		} else {
-			reportAddr = swarm.NewAddress(ref)
-		}
-
-		if err := fn(reportAddr); err != nil {
+	addrs, shardCnt := file.ChunkAddresses(data[:eSize], parity, j.refLength)
+	g := store.New(j.decoders.GetOrCreate(addrs, shardCnt))
+	for i, addr := range addrs {
+		if err := fn(addr); err != nil {
 			return err
 		}
 
+		cursor := i * swarm.HashSize
+		if j.refLength == encryption.ReferenceSize {
+			cursor += swarm.HashSize * min(i, shardCnt)
+		}
 		sec := j.subtrieSection(data, cursor, eSize, parity, subTrieSize)
 		if sec <= swarm.ChunkSize {
 			continue
 		}
 
-		func(address swarm.Address, eg *errgroup.Group) {
-			wg.Add(1)
-
-			eg.Go(func() error {
-				defer wg.Done()
-
+		wg.Add(1)
+		eg.Go(func() error {
+			defer wg.Done()
-				ch, err := getter.Get(ectx, address)
-				if err != nil {
-					return err
-				}
+			if j.refLength == encryption.ReferenceSize && i < shardCnt {
+				addr = swarm.NewAddress(data[cursor : cursor+swarm.HashSize*2])
+			}
+			ch, err := g.Get(ectx, addr)
+			if err != nil {
+				return err
+			}
 
-				chunkData := ch.Data()[8:]
-				subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
-				_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
+			chunkData := ch.Data()[8:]
+			subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
+			_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
 
-				return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
-			})
-		}(address, eg)
+			return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
+		})
 
 	wg.Wait()
 }
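
The cursor arithmetic in processChunkAddresses deserves a worked example: addrs lists the shardCnt data references first and the parity references after them, and with encryption a data reference is two hashes wide (address plus decryption key) while a parity reference stays one hash wide, hence the min(i, shardCnt) correction. A sketch with the offsets written out (constant inlined for the illustration; min is the Go 1.21 builtin also used in the diff):

package sketch

const hashSize = 32 // swarm.HashSize

// cursorFor reproduces the index-to-byte-offset mapping from the loop:
// with encryption, the first shardCnt references are 2*hashSize wide,
// and the parity references after them are hashSize wide.
func cursorFor(i, shardCnt int, encrypted bool) int {
    cursor := i * hashSize
    if encrypted {
        cursor += hashSize * min(i, shardCnt)
    }
    return cursor
}

// With shardCnt = 2 and encryption on:
//   i=0 -> 0    (first 64-byte data reference)
//   i=1 -> 64   (second 64-byte data reference)
//   i=2 -> 128  (first 32-byte parity reference)
//   i=3 -> 160  (second 32-byte parity reference)
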