Skip to content

Commit

Permalink
fix(rpc): fraud.Proof (un)marshalling and subscription as chan
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Nov 1, 2022
1 parent 9c996a7 commit 6f75e7d
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 123 deletions.
4 changes: 2 additions & 2 deletions nodebuilder/das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/share"
)

Expand All @@ -14,7 +14,7 @@ func NewDASer(
hsub header.Subscriber,
store header.Store,
batching datastore.Batching,
fraudService fraud.Module,
fraudService fraud.Service,
options ...das.Option,
) (*das.DASer, error) {
return das.NewDASer(da, hsub, store, batching, fraudService, options...)
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/das/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Provide(fx.Annotate(
NewDASer,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, das *das.DASer) error {
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error {
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice,
das.Start, das.Stop)
}),
Expand Down
87 changes: 87 additions & 0 deletions nodebuilder/fraud/constructors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package fraud

import (
"context"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// NewModule constructs a fraud proof service with the syncer disabled.
func NewModule(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, fraud.Service, error) {
return newFraudService(lc, sub, host, hstore, ds, false, string(network))
}

// ModuleWithSyncer constructs fraud proof service with enabled syncer.
func ModuleWithSyncer(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, fraud.Service, error) {
return newFraudService(lc, sub, host, hstore, ds, true, string(network))
}

func newFraudService(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
isEnabled bool,
protocolSuffix string,
) (Module, fraud.Service, error) {
pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled, protocolSuffix)
lc.Append(fx.Hook{
OnStart: pservice.Start,
OnStop: pservice.Stop,
})
return &Service{
Service: pservice,
}, pservice, nil
}

// Lifecycle controls the lifecycle of service depending on fraud proofs.
// It starts the service only if no fraud-proof exists and stops the service automatically
// if a proof arrives after the service was started.
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fraudModule fraud.Service,
start, stop func(context.Context) error,
) error {
proofs, err := fraudModule.Get(startCtx, p)
switch err {
default:
return err
case nil:
return &fraud.ErrFraudExists{Proof: proofs}
case datastore.ErrNotFound:
}
err = start(startCtx)
if err != nil {
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) {
if err := stop(lifecycleCtx); err != nil {
log.Error(err)
}
})
return nil
}
88 changes: 14 additions & 74 deletions nodebuilder/fraud/fraud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,23 @@ package fraud
import (
"context"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// NewModule constructs a fraud proof service with the syncer disabled.
func NewModule(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, false, string(network))
}

// ModuleWithSyncer constructs fraud proof service with enabled syncer.
func ModuleWithSyncer(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
network p2p.Network,
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, true, string(network))
}

func newFraudService(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
isEnabled bool,
protocolSuffix string,
) (Module, error) {
pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled, protocolSuffix)
lc.Append(fx.Hook{
OnStart: pservice.Start,
OnStop: pservice.Stop,
})
return pservice, nil
// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the network.
// Any method signature changed here needs to also be changed in the API struct.
type Module interface {
// Get fetches fraud proofs from the disk by its type.
Get(context.Context, fraud.ProofType) ([]Proof, error)
// Subscribe allows to subscribe on a Proof pub sub topic by its type.
Subscribe(context.Context, fraud.ProofType) (chan Proof, error)
}

// Lifecycle controls the lifecycle of service depending on fraud proofs.
// It starts the service only if no fraud-proof exists and stops the service automatically
// if a proof arrives after the service was started.
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fraudModule Module,
start, stop func(context.Context) error,
) error {
proofs, err := fraudModule.Get(startCtx, p)
switch err {
default:
return err
case nil:
return &fraud.ErrFraudExists{Proof: proofs}
case datastore.ErrNotFound:
}
err = start(startCtx)
if err != nil {
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) {
if err := stop(lifecycleCtx); err != nil {
log.Error(err)
}
})
return nil
// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
//
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . Module
type API struct {
Subscribe func(context.Context, fraud.ProofType) (chan Proof, error)
Get func(context.Context, fraud.ProofType) ([]Proof, error)
}
29 changes: 8 additions & 21 deletions nodebuilder/fraud/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions nodebuilder/fraud/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
var log = logging.Logger("fraud-module")

func ConstructModule(tp node.Type) fx.Option {
baseComponent := fx.Provide(func(module Module) fraud.Getter {
return module
baseComponent := fx.Provide(func(serv fraud.Service) fraud.Getter {
return serv
})
switch tp {
case node.Light:
Expand Down
76 changes: 66 additions & 10 deletions nodebuilder/fraud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,77 @@ package fraud

import (
"context"
"encoding/json"

"github.com/celestiaorg/celestia-node/fraud"
)

// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the network.
// Any method signature changed here needs to also be changed in the API struct.
type Module interface {
var _ Module = (*Service)(nil)

// Service is an implementation of Module that uses fraud.Service as a backend. It is used to provide fraud proofs as a
// non-interface type to the API, and wrap fraud.Subscriber with a channel of Proofs.
type Service struct {
fraud.Service
}

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
//
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . Module
type API struct {
Subscribe func(fraud.ProofType) (fraud.Subscription, error)
Get func(context.Context, fraud.ProofType) ([]fraud.Proof, error)
func (s *Service) Subscribe(ctx context.Context, proofType fraud.ProofType) (chan Proof, error) {
subscription, err := s.Service.Subscribe(proofType)
if err != nil {
return nil, err
}
concreteProofs := make(chan Proof)
go func() {
proof, _ := subscription.Proof(ctx)
concreteProofs <- Proof{Proof: proof}
}()
return concreteProofs, nil
}

func (s *Service) Get(ctx context.Context, proofType fraud.ProofType) ([]Proof, error) {
proofs, err := s.Service.Get(ctx, proofType)
if err != nil {
return nil, err
}
concreteProofs := make([]Proof, len(proofs))
for i, proof := range proofs {
concreteProofs[i].Proof = proof
}
return concreteProofs, nil
}

// Proof embeds the fraud.Proof interface type to provide a concrete type for JSON serialization.
type Proof struct {
fraud.Proof
}

func (f *Proof) UnmarshalJSON(data []byte) error {
type fraudProof struct {
ProofType fraud.ProofType `json:"proof_type"`
Data []byte `json:"data"`
}
var fp fraudProof
err := json.Unmarshal(data, &fp)
if err != nil {
return err
}
f.Proof, err = fraud.Unmarshal(fp.ProofType, fp.Data)
if err != nil {
return err
}
return nil
}

func (f *Proof) MarshalJSON() ([]byte, error) {
marshaledProof, err := f.MarshalBinary()
if err != nil {
return nil, err
}
fraudProof := &struct {
ProofType fraud.ProofType `json:"proof_type"`
Data []byte `json:"data"`
}{
ProofType: f.Type(),
Data: marshaledProof,
}
return json.Marshal(fraudProof)
}
2 changes: 1 addition & 1 deletion nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}),
fx.Provide(fx.Annotate(
newSyncer,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, syncer *sync.Syncer) error {
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, syncer *sync.Syncer) error {
syncerStartFunc := func(ctx context.Context) error {
err := syncer.Start(ctx)
switch err {
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(Keyring),
fx.Provide(fx.Annotate(
CoreAccessor,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, ca *state.CoreAccessor) error {
fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, ca *state.CoreAccessor) error {
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice, ca.Start, ca.Stop)
}),
fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error {
Expand Down
Loading

0 comments on commit 6f75e7d

Please sign in to comment.