Skip to content

Commit

Permalink
feat: keep using the namespace proof for GetBlob and GetAll (#3633)
Browse files Browse the repository at this point in the history
Closes #3629

Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
rach-id and Wondertan authored Oct 2, 2024
1 parent 32d3137 commit d3e653a
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 20 deletions.
5 changes: 4 additions & 1 deletion blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/celestiaorg/celestia-node/share"
)

//nolint:unused
var errEmptyShares = errors.New("empty shares")

// Proof constructs the proof of a blob to the data root.
Expand All @@ -32,6 +31,10 @@ type Proof struct {
RowToDataRootProof coretypes.RowProof
}

// namespaceToRowRootProof a proof of a set of namespace shares to the row
// roots they belong to.
type namespaceToRowRootProof []*nmt.Proof

// Verify takes a blob and a data root and verifies if the
// provided blob was committed to the given data root.
func (p *Proof) Verify(blob *Blob, dataRoot []byte) (bool, error) {
Expand Down
5 changes: 0 additions & 5 deletions blob/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ type parser struct {

// set tries to find the first blob's share by skipping padding shares and
// sets the metadata of the blob(index and length)
//
//nolint:unused
func (p *parser) set(index int, shrs []shares.Share) ([]shares.Share, error) {
if len(shrs) == 0 {
return nil, errEmptyShares
Expand Down Expand Up @@ -115,8 +113,6 @@ func (p *parser) parse() (*Blob, error) {

// skipPadding iterates through the shares until non-padding share will be found. It guarantees that
// the returned set of shares will start with non-padding share(or empty set of shares).
//
//nolint:unused
func (p *parser) skipPadding(shares []shares.Share) ([]shares.Share, error) {
if len(shares) == 0 {
return nil, errEmptyShares
Expand Down Expand Up @@ -148,7 +144,6 @@ func (p *parser) verify(blob *Blob) bool {
return p.verifyFn(blob)
}

//nolint:unused
func (p *parser) isEmpty() bool {
return p.index == 0 && p.length == 0 && len(p.shares) == 0
}
Expand Down
160 changes: 154 additions & 6 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func (s *Service) GetProof(
return blob.compareCommitments(commitment)
}}

_, proof, err = s.retrieve(ctx, height, namespace, sharesParser)
return proof, nil
_, proof, err = s.retrieveBlobProof(ctx, height, namespace, sharesParser)
return proof, err
}

// GetAll returns all blobs under the given namespaces at the given height.
Expand Down Expand Up @@ -341,7 +341,7 @@ func (s *Service) retrieve(
height uint64,
namespace share.Namespace,
sharesParser *parser,
) (_ *Blob, _ *Proof, err error) {
) (_ *Blob, _ *namespaceToRowRootProof, err error) {
log.Infow("requesting blob",
"height", height,
"namespace", namespace.String())
Expand All @@ -354,14 +354,153 @@ func (s *Service) retrieve(
return nil, nil, err
}

eds, err := s.shareGetter.GetEDS(ctx, header)
headerGetterSpan.SetStatus(codes.Ok, "")
headerGetterSpan.AddEvent("received header", trace.WithAttributes(
attribute.Int64("eds-size", int64(len(header.DAH.RowRoots)))))

rowIndex := -1
for i, row := range header.DAH.RowRoots {
if !namespace.IsOutsideRange(row, row) {
rowIndex = i
break
}
}

// Note: there is no need to check whether the row index is different from -1
// because it will be handled at the end to return the correct error.

getCtx, getSharesSpan := tracer.Start(ctx, "get-shares-by-namespace")

// collect shares for the requested namespace
namespacedShares, err := s.shareGetter.GetSharesByNamespace(getCtx, header, namespace)
if err != nil {
if errors.Is(err, share.ErrNotFound) {
err = ErrBlobNotFound
}
getSharesSpan.SetStatus(codes.Error, err.Error())
return nil, nil, err
}
headerGetterSpan.SetStatus(codes.Ok, "")
headerGetterSpan.AddEvent("received eds", trace.WithAttributes(

getSharesSpan.SetStatus(codes.Ok, "")
getSharesSpan.AddEvent("received shares", trace.WithAttributes(
attribute.Int64("eds-size", int64(len(header.DAH.RowRoots)))))

var (
appShares = make([]shares.Share, 0)
proofs = make(namespaceToRowRootProof, 0)
)

for _, row := range namespacedShares {
if len(row.Shares) == 0 {
// the above condition means that we've faced with an Absence Proof.
// This Proof proves that the namespace was not found in the DAH, so
// we can return `ErrBlobNotFound`.
return nil, nil, ErrBlobNotFound
}

appShares, err = toAppShares(row.Shares...)
if err != nil {
return nil, nil, err
}

proofs = append(proofs, row.Proof)
index := row.Proof.Start()

for {
var (
isComplete bool
shrs []shares.Share
wasEmpty = sharesParser.isEmpty()
)

if wasEmpty {
// create a parser if it is empty
shrs, err = sharesParser.set(rowIndex*len(header.DAH.RowRoots)+index, appShares)
if err != nil {
if errors.Is(err, errEmptyShares) {
// reset parser as `skipPadding` can update next blob's index
sharesParser.reset()
appShares = nil
break
}
return nil, nil, err
}

// update index and shares if padding shares were detected.
if len(appShares) != len(shrs) {
index += len(appShares) - len(shrs)
appShares = shrs
}
}

shrs, isComplete = sharesParser.addShares(appShares)
// move to the next row if the blob is incomplete
if !isComplete {
appShares = nil
break
}
// otherwise construct blob
blob, err := sharesParser.parse()
if err != nil {
return nil, nil, err
}

if sharesParser.verify(blob) {
return blob, &proofs, nil
}

index += len(appShares) - len(shrs)
appShares = shrs
sharesParser.reset()

if !wasEmpty {
// remove proofs for prev rows if verified blob spans multiple rows
proofs = proofs[len(proofs)-1:]
}
}

rowIndex++
if sharesParser.isEmpty() {
proofs = nil
}
}

err = ErrBlobNotFound
for _, sh := range appShares {
ok, err := sh.IsPadding()
if err != nil {
return nil, nil, err
}
if !ok {
err = fmt.Errorf("incomplete blob with the "+
"namespace: %s detected at %d: %w", namespace.String(), height, err)
log.Error(err)
}
}
return nil, nil, err
}

// retrieve retrieves blobs and their proofs by requesting the whole namespace and
// comparing Commitments.
// Retrieving is stopped once the `verify` condition in shareParser is met.
func (s *Service) retrieveBlobProof(
ctx context.Context,
height uint64,
namespace share.Namespace,
sharesParser *parser,
) (_ *Blob, _ *Proof, err error) {
log.Infow("requesting blob proof",
"height", height,
"namespace", namespace.String())

getCtx, headerGetterSpan := tracer.Start(ctx, "header-getter")

header, err := s.headerGetter(getCtx, height)
if err != nil {
headerGetterSpan.SetStatus(codes.Error, err.Error())
return nil, nil, err
}

// find the index of the row where the blob could start
inclusiveNamespaceStartRowIndex := -1
for i, row := range header.DAH.RowRoots {
Expand All @@ -386,6 +525,15 @@ func (s *Service) retrieve(
return nil, nil, fmt.Errorf("couldn't find the row index of the namespace end")
}

eds, err := s.shareGetter.GetEDS(ctx, header)
if err != nil {
headerGetterSpan.SetStatus(codes.Error, err.Error())
return nil, nil, err
}
headerGetterSpan.SetStatus(codes.Ok, "")
headerGetterSpan.AddEvent("received eds", trace.WithAttributes(
attribute.Int64("eds-size", int64(len(header.DAH.RowRoots)))))

// calculate the square size
squareSize := len(header.DAH.RowRoots) / 2

Expand Down
67 changes: 59 additions & 8 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/merkle"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/proto/tendermint/types"
coretypes "github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
Expand Down Expand Up @@ -380,12 +381,18 @@ func TestBlobService_Get(t *testing.T) {
name: "internal error",
doFn: func() (interface{}, error) {
ctrl := gomock.NewController(t)
shareService := service.shareGetter
shareGetterMock := shareMock.NewMockModule(ctrl)
shareGetterMock.EXPECT().
GetEDS(gomock.Any(), gomock.Any()).
GetSharesByNamespace(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
return nil, errors.New("internal error")
func(
ctx context.Context, h *header.ExtendedHeader, ns share.Namespace,
) (share.NamespacedShares, error) {
if ns.Equals(blobsWithDiffNamespaces[0].Namespace()) {
return nil, errors.New("internal error")
}
return shareService.GetSharesByNamespace(ctx, h, ns)
}).AnyTimes()

service.shareGetter = shareGetterMock
Expand All @@ -396,6 +403,31 @@ func TestBlobService_Get(t *testing.T) {
},
)
},
expectedResult: func(res interface{}, err error) {
blobs, ok := res.([]*Blob)
assert.True(t, ok)
assert.Error(t, err)
assert.Contains(t, err.Error(), "internal error")
assert.Equal(t, blobs[0].Namespace(), blobsWithSameNamespace[0].Namespace())
assert.NotEmpty(t, blobs)
assert.Len(t, blobs, len(blobsWithSameNamespace))
},
},
{
name: "get blob internal error",
doFn: func() (interface{}, error) {
ctrl := gomock.NewController(t)
shareGetterMock := shareMock.NewMockModule(ctrl)
shareGetterMock.EXPECT().
GetEDS(gomock.Any(), gomock.Any()).
DoAndReturn(
func(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
return nil, errors.New("internal error")
}).AnyTimes()

service.shareGetter = shareGetterMock
return service.GetProof(ctx, 1, blobsWithDiffNamespaces[0].Namespace(), blobsWithDiffNamespaces[0].Commitment)
},
expectedResult: func(res interface{}, err error) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "internal error")
Expand Down Expand Up @@ -1134,9 +1166,15 @@ func TestBlobVerify(t *testing.T) {
name: "malformed blob and proof",
dataRoot: dataRoot,
proof: func() Proof {
p := blobProof
p.ShareToRowRootProof = p.ShareToRowRootProof[1:]
return p
return Proof{
ShareToRowRootProof: []*types.NMTProof{{
Start: 1,
End: 3,
Nodes: [][]byte{{0x01}},
LeafHash: nil,
}},
RowToDataRootProof: blobProof.RowToDataRootProof,
}
}(),
blob: func() Blob {
b := *blob
Expand All @@ -1150,7 +1188,7 @@ func TestBlobVerify(t *testing.T) {
dataRoot: dataRoot,
proof: func() Proof {
p := blobProof
p.ShareToRowRootProof = p.ShareToRowRootProof[1:]
p.ShareToRowRootProof[0].End = 15
return p
}(),
blob: *blob,
Expand All @@ -1167,7 +1205,20 @@ func TestBlobVerify(t *testing.T) {
name: "valid proof",
dataRoot: dataRoot,
blob: *blob,
proof: blobProof,
proof: func() Proof {
sharesProof, err := pkgproof.NewShareInclusionProofFromEDS(
eds,
nss[5],
shares.NewRange(startShareIndex, startShareIndex+len(blobShares)),
)
require.NoError(t, err)
require.NoError(t, sharesProof.Validate(dataRoot))

return Proof{
ShareToRowRootProof: sharesProof.ShareProofs,
RowToDataRootProof: sharesProof.RowProof,
}
}(),
},
}

Expand Down

0 comments on commit d3e653a

Please sign in to comment.