diff --git a/nodebuilder/das/daser.go b/nodebuilder/das/daser.go index 0964fb86e2..81f3e17fc3 100644 --- a/nodebuilder/das/daser.go +++ b/nodebuilder/das/daser.go @@ -4,8 +4,8 @@ import ( "github.com/ipfs/go-datastore" "github.com/celestiaorg/celestia-node/das" + "github.com/celestiaorg/celestia-node/fraud" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/share" ) @@ -14,7 +14,7 @@ func NewDASer( hsub header.Subscriber, store header.Store, batching datastore.Batching, - fraudService fraud.Module, + fraudService fraud.Service, options ...das.Option, ) (*das.DASer, error) { return das.NewDASer(da, hsub, store, batching, fraudService, options...) diff --git a/nodebuilder/das/module.go b/nodebuilder/das/module.go index 40d8750a6d..548182de19 100644 --- a/nodebuilder/das/module.go +++ b/nodebuilder/das/module.go @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { baseComponents, fx.Provide(fx.Annotate( NewDASer, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, das *das.DASer) error { + fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, das *das.DASer) error { return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice, das.Start, das.Stop) }), diff --git a/nodebuilder/fraud/constructors.go b/nodebuilder/fraud/constructors.go new file mode 100644 index 0000000000..5670303286 --- /dev/null +++ b/nodebuilder/fraud/constructors.go @@ -0,0 +1,87 @@ +package fraud + +import ( + "context" + + "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p-core/host" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/fraud" + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" +) + +// NewModule constructs a fraud proof service with the syncer disabled. +func NewModule( + lc fx.Lifecycle, + sub *pubsub.PubSub, + host host.Host, + hstore header.Store, + ds datastore.Batching, + network p2p.Network, +) (Module, fraud.Service, error) { + return newFraudService(lc, sub, host, hstore, ds, false, string(network)) +} + +// ModuleWithSyncer constructs fraud proof service with enabled syncer. +func ModuleWithSyncer( + lc fx.Lifecycle, + sub *pubsub.PubSub, + host host.Host, + hstore header.Store, + ds datastore.Batching, + network p2p.Network, +) (Module, fraud.Service, error) { + return newFraudService(lc, sub, host, hstore, ds, true, string(network)) +} + +func newFraudService( + lc fx.Lifecycle, + sub *pubsub.PubSub, + host host.Host, + hstore header.Store, + ds datastore.Batching, + isEnabled bool, + protocolSuffix string, +) (Module, fraud.Service, error) { + pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled, protocolSuffix) + lc.Append(fx.Hook{ + OnStart: pservice.Start, + OnStop: pservice.Stop, + }) + return &Service{ + Service: pservice, + }, pservice, nil +} + +// Lifecycle controls the lifecycle of service depending on fraud proofs. +// It starts the service only if no fraud-proof exists and stops the service automatically +// if a proof arrives after the service was started. +func Lifecycle( + startCtx, lifecycleCtx context.Context, + p fraud.ProofType, + fraudModule fraud.Service, + start, stop func(context.Context) error, +) error { + proofs, err := fraudModule.Get(startCtx, p) + switch err { + default: + return err + case nil: + return &fraud.ErrFraudExists{Proof: proofs} + case datastore.ErrNotFound: + } + err = start(startCtx) + if err != nil { + return err + } + // handle incoming Fraud Proofs + go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) { + if err := stop(lifecycleCtx); err != nil { + log.Error(err) + } + }) + return nil +} diff --git a/nodebuilder/fraud/fraud.go b/nodebuilder/fraud/fraud.go index 3aec12514c..b02b173430 100644 --- a/nodebuilder/fraud/fraud.go +++ b/nodebuilder/fraud/fraud.go @@ -3,83 +3,23 @@ package fraud import ( "context" - "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p-core/host" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "go.uber.org/fx" - "github.com/celestiaorg/celestia-node/fraud" - "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/nodebuilder/p2p" ) -// NewModule constructs a fraud proof service with the syncer disabled. -func NewModule( - lc fx.Lifecycle, - sub *pubsub.PubSub, - host host.Host, - hstore header.Store, - ds datastore.Batching, - network p2p.Network, -) (Module, error) { - return newFraudService(lc, sub, host, hstore, ds, false, string(network)) -} - -// ModuleWithSyncer constructs fraud proof service with enabled syncer. -func ModuleWithSyncer( - lc fx.Lifecycle, - sub *pubsub.PubSub, - host host.Host, - hstore header.Store, - ds datastore.Batching, - network p2p.Network, -) (Module, error) { - return newFraudService(lc, sub, host, hstore, ds, true, string(network)) -} - -func newFraudService( - lc fx.Lifecycle, - sub *pubsub.PubSub, - host host.Host, - hstore header.Store, - ds datastore.Batching, - isEnabled bool, - protocolSuffix string, -) (Module, error) { - pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled, protocolSuffix) - lc.Append(fx.Hook{ - OnStart: pservice.Start, - OnStop: pservice.Stop, - }) - return pservice, nil +// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the network. +// Any method signature changed here needs to also be changed in the API struct. +type Module interface { + // Get fetches fraud proofs from the disk by its type. + Get(context.Context, fraud.ProofType) ([]Proof, error) + // Subscribe allows to subscribe on a Proof pub sub topic by its type. + Subscribe(context.Context, fraud.ProofType) (chan Proof, error) } -// Lifecycle controls the lifecycle of service depending on fraud proofs. -// It starts the service only if no fraud-proof exists and stops the service automatically -// if a proof arrives after the service was started. -func Lifecycle( - startCtx, lifecycleCtx context.Context, - p fraud.ProofType, - fraudModule Module, - start, stop func(context.Context) error, -) error { - proofs, err := fraudModule.Get(startCtx, p) - switch err { - default: - return err - case nil: - return &fraud.ErrFraudExists{Proof: proofs} - case datastore.ErrNotFound: - } - err = start(startCtx) - if err != nil { - return err - } - // handle incoming Fraud Proofs - go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) { - if err := stop(lifecycleCtx); err != nil { - log.Error(err) - } - }) - return nil +// API is a wrapper around Module for the RPC. +// TODO(@distractedm1nd): These structs need to be autogenerated. +// +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . Module +type API struct { + Subscribe func(context.Context, fraud.ProofType) (chan Proof, error) + Get func(context.Context, fraud.ProofType) ([]Proof, error) } diff --git a/nodebuilder/fraud/mocks/api.go b/nodebuilder/fraud/mocks/api.go index 2b41464644..3f026b42ab 100644 --- a/nodebuilder/fraud/mocks/api.go +++ b/nodebuilder/fraud/mocks/api.go @@ -9,6 +9,7 @@ import ( reflect "reflect" fraud "github.com/celestiaorg/celestia-node/fraud" + fraud0 "github.com/celestiaorg/celestia-node/nodebuilder/fraud" gomock "github.com/golang/mock/gomock" ) @@ -35,25 +36,11 @@ func (m *MockModule) EXPECT() *MockModuleMockRecorder { return m.recorder } -// Broadcast mocks base method. -func (m *MockModule) Broadcast(arg0 context.Context, arg1 fraud.Proof) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// Broadcast indicates an expected call of Broadcast. -func (mr *MockModuleMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockModule)(nil).Broadcast), arg0, arg1) -} - // Get mocks base method. -func (m *MockModule) Get(arg0 context.Context, arg1 fraud.ProofType) ([]fraud.Proof, error) { +func (m *MockModule) Get(arg0 context.Context, arg1 fraud.ProofType) ([]fraud0.Proof, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", arg0, arg1) - ret0, _ := ret[0].([]fraud.Proof) + ret0, _ := ret[0].([]fraud0.Proof) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -65,16 +52,16 @@ func (mr *MockModuleMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { } // Subscribe mocks base method. -func (m *MockModule) Subscribe(arg0 fraud.ProofType) (fraud.Subscription, error) { +func (m *MockModule) Subscribe(arg0 context.Context, arg1 fraud.ProofType) (chan fraud0.Proof, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Subscribe", arg0) - ret0, _ := ret[0].(fraud.Subscription) + ret := m.ctrl.Call(m, "Subscribe", arg0, arg1) + ret0, _ := ret[0].(chan fraud0.Proof) ret1, _ := ret[1].(error) return ret0, ret1 } // Subscribe indicates an expected call of Subscribe. -func (mr *MockModuleMockRecorder) Subscribe(arg0 interface{}) *gomock.Call { +func (mr *MockModuleMockRecorder) Subscribe(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockModule)(nil).Subscribe), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockModule)(nil).Subscribe), arg0, arg1) } diff --git a/nodebuilder/fraud/module.go b/nodebuilder/fraud/module.go index b115990345..bd5c1f7402 100644 --- a/nodebuilder/fraud/module.go +++ b/nodebuilder/fraud/module.go @@ -11,8 +11,8 @@ import ( var log = logging.Logger("fraud-module") func ConstructModule(tp node.Type) fx.Option { - baseComponent := fx.Provide(func(module Module) fraud.Getter { - return module + baseComponent := fx.Provide(func(serv fraud.Service) fraud.Getter { + return serv }) switch tp { case node.Light: diff --git a/nodebuilder/fraud/service.go b/nodebuilder/fraud/service.go index b4369e53d0..dd7e6fdd53 100644 --- a/nodebuilder/fraud/service.go +++ b/nodebuilder/fraud/service.go @@ -2,21 +2,77 @@ package fraud import ( "context" + "encoding/json" "github.com/celestiaorg/celestia-node/fraud" ) -// Module encompasses the behavior necessary to subscribe and broadcast fraud proofs within the network. -// Any method signature changed here needs to also be changed in the API struct. -type Module interface { +var _ Module = (*Service)(nil) + +// Service is an implementation of Module that uses fraud.Service as a backend. It is used to provide fraud proofs as a +// non-interface type to the API, and wrap fraud.Subscriber with a channel of Proofs. +type Service struct { fraud.Service } -// API is a wrapper around Module for the RPC. -// TODO(@distractedm1nd): These structs need to be autogenerated. -// -//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . Module -type API struct { - Subscribe func(fraud.ProofType) (fraud.Subscription, error) - Get func(context.Context, fraud.ProofType) ([]fraud.Proof, error) +func (s *Service) Subscribe(ctx context.Context, proofType fraud.ProofType) (chan Proof, error) { + subscription, err := s.Service.Subscribe(proofType) + if err != nil { + return nil, err + } + concreteProofs := make(chan Proof) + go func() { + proof, _ := subscription.Proof(ctx) + concreteProofs <- Proof{Proof: proof} + }() + return concreteProofs, nil +} + +func (s *Service) Get(ctx context.Context, proofType fraud.ProofType) ([]Proof, error) { + proofs, err := s.Service.Get(ctx, proofType) + if err != nil { + return nil, err + } + concreteProofs := make([]Proof, len(proofs)) + for i, proof := range proofs { + concreteProofs[i].Proof = proof + } + return concreteProofs, nil +} + +// Proof embeds the fraud.Proof interface type to provide a concrete type for JSON serialization. +type Proof struct { + fraud.Proof +} + +func (f *Proof) UnmarshalJSON(data []byte) error { + type fraudProof struct { + ProofType fraud.ProofType `json:"proof_type"` + Data []byte `json:"data"` + } + var fp fraudProof + err := json.Unmarshal(data, &fp) + if err != nil { + return err + } + f.Proof, err = fraud.Unmarshal(fp.ProofType, fp.Data) + if err != nil { + return err + } + return nil +} + +func (f *Proof) MarshalJSON() ([]byte, error) { + marshaledProof, err := f.MarshalBinary() + if err != nil { + return nil, err + } + fraudProof := &struct { + ProofType fraud.ProofType `json:"proof_type"` + Data []byte `json:"data"` + }{ + ProofType: f.Type(), + Data: marshaledProof, + } + return json.Marshal(fraudProof) } diff --git a/nodebuilder/header/module.go b/nodebuilder/header/module.go index 60f9b8fc1c..dffddf226b 100644 --- a/nodebuilder/header/module.go +++ b/nodebuilder/header/module.go @@ -42,7 +42,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { }), fx.Provide(fx.Annotate( newSyncer, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, syncer *sync.Syncer) error { + fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, syncer *sync.Syncer) error { syncerStartFunc := func(ctx context.Context) error { err := syncer.Start(ctx) switch err { diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index 99d7d1ddac..dd275a5dce 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -26,7 +26,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { fx.Provide(Keyring), fx.Provide(fx.Annotate( CoreAccessor, - fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, ca *state.CoreAccessor) error { + fx.OnStart(func(startCtx, ctx context.Context, fservice fraud.Service, ca *state.CoreAccessor) error { return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice, ca.Start, ca.Stop) }), fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error { diff --git a/nodebuilder/tests/fraud_test.go b/nodebuilder/tests/fraud_test.go index 41f91244a2..fd851308ff 100644 --- a/nodebuilder/tests/fraud_test.go +++ b/nodebuilder/tests/fraud_test.go @@ -53,11 +53,10 @@ func TestFraudProofBroadcasting(t *testing.T) { // subscribe to fraud proof before node starts helps // to prevent flakiness when fraud proof is propagating before subscribing on it - subscr, err := full.FraudServ.Subscribe(fraud.BadEncoding) + subscr, err := full.FraudServ.Subscribe(ctx, fraud.BadEncoding) require.NoError(t, err) - p, err := subscr.Proof(ctx) - require.NoError(t, err) + p := <-subscr require.Equal(t, 20, int(p.Height())) // This is an obscure way to check if the Syncer was stopped. @@ -128,20 +127,18 @@ func TestFraudProofSyncing(t *testing.T) { require.NoError(t, full.Start(ctx)) require.NoError(t, ln.Start(ctx)) - subsFn, err := full.FraudServ.Subscribe(fraud.BadEncoding) - require.NoError(t, err) - defer subsFn.Cancel() - _, err = subsFn.Proof(ctx) + subsFn, err := full.FraudServ.Subscribe(ctx, fraud.BadEncoding) require.NoError(t, err) + <-subsFn + defer close(subsFn) // internal subscription for the fraud proof is done in order to ensure that light node // receives the BEFP. - subsLn, err := ln.FraudServ.Subscribe(fraud.BadEncoding) + subsLn, err := ln.FraudServ.Subscribe(ctx, fraud.BadEncoding) require.NoError(t, err) err = ln.Host.Connect(ctx, *host.InfoFromHost(full.Host)) require.NoError(t, err) - _, err = subsLn.Proof(ctx) - require.NoError(t, err) - subsLn.Cancel() + <-subsLn + close(subsLn) }