Skip to content

Commit

Permalink
feat(nodebuilder/state): Provide stubbed state module if a core endpo…
Browse files Browse the repository at this point in the history
…int not provided (celestiaorg#2577)

This is a pre-requisite for celestiaorg#2511

It makes a check in CoreAccessor constructor to see if a core endpoint
was provided, and returns nil CoreAccessor if not. A stubbed state
module will then be provided if no core endpoint was provided so that
errors are more readable.

Previously, node start logic relied on the fact that the grpc Dial
inside CoreAccessor.Start was non-blocking so it could silently fail
under the hood and any calls made on state Module would return errors
from the inability to reach the address that is the default for the core
config (which was confusing).

(cherry picked from commit 13e9b1f)
  • Loading branch information
renaynay authored and walldiss committed Sep 25, 2023
1 parent b365ee5 commit f6efdaf
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 25 deletions.
16 changes: 13 additions & 3 deletions nodebuilder/core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ type Config struct {
// node's connection to a Celestia-Core endpoint.
func DefaultConfig() Config {
return Config{
IP: "0.0.0.0",
RPCPort: "0",
GRPCPort: "0",
IP: "",
RPCPort: "",
GRPCPort: "",
}
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if !cfg.IsEndpointConfigured() {
return nil
}

ip, err := utils.ValidateAddr(cfg.IP)
if err != nil {
return err
Expand All @@ -41,3 +45,9 @@ func (cfg *Config) Validate() error {
}
return nil
}

// IsEndpointConfigured returns whether a core endpoint has been set
// on the config (true if set).
func (cfg *Config) IsEndpointConfigured() bool {
return cfg.IP != ""
}
8 changes: 8 additions & 0 deletions nodebuilder/fraud/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type ServiceBreaker[S service] struct {
// Start starts the inner service if there are no fraud proofs stored.
// Subscribes for fraud and stops the service whenever necessary.
func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error {
if breaker == nil {
return nil
}

proofs, err := breaker.FraudServ.Get(ctx, breaker.FraudType)
switch err {
default:
Expand All @@ -57,6 +61,10 @@ func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error {

// Stop stops the service and cancels subscription.
func (breaker *ServiceBreaker[S]) Stop(ctx context.Context) error {
if breaker == nil {
return nil
}

if breaker.ctx.Err() != nil {
// short circuit if the service was already stopped
return nil
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store
fx.Supply(signer),
// modules provided by the node
p2p.ConstructModule(tp, &cfg.P2P),
state.ConstructModule(tp, &cfg.State),
state.ConstructModule(tp, &cfg.State, &cfg.Core),
header.ConstructModule(tp, &cfg.Header),
share.ConstructModule(tp, &cfg.Share),
rpc.ConstructModule(tp, &cfg.RPC),
Expand Down
10 changes: 10 additions & 0 deletions nodebuilder/node_light_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nodebuilder

import (
"context"
"crypto/rand"
"testing"

Expand All @@ -11,6 +12,7 @@ import (

nodebuilder "github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

func TestNewLightWithP2PKey(t *testing.T) {
Expand Down Expand Up @@ -44,3 +46,11 @@ func TestLight_WithNetwork(t *testing.T) {
require.NotNil(t, node)
assert.Equal(t, p2p.Private, node.Network)
}

// TestLight_WithStubbedCoreAccessor ensures that a node started without
// a core connection will return a stubbed StateModule.
func TestLight_WithStubbedCoreAccessor(t *testing.T) {
node := TestNode(t, nodebuilder.Light)
_, err := node.StateServ.Balance(context.Background())
assert.ErrorIs(t, state.ErrNoStateAccess, err)
}
12 changes: 0 additions & 12 deletions nodebuilder/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,8 @@ func TestLifecycle(t *testing.T) {
err := node.Start(ctx)
require.NoError(t, err)

// ensure the state service is running
require.False(t, node.StateServ.IsStopped(ctx))

err = node.Stop(ctx)
require.NoError(t, err)

// ensure the state service is stopped
require.True(t, node.StateServ.IsStopped(ctx))
})
}
}
Expand Down Expand Up @@ -96,14 +90,8 @@ func TestLifecycle_WithMetrics(t *testing.T) {
err := node.Start(ctx)
require.NoError(t, err)

// ensure the state service is running
require.False(t, node.StateServ.IsStopped(ctx))

err = node.Stop(ctx)
require.NoError(t, err)

// ensure the state service is stopped
require.True(t, node.StateServ.IsStopped(ctx))
})
}
}
Expand Down
7 changes: 6 additions & 1 deletion nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(state.WithMetrics),
fx.Invoke(func(ca *state.CoreAccessor) {
if ca == nil {
return
}
state.WithMetrics(ca)
}),
fx.Invoke(fraud.WithMetrics),
fx.Invoke(node.WithMetrics),
fx.Invoke(modheader.WithMetrics),
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func coreAccessor(
signer *apptypes.KeyringSigner,
sync *sync.Syncer[*header.ExtendedHeader],
fraudServ libfraud.Service,
) (*state.CoreAccessor, *modfraud.ServiceBreaker[*state.CoreAccessor]) {
) (*state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor]) {
ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort)

return ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{
return ca, ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{
Service: ca,
FraudType: byzantine.BadEncoding,
FraudServ: fraudServ,
Expand Down
11 changes: 6 additions & 5 deletions nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/state"
Expand All @@ -15,14 +17,14 @@ var log = logging.Logger("module/state")

// ConstructModule provides all components necessary to construct the
// state service.
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()

baseComponents := fx.Options(
fx.Supply(*cfg),
fx.Error(cfgErr),
fx.Provide(fx.Annotate(
fxutil.ProvideIf(coreCfg.IsEndpointConfigured(), fx.Annotate(
coreAccessor,
fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error {
return breaker.Start(ctx)
Expand All @@ -31,9 +33,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return breaker.Stop(ctx)
}),
)),
// the module is needed for the handler
fx.Provide(func(ca *state.CoreAccessor) Module {
return ca
fxutil.ProvideIf(!coreCfg.IsEndpointConfigured(), func() (*state.CoreAccessor, Module) {
return nil, &stubbedStateModule{}
}),
)

Expand Down
116 changes: 116 additions & 0 deletions nodebuilder/state/stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package state

import (
"context"
"errors"

"github.com/cosmos/cosmos-sdk/x/staking/types"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/state"
)

var ErrNoStateAccess = errors.New("node is running without state access")

// stubbedStateModule provides a stub for the state module to return
// errors when state endpoints are accessed without a running connection
// to a core endpoint.
type stubbedStateModule struct{}

func (s stubbedStateModule) IsStopped(context.Context) bool {
return true
}

func (s stubbedStateModule) AccountAddress(context.Context) (state.Address, error) {
return state.Address{}, ErrNoStateAccess
}

func (s stubbedStateModule) Balance(context.Context) (*state.Balance, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) BalanceForAddress(
context.Context,
state.Address,
) (*state.Balance, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Transfer(
_ context.Context,
_ state.AccAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) SubmitTx(context.Context, state.Tx) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) SubmitPayForBlob(
context.Context,
state.Int,
uint64,
[]*blob.Blob,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) CancelUnbondingDelegation(
_ context.Context,
_ state.ValAddress,
_, _, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) BeginRedelegate(
_ context.Context,
_, _ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Undelegate(
_ context.Context,
_ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Delegate(
_ context.Context,
_ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryDelegation(
context.Context,
state.ValAddress,
) (*types.QueryDelegationResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryUnbonding(
context.Context,
state.ValAddress,
) (*types.QueryUnbondingDelegationResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryRedelegations(
_ context.Context,
_, _ state.ValAddress,
) (*types.QueryRedelegationsResponse, error) {
return nil, ErrNoStateAccess
}
6 changes: 5 additions & 1 deletion state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {

// dial given celestia-core endpoint
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort)
client, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
client, err := grpc.DialContext(
ctx,
endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return err
}
Expand Down

0 comments on commit f6efdaf

Please sign in to comment.