Skip to content

Commit

Permalink
add get proof by namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Nov 10, 2022
1 parent bcad107 commit 8c9ec98
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 12 deletions.
60 changes: 60 additions & 0 deletions share/get_proof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package share

import (
"context"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"

"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/nmt/namespace"
)

// ShareWithProof contains data with corresponding Merkle Proof
type SharesWithProofs struct {
// Share is a full data including namespace
Shares []Share
// Proof is a Merkle Proof of current share
Proof nmt.Proof
}

// GetSharesWithProofsByNamespace walks the tree of a given root and returns its shares within the given namespace.ID.
// If a share could not be retrieved, err is not nil, and the returned array
// contains nil shares in place of the shares it was unable to retrieve.
func GetSharesWithProofsByNamespace(
ctx context.Context,
bGetter blockservice.BlockGetter,
root cid.Cid,
nID namespace.ID,
maxShares int,
) (*SharesWithProofs, error) {
ctx, span := tracer.Start(ctx, "get-shares-by-namespace")
defer span.End()

nodes, err := ipld.GetLeavesWithProofsByNamespace(ctx, bGetter, root, nID, maxShares)
if nodes == nil {
return nil, err
}

shares := make([]Share, 0, nodes.ProofEnd-nodes.ProofStart)
for _, leaf := range nodes.Leaves {
if leaf != nil {
shares = append(shares, leafToShare(leaf))
}
}

// pack proofs for nmt.Proof
rangeProofs := make([][]byte, 0, len(nodes.Proofs))
for i := len(nodes.Proofs) - 1; i >= 0; i-- {
if nodes.Proofs[i] != nil {
rangeProofs = append(rangeProofs, nodes.Proofs[i])
}

}

return &SharesWithProofs{
Shares: shares,
Proof: nmt.NewInclusionProof(nodes.ProofStart, nodes.ProofEnd, rangeProofs, true),
}, nil
}
88 changes: 88 additions & 0 deletions share/get_proof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package share

import (
"context"
"math/rand"
"strconv"
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/minio/sha256-simd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/ipld"
)

func TestGetSharesWithProofsByNamespace(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bServ := mdutils.Bserv()

var tests = []struct {
rawData []Share
}{
{rawData: RandShares(t, 4)},
{rawData: RandShares(t, 16)},
{rawData: RandShares(t, 64)},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
rand.Seed(time.Now().UnixNano())
// choose random range in shares f
from := rand.Intn(len(tt.rawData) - 1)
to := rand.Intn(len(tt.rawData) - 1)

//
expected := tt.rawData[from]
nID := expected[:NamespaceSize]

if to < from {
tmp := from
from, to = to, tmp
}

// change rawData to contain several shares with same nID
for i := from; i <= to; i++ {
tt.rawData[i] = expected
}

// put raw data in BlockService
eds, err := AddShares(ctx, tt.rawData, bServ)
require.NoError(t, err)

var shares []Share
for _, row := range eds.RowRoots() {
rcid := ipld.MustCidFromNamespacedSha256(row)
rowShares, err := GetSharesWithProofsByNamespace(ctx, bServ, rcid, nID, len(eds.RowRoots()))
require.NoError(t, err)
if rowShares != nil {
// append shares to check integrity later
shares = append(shares, rowShares.Shares...)

// construct nodes from shares by prepending namespace
var leafs [][]byte
for _, sh := range rowShares.Shares {
leafs = append(leafs, append(sh[:NamespaceSize], sh...))
}

// validate proof
verified := rowShares.Proof.VerifyNamespace(
sha256.New(),
nID,
leafs,
ipld.NamespacedSha256FromCID(rcid))
require.True(t, verified)
}
}

// validate shares
assert.Equal(t, to-from+1, len(shares))
for _, share := range shares {
assert.Equal(t, expected, share)
}
})
}
}
185 changes: 173 additions & 12 deletions share/ipld/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func GetLeaves(ctx context.Context,

span.SetAttributes(
attribute.String("cid", j.id.String()),
attribute.Int("pos", j.pos),
attribute.Int("pos", j.sharePos),
)

nd, err := GetNode(ctx, bGetter, j.id)
Expand All @@ -141,7 +141,7 @@ func GetLeaves(ctx context.Context,
// successfully fetched a share/leaf
// ladies and gentlemen, we got em!
span.SetStatus(codes.Ok, "")
put(j.pos, nd)
put(j.sharePos, nd)
return
}
// ok, we found more links
Expand All @@ -152,7 +152,7 @@ func GetLeaves(ctx context.Context,
id: lnk.Cid,
// calc position for children nodes (bin-tree-feat),
// s.t. 'if' above knows where to put a share
pos: j.pos*2 + i,
sharePos: j.sharePos*2 + i,
// we pass the context to job so that spans are tracked in a tree
// structure
ctx: ctx,
Expand Down Expand Up @@ -235,7 +235,7 @@ func GetLeavesByNamespace(

span.SetAttributes(
attribute.String("cid", j.id.String()),
attribute.Int("pos", j.pos),
attribute.Int("pos", j.sharePos),
)

// if an error is likely to be returned or not depends on
Expand All @@ -245,21 +245,21 @@ func GetLeavesByNamespace(
singleErr.Do(func() {
retrievalErr = err
})
log.Errorw("getSharesByNamespace: could not retrieve node", "nID", nID, "pos", j.pos, "err", err)
log.Errorw("getSharesByNamespace: could not retrieve node", "nID", nID, "pos", j.sharePos, "err", err)
span.SetStatus(codes.Error, err.Error())
// we still need to update the bounds
bounds.update(int64(j.pos))
bounds.update(int64(j.sharePos))
return
}

links := nd.Links()
if len(links) == 0 {
// successfully fetched a leaf belonging to the namespace
span.SetStatus(codes.Ok, "")
leaves[j.pos] = nd
leaves[j.sharePos] = nd
// we found a leaf, so we update the bounds
// the update routine is repeated until the atomic swap is successful
bounds.update(int64(j.pos))
bounds.update(int64(j.sharePos))
return
}

Expand All @@ -269,7 +269,7 @@ func GetLeavesByNamespace(
id: lnk.Cid,
// position represents the index in a flattened binary tree,
// so we can return a slice of leaves in order
pos: j.pos*2 + i,
sharePos: j.sharePos*2 + i,
// we pass the context to job so that spans are tracked in a tree
// structure
ctx: ctx,
Expand Down Expand Up @@ -382,7 +382,168 @@ func (b *fetchedBounds) update(index int64) {
// job represents an encountered node to investigate during the `GetLeaves`
// and `GetLeavesByNamespace` routines.
type job struct {
id cid.Cid
pos int
ctx context.Context
id cid.Cid
sharePos int
pos int
ctx context.Context
}

// LeavesWithProofs contains data with corresponding Merkle Proof
type LeavesWithProofs struct {
Leaves []ipld.Node
Proofs [][]byte
ProofStart, ProofEnd int
}

// GetLeavesWithProofsByNamespace works same as GetLeavesByNamespace,
// but also returns inclusion proof
func GetLeavesWithProofsByNamespace(
ctx context.Context,
bGetter blockservice.BlockGetter,
root cid.Cid,
nID namespace.ID,
maxShares int,
) (*LeavesWithProofs, error) {
if len(nID) != NamespaceSize {
return nil, fmt.Errorf("expected namespace ID of size %d, got %d", NamespaceSize, len(nID))
}

ctx, span := tracer.Start(ctx, "get-leaves-with-proofs-by-namespace")
defer span.End()

span.SetAttributes(
attribute.String("namespace", nID.String()),
attribute.String("root", root.String()),
)

// we don't know where in the tree the leaves in the namespace are,
// so we keep track of the bounds to return the correct slice
// maxShares acts as a sentinel to know if we find any leaves
bounds := fetchedBounds{int64(maxShares), 0}

// buffer the jobs to avoid blocking, we only need as many
// queued as the number of shares in the second-to-last layer
jobs := make(chan *job, (maxShares+1)/2)
jobs <- &job{id: root, ctx: ctx}

var wg chanGroup
wg.jobs = jobs
wg.add(1)

var (
singleErr sync.Once
retrievalErr error
)

// we overallocate space for leaves since we do not know how many we will find
// on the level above, the length of the Row is passed in as maxShares
leaves := make([]ipld.Node, maxShares)

// TODO: (@walldiss) this is massively overallocating and should be optimized with clever append
rightProofs := make([][]byte, BatchSize((maxShares)))
leftProofs := make([][]byte, BatchSize((maxShares)))
for {
select {
case j, ok := <-jobs:
if !ok {
// if there were no leaves under the given root in the given namespace,
// both return values are nil. otherwise, the error will also be non-nil.
if bounds.lowest == int64(maxShares) {
return nil, retrievalErr
}

var proofs [][]byte
// left side traversed in bottom-up order
for i := range leftProofs {
if leftProofs[i] != nil {
proofs = append(proofs, leftProofs[i])
}
}
// right side of the tree wil be traversed from up to bottom, so append in reversed order
for i := range rightProofs {
if rightProofs[len(rightProofs)-i-1] != nil {
proofs = append(proofs, rightProofs[len(leftProofs)-i-1])
}
}

return &LeavesWithProofs{
Leaves: leaves,
Proofs: proofs,
ProofStart: int(bounds.lowest),
ProofEnd: int(bounds.highest) + 1,
}, retrievalErr
}
pool.Submit(func() {
ctx, span := tracer.Start(j.ctx, "process-job")
defer span.End()
defer wg.done()

span.SetAttributes(
attribute.String("cid", j.id.String()),
attribute.Int("pos", j.sharePos),
)

// if an error is likely to be returned or not depends on
// the underlying impl of the blockservice, currently it is not a realistic probability
nd, err := GetNode(ctx, bGetter, j.id)
if err != nil {
singleErr.Do(func() {
retrievalErr = err
})
log.Errorw("getLeavesWithProofsByNamespace: could not retrieve node", "nID", nID, "pos", j.sharePos, "err", err)
span.SetStatus(codes.Error, err.Error())
// we still need to update the bounds
bounds.update(int64(j.sharePos))
return
}

links := nd.Links()
if len(links) == 0 {
// successfully fetched a leaf belonging to the namespace
span.SetStatus(codes.Ok, "")
leaves[j.sharePos] = nd
// we found a leaf, so we update the bounds
// the update routine is repeated until the atomic swap is successful
bounds.update(int64(j.sharePos))
return
}

// this node has links in the namespace, so keep walking
for i, lnk := range links {
newJob := &job{
id: lnk.Cid,
// sharePos represents potential share position in share slice
sharePos: j.sharePos*2 + i,
// position represents the index in a flattened binary tree,
// so we can return a slice of leaves in order
pos: j.pos*2 + i + 1,
// we pass the context to job so that spans are tracked in a tree
// structure
ctx: ctx,
}
// if the link's nID isn't in range we don't need to create a new job for it
jobNid := NamespacedSha256FromCID(newJob.id)
if nID.Less(nmt.MinNamespace(jobNid, nID.Size())) {
leftProofs[newJob.pos] = jobNid
continue
}
if !nID.LessOrEqual(nmt.MaxNamespace(jobNid, nID.Size())) {
rightProofs[newJob.pos] = jobNid
continue
}

// by passing the previous check, we know we will have one more node to process
// note: it is important to increase the counter before sending to the channel
wg.add(1)
select {
case jobs <- newJob:
case <-ctx.Done():
return
}
}
})
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

0 comments on commit 8c9ec98

Please sign in to comment.