diff --git a/blob/service.go b/blob/service.go index 79e7dd7937..45be7db334 100644 --- a/blob/service.go +++ b/blob/service.go @@ -30,6 +30,33 @@ type Submitter interface { SubmitPayForBlob(ctx context.Context, fee math.Int, gasLim uint64, blobs []*Blob) (*types.TxResponse, error) } +// BlobsByNamespace - helper type to provide map of blob under namespaces +type BlobsByNamespace map[*share.Namespace][]*Blob + +// Add - adding blob to map in a simple way +func (bb BlobsByNamespace) Add(namespace *share.Namespace, blob ...*Blob) { + val, exists := bb[namespace] + if !exists { + bb[namespace] = make([]*Blob, 0) + bb[namespace] = append(bb[namespace], blob...) + return + } + bb[namespace] = append(val, blob...) +} + +// BlobsSubscription - contains map of blobs and height +type BlobsSubscription struct { + height uint64 + blobsByNamespace BlobsByNamespace +} + +// BlobsError - signal error if something happens in subscription +type BlobsError struct { + height uint64 + nameSpace *share.Namespace + err error +} + type Service struct { // accessor dials the given celestia-core endpoint to submit blobs. blobSubmitter Submitter @@ -37,17 +64,21 @@ type Service struct { shareGetter share.Getter // headerGetter fetches header by the provided height headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error) + // headerSubscribe returns header Subscribe channel + headerSubscribe func(context.Context) (<-chan *header.ExtendedHeader, error) } func NewService( submitter Submitter, getter share.Getter, headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error), + headerSubscribe func(context.Context) (<-chan *header.ExtendedHeader, error), ) *Service { return &Service{ - blobSubmitter: submitter, - shareGetter: getter, - headerGetter: headerGetter, + blobSubmitter: submitter, + shareGetter: getter, + headerGetter: headerGetter, + headerSubscribe: headerSubscribe, } } @@ -186,6 +217,55 @@ func (s *Service) Included( return true, resProof.equal(*proof) } +// Subscribe returns all blobs under the given namespaces at subscrubed heigh. +// Subscribe can return map of blobs and an error in case if some requests failed. +func (s *Service) Subscribe(ctx context.Context, namespaces []share.Namespace) (<-chan BlobsSubscription, <-chan BlobsError, error) { + headerChan, err := s.headerSubscribe(ctx) + if err != nil { + return nil, nil, err + } + + blobChan := make(chan BlobsSubscription) + blobErr := make(chan BlobsError) + go func() { + defer close(blobChan) + defer close(blobErr) + for { + select { + case head := <-headerChan: + wg := sync.WaitGroup{} + wg.Add(len(namespaces)) + + mu := new(sync.Mutex) + blobsByName := BlobsByNamespace{} + for i, namespace := range namespaces { + go func(i int, namespace share.Namespace) { + defer wg.Done() + blobs, err := s.getBlobs(ctx, namespace, head) + if err != nil { + log.Debugw("error getting blobs", "namespace", namespace.String(), "height", head.Height()) + blobErr <- BlobsError{height: head.Height(), nameSpace: &namespace, err: err} + return + } + + mu.Lock() + defer mu.Unlock() + blobsByName.Add(&namespace, blobs...) + }(i, namespace) + } + wg.Wait() + + blobSub := BlobsSubscription{height: head.Height(), blobsByNamespace: blobsByName} + blobChan <- blobSub + case <-ctx.Done(): + return + } + } + }() + + return blobChan, blobErr, nil +} + // getByCommitment retrieves the DAH row by row, fetching shares and constructing blobs in order to // compare Commitments. Retrieving is stopped once the requested blob/proof is found. func (s *Service) getByCommitment( diff --git a/blob/service_test.go b/blob/service_test.go index 6777084eb4..1625f65c0d 100644 --- a/blob/service_test.go +++ b/blob/service_test.go @@ -337,7 +337,11 @@ func TestService_GetSingleBlobWithoutPadding(t *testing.T) { fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { return headerStore.GetByHeight(ctx, height) } - service := NewService(nil, getters.NewIPLDGetter(bs), fn) + dummyFnSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + return nil, nil + } + + service := NewService(nil, getters.NewIPLDGetter(bs), fn, dummyFnSub) newBlob, err := service.Get(ctx, 1, blobs[1].Namespace(), blobs[1].Commitment) require.NoError(t, err) @@ -407,13 +411,84 @@ func TestService_GetAllWithoutPadding(t *testing.T) { fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { return headerStore.GetByHeight(ctx, height) } + dummyFnSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + return nil, nil + } - service := NewService(nil, getters.NewIPLDGetter(bs), fn) + service := NewService(nil, getters.NewIPLDGetter(bs), fn, dummyFnSub) _, err = service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()}) require.NoError(t, err) } +func TestBlobService_Subscribe(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5}, true) + require.NoError(t, err) + blobs, err := convertBlobs(appBlob...) + require.NoError(t, err) + + ns1, ns2 := blobs[0].Namespace().ToAppNamespace(), blobs[1].Namespace().ToAppNamespace() + + padding0, err := shares.NamespacePaddingShare(ns1, appconsts.ShareVersionZero) + require.NoError(t, err) + padding1, err := shares.NamespacePaddingShare(ns2, appconsts.ShareVersionZero) + require.NoError(t, err) + rawShares0, err := BlobsToShares(blobs[0]) + require.NoError(t, err) + rawShares1, err := BlobsToShares(blobs[1]) + require.NoError(t, err) + rawShares := make([][]byte, 0) + + // create shares in correct order with padding shares + if bytes.Compare(blobs[0].Namespace(), blobs[1].Namespace()) <= 0 { + rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...) + rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...) + } else { + rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...) + rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...) + } + + bs := ipld.NewMemBlockservice() + batching := ds_sync.MutexWrap(ds.NewMapDatastore()) + headerStore, err := store.NewStore[*header.ExtendedHeader](batching) + require.NoError(t, err) + eds, err := ipld.AddShares(ctx, rawShares, bs) + require.NoError(t, err) + + h := headertest.ExtendedHeaderFromEDS(t, 1, eds) + err = headerStore.Init(ctx, h) + require.NoError(t, err) + + chanHead := make(chan *header.ExtendedHeader) + fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { + return headerStore.GetByHeight(ctx, height) + } + dummyFnSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + return chanHead, nil + } + + service := NewService(nil, getters.NewIPLDGetter(bs), fn, dummyFnSub) + + res, _, err := service.Subscribe(ctx, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()}) + require.NoError(t, err) + + for i := 0; i < 1_000; i++ { + go func() { + chanHead <- h + }() + } + + counter := 0 + for received := range res { + require.Len(t, received.blobsByNamespace, 2) + counter += 1 + } + require.Equal(t, counter, 1_000) +} + func createService(ctx context.Context, t *testing.T, blobs []*Blob) *Service { bs := ipld.NewMemBlockservice() batching := ds_sync.MutexWrap(ds.NewMapDatastore()) @@ -431,5 +506,10 @@ func createService(ctx context.Context, t *testing.T, blobs []*Blob) *Service { fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { return headerStore.GetByHeight(ctx, height) } - return NewService(nil, getters.NewIPLDGetter(bs), fn) + + dummyFnSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + return nil, nil + } + + return NewService(nil, getters.NewIPLDGetter(bs), fn, dummyFnSub) } diff --git a/nodebuilder/blob/blob.go b/nodebuilder/blob/blob.go index 5e29d3b90c..368c3190c1 100644 --- a/nodebuilder/blob/blob.go +++ b/nodebuilder/blob/blob.go @@ -26,15 +26,18 @@ type Module interface { // Included checks whether a blob's given commitment(Merkle subtree root) is included at // given height and under the namespace. Included(_ context.Context, height uint64, _ share.Namespace, _ *blob.Proof, _ blob.Commitment) (bool, error) + // Subscribe will subscribe to most recent blocks under a target namespaces. + Subscribe(_ context.Context, _ []share.Namespace) (<-chan blob.BlobsSubscription, <-chan blob.BlobsError, error) } type API struct { Internal struct { - Submit func(context.Context, []*blob.Blob, *blob.SubmitOptions) (uint64, error) `perm:"write"` - Get func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Blob, error) `perm:"read"` - GetAll func(context.Context, uint64, []share.Namespace) ([]*blob.Blob, error) `perm:"read"` - GetProof func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Proof, error) `perm:"read"` - Included func(context.Context, uint64, share.Namespace, *blob.Proof, blob.Commitment) (bool, error) `perm:"read"` + Submit func(context.Context, []*blob.Blob, *blob.SubmitOptions) (uint64, error) `perm:"write"` + Get func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Blob, error) `perm:"read"` + GetAll func(context.Context, uint64, []share.Namespace) ([]*blob.Blob, error) `perm:"read"` + GetProof func(context.Context, uint64, share.Namespace, blob.Commitment) (*blob.Proof, error) `perm:"read"` + Included func(context.Context, uint64, share.Namespace, *blob.Proof, blob.Commitment) (bool, error) `perm:"read"` + Subscribe func(context.Context, []share.Namespace) (<-chan blob.BlobsSubscription, <-chan blob.BlobsError, error) `perm:"read"` } } @@ -73,3 +76,10 @@ func (api *API) Included( ) (bool, error) { return api.Internal.Included(ctx, height, namespace, proof, commitment) } + +func (api *API) Subscribe( + ctx context.Context, + namespaces []share.Namespace, +) (<-chan blob.BlobsSubscription, <-chan blob.BlobsError, error) { + return api.Internal.Subscribe(ctx, namespaces) +} diff --git a/nodebuilder/blob/module.go b/nodebuilder/blob/module.go index 76e7677725..82d1a4b795 100644 --- a/nodebuilder/blob/module.go +++ b/nodebuilder/blob/module.go @@ -18,11 +18,16 @@ func ConstructModule() fx.Option { func(service headerService.Module) func(context.Context, uint64) (*header.ExtendedHeader, error) { return service.GetByHeight }), + fx.Provide( + func(service headerService.Module) func(context.Context) (<-chan *header.ExtendedHeader, error) { + return service.Subscribe + }), fx.Provide(func( state *state.CoreAccessor, sGetter share.Getter, getByHeightFn func(context.Context, uint64) (*header.ExtendedHeader, error), + headerSubscribFn func(ctx context.Context) (<-chan *header.ExtendedHeader, error), ) Module { - return blob.NewService(state, sGetter, getByHeightFn) + return blob.NewService(state, sGetter, getByHeightFn, headerSubscribFn) })) }