diff --git a/blob/blob.go b/blob/blob.go
index 9843441dd2..dc18669e32 100644
--- a/blob/blob.go
+++ b/blob/blob.go
@@ -9,13 +9,14 @@ import (
 	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
 
 	"github.com/celestiaorg/celestia-app/pkg/appconsts"
-	"github.com/celestiaorg/celestia-app/pkg/shares"
 	"github.com/celestiaorg/celestia-app/x/blob/types"
 	"github.com/celestiaorg/nmt"
 
 	"github.com/celestiaorg/celestia-node/share"
 )
 
+var errEmptyShares = errors.New("empty shares")
+
 // Commitment is a Merkle Root of the subtree built from shares of the Blob.
 // It is computed by splitting the blob into shares and building the Merkle subtree to be included
 // after Submit.
@@ -31,6 +32,9 @@ func (com Commitment) Equal(c Commitment) bool {
 }
 
 // Proof is a collection of nmt.Proofs that verifies the inclusion of the data.
+// Proof proves the WHOLE namespaced data for a particular row.
+// TODO (@vgonkivs): rework `Proof` in order to prove a particular blob.
+// https://github.com/celestiaorg/celestia-node/issues/2303
 type Proof []*nmt.Proof
 
 func (p Proof) Len() int { return len(p) }
@@ -99,6 +103,10 @@ type Blob struct {
 	// the celestia-node's namespace type
 	// this is to avoid converting to and from app's type
 	namespace share.Namespace
+
+	// index is the index of the blob's first share in the EDS.
+	// Only blobs retrieved from the chain have it set; the default is -1.
+	index int
 }
 
 // NewBlobV0 constructs a new blob from the provided Namespace and data.
@@ -127,7 +135,7 @@ func NewBlob(shareVersion uint8, namespace share.Namespace, data []byte) (*Blob,
 	if err != nil {
 		return nil, err
 	}
-	return &Blob{Blob: blob, Commitment: com, namespace: namespace}, nil
+	return &Blob{Blob: blob, Commitment: com, namespace: namespace, index: -1}, nil
 }
 
 // Namespace returns blob's namespace.
@@ -135,11 +143,22 @@ func (b *Blob) Namespace() share.Namespace {
 	return b.namespace
 }
 
+// Index returns the index of the blob's first share in the EDS.
+// Only blobs retrieved from the chain have it set; the default is -1.
+func (b *Blob) Index() int {
+	return b.index
+}
+
+func (b *Blob) compareCommitments(com Commitment) bool {
+	return bytes.Equal(b.Commitment, com)
+}
+
 type jsonBlob struct {
 	Namespace    share.Namespace `json:"namespace"`
 	Data         []byte          `json:"data"`
 	ShareVersion uint32          `json:"share_version"`
 	Commitment   Commitment      `json:"commitment"`
+	Index        int             `json:"index"`
 }
 
 func (b *Blob) MarshalJSON() ([]byte, error) {
@@ -148,6 +167,7 @@ func (b *Blob) MarshalJSON() ([]byte, error) {
 		Data:         b.Data,
 		ShareVersion: b.ShareVersion,
 		Commitment:   b.Commitment,
+		Index:        b.index,
 	}
 	return json.Marshal(blob)
 }
@@ -165,39 +185,6 @@ func (b *Blob) UnmarshalJSON(data []byte) error {
 	b.Blob.ShareVersion = blob.ShareVersion
 	b.Commitment = blob.Commitment
 	b.namespace = blob.Namespace
+	b.index = blob.Index
 	return nil
 }
-
-// buildBlobsIfExist takes shares and tries building the Blobs from them.
-// It will build blobs either until appShares will be empty or the first incomplete blob will
-// appear, so in this specific case it will return all built blobs + remaining shares.
-func buildBlobsIfExist(appShares []shares.Share) ([]*Blob, []shares.Share, error) { - if len(appShares) == 0 { - return nil, nil, errors.New("empty shares received") - } - blobs := make([]*Blob, 0, len(appShares)) - for { - length, err := appShares[0].SequenceLen() - if err != nil { - return nil, nil, err - } - - amount := shares.SparseSharesNeeded(length) - if amount > len(appShares) { - return blobs, appShares, nil - } - - b, err := parseShares(appShares[:amount]) - if err != nil { - return nil, nil, err - } - - // only 1 blob will be created bc we passed the exact amount of shares - blobs = append(blobs, b[0]) - - if amount == len(appShares) { - return blobs, nil, nil - } - appShares = appShares[amount:] - } -} diff --git a/blob/blob_test.go b/blob/blob_test.go index 85486ad125..020bfb191b 100644 --- a/blob/blob_test.go +++ b/blob/blob_test.go @@ -1,7 +1,7 @@ package blob import ( - "reflect" + "bytes" "testing" "github.com/stretchr/testify/assert" @@ -14,7 +14,7 @@ import ( ) func TestBlob(t *testing.T) { - appBlobs, err := blobtest.GenerateV0Blobs([]int{1}, false) + appBlobs, err := blobtest.GenerateV0Blobs([]int{16}, false) require.NoError(t, err) blob, err := convertBlobs(appBlobs...) require.NoError(t, err) @@ -53,10 +53,12 @@ func TestBlob(t *testing.T) { expectedRes: func(t *testing.T) { sh, err := BlobsToShares(blob...) require.NoError(t, err) - b, err := SharesToBlobs(sh) + shares, err := toAppShares(sh...) require.NoError(t, err) - assert.Equal(t, len(b), 1) - assert.Equal(t, blob[0].Commitment, b[0].Commitment) + p := &parser{length: len(shares), shares: shares} + b, err := p.parse() + require.NoError(t, err) + assert.Equal(t, blob[0].Commitment, b.Commitment) }, }, { @@ -67,7 +69,8 @@ func TestBlob(t *testing.T) { newBlob := &Blob{} require.NoError(t, newBlob.UnmarshalJSON(data)) - require.True(t, reflect.DeepEqual(blob[0], newBlob)) + require.True(t, bytes.Equal(blob[0].Blob.Data, newBlob.Data)) + require.True(t, bytes.Equal(blob[0].Commitment, newBlob.Commitment)) }, }, } diff --git a/blob/helper.go b/blob/helper.go index 72a56c7889..d3b418e8ba 100644 --- a/blob/helper.go +++ b/blob/helper.go @@ -11,55 +11,6 @@ import ( "github.com/celestiaorg/celestia-node/share" ) -// SharesToBlobs takes raw shares and converts them to the blobs. -func SharesToBlobs(rawShares []share.Share) ([]*Blob, error) { - if len(rawShares) == 0 { - return nil, ErrBlobNotFound - } - - appShares, err := toAppShares(rawShares...) - if err != nil { - return nil, err - } - return parseShares(appShares) -} - -// parseShares takes shares and converts them to the []*Blob. -func parseShares(appShrs []shares.Share) ([]*Blob, error) { - shareSequences, err := shares.ParseShares(appShrs, true) - if err != nil { - return nil, err - } - - // ensure that sequence length is not 0 - if len(shareSequences) == 0 { - return nil, ErrBlobNotFound - } - - blobs := make([]*Blob, len(shareSequences)) - for i, sequence := range shareSequences { - data, err := sequence.RawData() - if err != nil { - return nil, err - } - if len(data) == 0 { - continue - } - - shareVersion, err := sequence.Shares[0].Version() - if err != nil { - return nil, err - } - - blob, err := NewBlob(shareVersion, sequence.Namespace.Bytes(), data) - if err != nil { - return nil, err - } - blobs[i] = blob - } - return blobs, nil -} - // BlobsToShares accepts blobs and convert them to the Shares. 
 func BlobsToShares(blobs ...*Blob) ([]share.Share, error) {
 	b := make([]types.Blob, len(blobs))
@@ -75,7 +26,7 @@ func BlobsToShares(blobs ...*Blob) ([]share.Share, error) {
 
 	sort.Slice(b, func(i, j int) bool {
 		val := bytes.Compare(b[i].NamespaceID, b[j].NamespaceID)
-		return val <= 0
+		return val < 0
 	})
 
 	rawShares, err := shares.SplitBlobs(b...)
@@ -84,3 +35,23 @@ func BlobsToShares(blobs ...*Blob) ([]share.Share, error) {
 	}
 	return shares.ToBytes(rawShares), nil
 }
+
+// toAppShares converts node's raw shares to app shares.
+func toAppShares(shrs ...share.Share) ([]shares.Share, error) {
+	appShrs := make([]shares.Share, 0, len(shrs))
+	for _, shr := range shrs {
+		bShare, err := shares.NewShare(shr)
+		if err != nil {
+			return nil, err
+		}
+		appShrs = append(appShrs, *bShare)
+	}
+	return appShrs, nil
+}
+
+// calculateIndex maps a flat share index within the EDS to its (row, col) coordinates.
+func calculateIndex(rowLength, blobIndex int) (row, col int) {
+	row = blobIndex / rowLength
+	col = blobIndex - (row * rowLength)
+	return
+}
diff --git a/blob/parser.go b/blob/parser.go
new file mode 100644
index 0000000000..8999afcb22
--- /dev/null
+++ b/blob/parser.go
@@ -0,0 +1,151 @@
+package blob
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/celestiaorg/celestia-app/pkg/shares"
+)
+
+// parser is a helper struct that collects shares and transforms them into a Blob.
+// It holds all the information needed to build the blob:
+// * the position of the blob inside the EDS;
+// * the blob's length in shares;
+// * the shares collected so far;
+// * an extra condition used to verify the final blob.
+type parser struct {
+	index    int
+	length   int
+	shares   []shares.Share
+	verifyFn func(blob *Blob) bool
+}
+
+// NOTE: shares are passed in so that padding shares can be detected and skipped here; addShares does not need this check.
+func (p *parser) set(index int, shrs []shares.Share) ([]shares.Share, error) {
+	if len(shrs) == 0 {
+		return nil, errEmptyShares
+	}
+
+	shrs, err := p.skipPadding(shrs)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(shrs) == 0 {
+		return nil, errEmptyShares
+	}
+
+	// `+=` because the index may have been advanced by `skipPadding`
+	p.index += index
+	length, err := shrs[0].SequenceLen()
+	if err != nil {
+		return nil, err
+	}
+
+	p.length = shares.SparseSharesNeeded(length)
+	return shrs, nil
+}
+
+// addShares appends shares until the blob is complete and returns any leftover shares.
+// No padding check is needed here, because padding shares can only appear
+// between two blobs and are already handled in set.
+func (p *parser) addShares(shares []shares.Share) (shrs []shares.Share, isComplete bool) {
+	index := -1
+	for i, sh := range shares {
+		p.shares = append(p.shares, sh)
+		if len(p.shares) == p.length {
+			index = i
+			isComplete = true
+			break
+		}
+	}
+
+	if index == -1 {
+		return
+	}
+
+	if index+1 >= len(shares) {
+		return shrs, true
+	}
+	return shares[index+1:], true
+}
+
+// parse parses the collected shares and creates the Blob.
+func (p *parser) parse() (*Blob, error) {
+	if p.length != len(p.shares) {
+		return nil, fmt.Errorf("invalid shares amount: want %d, have %d", p.length, len(p.shares))
+	}
+
+	sequence, err := shares.ParseShares(p.shares, true)
+	if err != nil {
+		return nil, err
+	}
+
+	// ensure that sequence length is not 0
+	if len(sequence) == 0 {
+		return nil, ErrBlobNotFound
+	}
+	if len(sequence) > 1 {
+		return nil, errors.New("unexpected amount of sequences")
+	}
+
+	data, err := sequence[0].RawData()
+	if err != nil {
+		return nil, err
+	}
+	if len(data) == 0 {
+		return nil, ErrBlobNotFound
+	}
+
+	shareVersion, err := sequence[0].Shares[0].Version()
+	if err != nil {
+		return nil, err
+	}
+
+	blob, err := NewBlob(shareVersion, sequence[0].Namespace.Bytes(), data)
+	if err != nil {
+		return nil, err
+	}
+	blob.index = p.index
+	return blob, nil
+}
+
+// skipPadding skips the first share in the range if it is a padding share.
+func (p *parser) skipPadding(shares []shares.Share) ([]shares.Share, error) {
+	if len(shares) == 0 {
+		return nil, errEmptyShares
+	}
+
+	isPadding, err := shares[0].IsPadding()
+	if err != nil {
+		return nil, err
+	}
+
+	if !isPadding {
+		return shares, nil
+	}
+
+	// update the blob index since we are skipping one share
+	p.index++
+	if len(shares) > 1 {
+		return shares[1:], nil
+	}
+	return nil, nil
+}
+
+func (p *parser) verify(blob *Blob) bool {
+	if p.verifyFn == nil {
+		return false
+	}
+	return p.verifyFn(blob)
+}
+
+func (p *parser) isEmpty() bool {
+	return p.index == 0 && p.length == 0 && len(p.shares) == 0
+}
+
+func (p *parser) reset() {
+	p.index = 0
+	p.length = 0
+	p.shares = nil
+}
diff --git a/blob/service.go b/blob/service.go
index fc1d630e62..7445c05054 100644
--- a/blob/service.go
+++ b/blob/service.go
@@ -111,12 +111,27 @@ func (s *Service) Submit(ctx context.Context, blobs []*Blob, gasPrice GasPrice)
 }
 
-// Get retrieves all the blobs for given namespaces at the given height by commitment.
-func (s *Service) Get(ctx context.Context, height uint64, ns share.Namespace, commitment Commitment) (*Blob, error) {
-	blob, _, err := s.getByCommitment(ctx, height, ns, commitment)
-	if err != nil {
-		return nil, err
-	}
-	return blob, nil
+// Get retrieves a blob in the given namespace at the given height by commitment.
+func (s *Service) Get(
+	ctx context.Context,
+	height uint64,
+	namespace share.Namespace,
+	commitment Commitment,
+) (blob *Blob, err error) {
+	ctx, span := tracer.Start(ctx, "get")
+	defer func() {
+		utils.SetStatusAndEnd(span, err)
+	}()
+	span.SetAttributes(
+		attribute.Int64("height", int64(height)),
+		attribute.String("namespace", namespace.String()),
+	)
+
+	sharesParser := &parser{verifyFn: func(blob *Blob) bool {
+		return blob.compareCommitments(commitment)
+	}}
+
+	blob, _, err = s.retrieve(ctx, height, namespace, sharesParser)
+	return
 }
 
 // GetProof retrieves all blobs in the given namespaces at the given height by commitment
@@ -126,11 +141,21 @@ func (s *Service) GetProof(
 	height uint64,
 	namespace share.Namespace,
 	commitment Commitment,
-) (*Proof, error) {
-	_, proof, err := s.getByCommitment(ctx, height, namespace, commitment)
-	if err != nil {
-		return nil, err
-	}
-	return proof, nil
+) (proof *Proof, err error) {
+	ctx, span := tracer.Start(ctx, "get-proof")
+	defer func() {
+		utils.SetStatusAndEnd(span, err)
+	}()
+	span.SetAttributes(
+		attribute.Int64("height", int64(height)),
+		attribute.String("namespace", namespace.String()),
+	)
+
+	sharesParser := &parser{verifyFn: func(blob *Blob) bool {
+		return blob.compareCommitments(commitment)
+	}}
+
+	_, proof, err = s.retrieve(ctx, height, namespace, sharesParser)
+	return proof, err
 }
@@ -147,8 +172,8 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
resultErr = make([]error, len(namespaces)) ) - for _, ns := range namespaces { - log.Debugw("performing GetAll request", "namespace", ns.String(), "height", height) + for _, namespace := range namespaces { + log.Debugw("performing GetAll request", "namespace", namespace.String(), "height", height) } wg := sync.WaitGroup{} @@ -190,12 +215,17 @@ func (s *Service) Included( height uint64, namespace share.Namespace, proof *Proof, - com Commitment, + commitment Commitment, ) (_ bool, err error) { ctx, span := tracer.Start(ctx, "included") defer func() { utils.SetStatusAndEnd(span, err) }() + span.SetAttributes( + attribute.Int64("height", int64(height)), + attribute.String("namespace", namespace.String()), + ) + // In the current implementation, LNs will have to download all shares to recompute the commitment. // To achieve 1. we need to modify Proof structure and to store all subtree roots, that were // involved in commitment creation and then call `merkle.HashFromByteSlices`(tendermint package). @@ -205,7 +235,10 @@ func (s *Service) Included( // but we have to guarantee that all our stored subtree roots will be on the same height(e.g. one // level above shares). // TODO(@vgonkivs): rework the implementation to perform all verification without network requests. - _, resProof, err := s.getByCommitment(ctx, height, namespace, com) + sharesParser := &parser{verifyFn: func(blob *Blob) bool { + return blob.compareCommitments(commitment) + }} + _, resProof, err := s.retrieve(ctx, height, namespace, sharesParser) switch err { case nil: case ErrBlobNotFound: @@ -216,27 +249,19 @@ func (s *Service) Included( return true, resProof.equal(*proof) } -// getByCommitment retrieves the DAH row by row, fetching shares and constructing blobs in order to -// compare Commitments. Retrieving is stopped once the requested blob/proof is found. -func (s *Service) getByCommitment( +// retrieve retrieves blobs and their proofs by requesting the whole namespace and +// comparing Commitments with each blob. +// Retrieving is stopped once the requested blob/proof is found. 
+func (s *Service) retrieve( ctx context.Context, height uint64, namespace share.Namespace, - commitment Commitment, + sharesParser *parser, ) (_ *Blob, _ *Proof, err error) { log.Infow("requesting blob", "height", height, "namespace", namespace.String()) - ctx, span := tracer.Start(ctx, "get-by-commitment") - defer func() { - utils.SetStatusAndEnd(span, err) - }() - span.SetAttributes( - attribute.Int64("height", int64(height)), - attribute.String("commitment", string(commitment)), - ) - getCtx, headerGetterSpan := tracer.Start(ctx, "header-getter") header, err := s.headerGetter(getCtx, height) @@ -249,6 +274,14 @@ func (s *Service) getByCommitment( headerGetterSpan.AddEvent("received eds", trace.WithAttributes( attribute.Int64("eds-size", int64(len(header.DAH.RowRoots))))) + rowIndex := -1 + for i, row := range header.DAH.RowRoots { + if !namespace.IsOutsideRange(row, row) { + rowIndex = i + break + } + } + getCtx, getSharesSpan := tracer.Start(ctx, "get-shares-by-namespace") namespacedShares, err := s.shareGetter.GetSharesByNamespace(getCtx, header, namespace) @@ -265,10 +298,8 @@ func (s *Service) getByCommitment( attribute.Int64("eds-size", int64(len(header.DAH.RowRoots))))) var ( - rawShares = make([]shares.Share, 0) + appShares = make([]shares.Share, 0) proofs = make(Proof, 0) - // spansMultipleRows specifies whether blob is expanded into multiple rows - spansMultipleRows bool ) for _, row := range namespacedShares { @@ -279,47 +310,81 @@ func (s *Service) getByCommitment( return nil, nil, ErrBlobNotFound } - appShares, err := toAppShares(row.Shares...) + appShares, err = toAppShares(row.Shares...) if err != nil { return nil, nil, err } - rawShares = append(rawShares, appShares...) + proofs = append(proofs, row.Proof) + index := row.Proof.Start() + + for { + var ( + isComplete bool + shrs []shares.Share + wasEmpty = sharesParser.isEmpty() + ) + + if wasEmpty { + // create a parser if it is empty + shrs, err = sharesParser.set(rowIndex*len(header.DAH.RowRoots)+index, appShares) + if err != nil { + if errors.Is(err, errEmptyShares) { + appShares = nil + break + } + return nil, nil, err + } + + if len(appShares) != len(shrs) { + // update index and shares if a padding share was detected. + index += len(appShares) - len(shrs) + appShares = shrs + } + } - var blobs []*Blob - blobs, rawShares, err = buildBlobsIfExist(rawShares) - if err != nil { - return nil, nil, err - } - for _, b := range blobs { - if b.Commitment.Equal(commitment) { - span.AddEvent("blob reconstructed") - return b, &proofs, nil + shrs, isComplete = sharesParser.addShares(appShares) + if !isComplete { + appShares = nil + break } - // Falling under this flag means that the data from the last row - // was insufficient to create a complete blob. As a result, - // the first blob received spans two rows and includes proofs - // for both of these rows. All other blobs in the result will relate - // to the current row and have a single proof. 
-			if spansMultipleRows {
-				spansMultipleRows = false
-				// leave proof only for the current row
+
+			blob, err := sharesParser.parse()
+			if err != nil {
+				return nil, nil, err
+			}
+
+			if sharesParser.verify(blob) {
+				return blob, &proofs, nil
+			}
+
+			index += len(appShares) - len(shrs)
+			appShares = shrs
+			sharesParser.reset()
+
+			if !wasEmpty {
+				// the parsed blob spanned multiple rows; keep only the current row's proof for the next blob
 				proofs = proofs[len(proofs)-1:]
 			}
 		}
 
-		if len(rawShares) > 0 {
-			spansMultipleRows = true
-			continue
+		rowIndex++
+		if sharesParser.isEmpty() {
+			proofs = nil
 		}
-		proofs = nil
 	}
 
 	err = ErrBlobNotFound
-	if len(rawShares) > 0 {
-		err = fmt.Errorf("incomplete blob with the "+
-			"namespace: %s detected at %d: %w", namespace.String(), height, err)
-		log.Error(err)
+	for _, sh := range appShares {
+		isPadding, paddingErr := sh.IsPadding()
+		if paddingErr != nil {
+			return nil, nil, paddingErr
+		}
+		if !isPadding {
+			err = fmt.Errorf("incomplete blob with the "+
+				"namespace: %s detected at %d: %w", namespace.String(), height, err)
+			log.Error(err)
+		}
 	}
 	return nil, nil, err
 }
@@ -332,34 +397,24 @@ func (s *Service) getBlobs(
 	header *header.ExtendedHeader,
 ) (_ []*Blob, err error) {
 	ctx, span := tracer.Start(ctx, "get-blobs")
+	span.SetAttributes(
+		attribute.Int64("height", int64(header.Height())),
+		attribute.String("namespace", namespace.String()),
+	)
 	defer func() {
 		utils.SetStatusAndEnd(span, err)
 	}()
-	namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header, namespace)
-	if err != nil {
-		return nil, err
-	}
-	return SharesToBlobs(namespacedShares.Flatten())
-}
 
-// toAppShares converts node's raw shares to the app shares, skipping padding
-func toAppShares(shrs ...share.Share) ([]shares.Share, error) {
-	appShrs := make([]shares.Share, 0, len(shrs))
-	for _, shr := range shrs {
-		bShare, err := shares.NewShare(shr)
-		if err != nil {
-			return nil, err
-		}
-
-		ok, err := bShare.IsPadding()
-		if err != nil {
-			return nil, err
-		}
-		if ok {
-			continue
-		}
+	blobs := make([]*Blob, 0)
+	verifyFn := func(blob *Blob) bool {
+		blobs = append(blobs, blob)
+		return false
+	}
+	sharesParser := &parser{verifyFn: verifyFn}
 
-		appShrs = append(appShrs, *bShare)
+	_, _, err = s.retrieve(ctx, header.Height(), namespace, sharesParser)
+	if len(blobs) == 0 {
+		return nil, ErrBlobNotFound
 	}
-	return appShrs, nil
+	return blobs, nil
 }
diff --git a/blob/service_test.go b/blob/service_test.go
index 3e22f887af..4bda8d993b 100644
--- a/blob/service_test.go
+++ b/blob/service_test.go
@@ -5,6 +5,8 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/json"
+	"fmt"
+	"sort"
 	"testing"
 	"time"
 
@@ -72,8 +74,7 @@ func TestBlobService_Get(t *testing.T) {
 		{
 			name: "get all with the same namespace",
 			doFn: func() (interface{}, error) {
-				b, err := service.GetAll(ctx, 1, []share.Namespace{blobs1[0].Namespace()})
-				return b, err
+				return service.GetAll(ctx, 1, []share.Namespace{blobs1[0].Namespace()})
 			},
 			expectedResult: func(res interface{}, err error) {
 				require.NoError(t, err)
@@ -85,7 +86,47 @@ func TestBlobService_Get(t *testing.T) {
 				assert.Len(t, blobs, 2)
 
 				for i := range blobs1 {
-					bytes.Equal(blobs1[i].Commitment, blobs[i].Commitment)
+					require.Equal(t, blobs1[i].Commitment, blobs[i].Commitment)
+				}
+			},
+		},
+		{
+			name: "verify indexes",
+			doFn: func() (interface{}, error) {
+				b0, err := service.Get(ctx, 1, blobs0[0].Namespace(), blobs0[0].Commitment)
+				require.NoError(t, err)
+				b1, err := service.Get(ctx, 1, blobs0[1].Namespace(), blobs0[1].Commitment)
+				require.NoError(t, err)
+				b23, err := service.GetAll(ctx, 1, 
[]share.Namespace{blobs1[0].Namespace()}) + require.NoError(t, err) + return []*Blob{b0, b1, b23[0], b23[1]}, nil + }, + expectedResult: func(res interface{}, err error) { + require.NoError(t, err) + blobs, ok := res.([]*Blob) + assert.True(t, ok) + assert.NotEmpty(t, blobs) + assert.Len(t, blobs, 4) + + sort.Slice(blobs, func(i, j int) bool { + val := bytes.Compare(blobs[i].NamespaceId, blobs[j].NamespaceId) + return val < 0 + }) + + h, err := service.headerGetter(ctx, 1) + require.NoError(t, err) + + resultShares, err := BlobsToShares(blobs...) + require.NoError(t, err) + shareOffset := 0 + for i := range blobs { + row, col := calculateIndex(len(h.DAH.RowRoots), blobs[i].index) + sh, err := service.shareGetter.GetShare(ctx, h, row, col) + require.NoError(t, err) + require.True(t, bytes.Equal(sh, resultShares[shareOffset]), + fmt.Sprintf("issue on %d attempt. ROW:%d, COL: %d, blobIndex:%d", i, row, col, blobs[i].index), + ) + shareOffset += shares.SparseSharesNeeded(uint32(len(blobs[i].Data))) } }, }, @@ -300,7 +341,7 @@ func TestBlobService_Get(t *testing.T) { // But to satisfy the rule of eds creating, padding namespace share is placed between // blobs. Test ensures that blob service will skip padding share and return the correct blob. func TestService_GetSingleBlobWithoutPadding(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) t.Cleanup(cancel) appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5}, true) @@ -342,6 +383,14 @@ func TestService_GetSingleBlobWithoutPadding(t *testing.T) { newBlob, err := service.Get(ctx, 1, blobs[1].Namespace(), blobs[1].Commitment) require.NoError(t, err) assert.Equal(t, newBlob.Commitment, blobs[1].Commitment) + + resultShares, err := BlobsToShares(newBlob) + require.NoError(t, err) + row, col := calculateIndex(len(h.DAH.RowRoots), newBlob.index) + sh, err := service.shareGetter.GetShare(ctx, h, row, col) + require.NoError(t, err) + + assert.Equal(t, sh, resultShares[0]) } func TestService_Get(t *testing.T) { @@ -356,10 +405,25 @@ func TestService_Get(t *testing.T) { require.NoError(t, err) service := createService(ctx, t, blobs) - for _, blob := range blobs { + + h, err := service.headerGetter(ctx, 1) + require.NoError(t, err) + + resultShares, err := BlobsToShares(blobs...) + require.NoError(t, err) + shareOffset := 0 + + for i, blob := range blobs { b, err := service.Get(ctx, 1, blob.Namespace(), blob.Commitment) require.NoError(t, err) assert.Equal(t, b.Commitment, blob.Commitment) + + row, col := calculateIndex(len(h.DAH.RowRoots), b.index) + sh, err := service.shareGetter.GetShare(ctx, h, row, col) + require.NoError(t, err) + + assert.Equal(t, sh, resultShares[shareOffset], fmt.Sprintf("issue on %d attempt", i)) + shareOffset += shares.SparseSharesNeeded(uint32(len(blob.Data))) } } @@ -410,11 +474,27 @@ func TestService_GetAllWithoutPadding(t *testing.T) { service := NewService(nil, getters.NewIPLDGetter(bs), fn) - _, err = service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()}) + blobs, err = service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()}) + require.NoError(t, err) + + resultShares, err := BlobsToShares(blobs...) 
require.NoError(t, err) + sort.Slice(blobs, func(i, j int) bool { + val := bytes.Compare(blobs[i].NamespaceId, blobs[j].NamespaceId) + return val < 0 + }) + shareOffset := 0 + for _, blob := range blobs { + row, col := calculateIndex(len(h.DAH.RowRoots), blob.index) + sh, err := service.shareGetter.GetShare(ctx, h, row, col) + require.NoError(t, err) + + assert.Equal(t, sh, resultShares[shareOffset]) + shareOffset += shares.SparseSharesNeeded(uint32(len(blob.Data))) + } } -// BenchmarkGetByCommitment-12 3139 380827 ns/op 701647 B/op 4990 allocs/op +// BenchmarkGetByCommitment-12 1869 571663 ns/op 1085371 B/op 6414 allocs/op func BenchmarkGetByCommitment(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) b.Cleanup(cancel) @@ -425,11 +505,17 @@ func BenchmarkGetByCommitment(b *testing.B) { require.NoError(b, err) service := createService(ctx, b, blobs) + indexer := &parser{} b.ResetTimer() for i := 0; i < b.N; i++ { b.ReportAllocs() - _, _, err = service.getByCommitment( - ctx, 1, blobs[1].Namespace(), blobs[1].Commitment, + indexer.reset() + indexer.verifyFn = func(blob *Blob) bool { + return blob.compareCommitments(blobs[1].Commitment) + } + + _, _, err = service.retrieve( + ctx, 1, blobs[1].Namespace(), indexer, ) require.NoError(b, err) } diff --git a/nodebuilder/blob/cmd/blob.go b/nodebuilder/blob/cmd/blob.go index 25a102843b..78dd57f250 100644 --- a/nodebuilder/blob/cmd/blob.go +++ b/nodebuilder/blob/cmd/blob.go @@ -272,6 +272,7 @@ func formatData(data interface{}) interface{} { Data string `json:"data"` ShareVersion uint32 `json:"share_version"` Commitment []byte `json:"commitment"` + Index int `json:"index"` } if reflect.TypeOf(data).Kind() == reflect.Slice { @@ -283,6 +284,7 @@ func formatData(data interface{}) interface{} { Data: string(b.Data), ShareVersion: b.ShareVersion, Commitment: b.Commitment, + Index: b.Index(), } } return result @@ -294,5 +296,6 @@ func formatData(data interface{}) interface{} { Data: string(b.Data), ShareVersion: b.ShareVersion, Commitment: b.Commitment, + Index: b.Index(), } } diff --git a/nodebuilder/tests/blob_test.go b/nodebuilder/tests/blob_test.go index d0aeefd568..7eb225a14a 100644 --- a/nodebuilder/tests/blob_test.go +++ b/nodebuilder/tests/blob_test.go @@ -79,7 +79,7 @@ func TestBlobModule(t *testing.T) { time.Sleep(time.Second) blob1, err := fullClient.Blob.Get(ctx, height, blobs[0].Namespace(), blobs[0].Commitment) require.NoError(t, err) - require.Equal(t, blobs[0], blob1) + require.Equal(t, blobs[0].Commitment, blob1.Commitment) }, }, { @@ -151,7 +151,7 @@ func TestBlobModule(t *testing.T) { b0, err := fullClient.Blob.Get(ctx, height, b.Namespace(), b.Commitment) require.NoError(t, err) - require.Equal(t, b, b0) + require.Equal(t, b.Commitment, b0.Commitment) // give some time to store the data, // otherwise the test will hang on the IPLD level. @@ -180,7 +180,7 @@ func TestBlobModule(t *testing.T) { b0, err := fullClient.Blob.Get(ctx, h, blobs[0].Namespace(), blobs[0].Commitment) require.NoError(t, err) - require.Equal(t, blobs[0], b0) + require.Equal(t, blobs[0].Commitment, b0.Commitment) // give some time to store the data, // otherwise the test will hang on the IPLD level.
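
A minimal sketch of how the new parser pieces fit together, for illustration only. parseSingleBlob is a hypothetical helper, not part of this change, written as if it lived in the blob package (using the shares import from parser.go). It assumes start is the absolute index of the blob's first share in the EDS and that appShares already holds the namespaced shares converted via toAppShares; inside retrieve the same steps are interleaved with row iteration and proof collection.

// Hypothetical helper, for illustration only (not part of this change).
func parseSingleBlob(start int, appShares []shares.Share, commitment Commitment) (*Blob, error) {
	p := &parser{verifyFn: func(b *Blob) bool {
		return b.compareCommitments(commitment)
	}}

	// set records the start index (skipping a leading padding share, if any)
	// and derives the expected number of shares from the sequence length.
	rest, err := p.set(start, appShares)
	if err != nil {
		return nil, err
	}

	// addShares accumulates shares until the expected length is reached.
	if _, complete := p.addShares(rest); !complete {
		return nil, ErrBlobNotFound
	}

	// parse rebuilds the Blob and stamps it with the recorded index;
	// verify runs the commitment comparison supplied above.
	blob, err := p.parse()
	if err != nil {
		return nil, err
	}
	if !p.verify(blob) {
		return nil, ErrBlobNotFound
	}
	return blob, nil
}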