Skip to content

Commit

Permalink
fix: Make node options composable (sourcenetwork#2648)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#2642 

## Description

This PR fixes an issue where node subsystem options were not composable.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored May 24, 2024
1 parent dbc8219 commit 150eb3f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 163 deletions.
59 changes: 21 additions & 38 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,48 +99,41 @@ func MakeStartCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
cfg := mustGetContextConfig(cmd)

dbOpts := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
var peers []peer.AddrInfo
if val := cfg.GetStringSlice("net.peers"); len(val) > 0 {
addrs, err := netutils.ParsePeers(val)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

netOpts := []net.NodeOpt{
opts := []node.Option{
node.WithPath(cfg.GetString("datastore.badger.path")),
node.WithInMemory(cfg.GetString("datastore.store") == configStoreMemory),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
node.WithACPType(node.LocalACPType),
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
net.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
}

serverOpts := []http.ServerOpt{
// http server options
http.WithAddress(cfg.GetString("api.address")),
http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...),
http.WithTLSCertPath(cfg.GetString("api.pubKeyPath")),
http.WithTLSKeyPath(cfg.GetString("api.privKeyPath")),
}

storeOpts := []node.StoreOpt{
node.WithPath(cfg.GetString("datastore.badger.path")),
node.WithInMemory(cfg.GetString("datastore.store") == configStoreMemory),
}

acpOpts := []node.ACPOpt{
node.WithACPType(node.LocalACPType),
}

var peers []peer.AddrInfo
if val := cfg.GetStringSlice("net.peers"); len(val) > 0 {
addrs, err := netutils.ParsePeers(val)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

if cfg.GetString("datastore.store") != configStoreMemory {
rootDir := mustGetContextRootDir(cmd)
// TODO-ACP: Infuture when we add support for the --no-acp flag when admin signatures are in,
// we can allow starting of db without acp. Currently that can only be done programmatically.
// https://github.com/sourcenetwork/defradb/issues/2271
acpOpts = append(acpOpts, node.WithACPPath(rootDir))
opts = append(opts, node.WithACPPath(rootDir))
}

if !cfg.GetBool("keyring.disabled") {
Expand All @@ -153,23 +146,13 @@ func MakeStartCommand() *cobra.Command {
if err != nil {
return NewErrKeyringHelp(err)
}
netOpts = append(netOpts, net.WithPrivateKey(peerKey))
opts = append(opts, net.WithPrivateKey(peerKey))
// load the optional encryption key
encryptionKey, err := kr.Get(encryptionKeyName)
if err != nil && !errors.Is(err, keyring.ErrNotFound) {
return err
}
storeOpts = append(storeOpts, node.WithEncryptionKey(encryptionKey))
}

opts := []node.NodeOpt{
node.WithPeers(peers...),
node.WithStoreOpts(storeOpts...),
node.WithDatabaseOpts(dbOpts...),
node.WithNetOpts(netOpts...),
node.WithServerOpts(serverOpts...),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
node.WithACPOpts(acpOpts...),
opts = append(opts, node.WithEncryptionKey(encryptionKey))
}

n, err := node.NewNode(cmd.Context(), opts...)
Expand Down
91 changes: 44 additions & 47 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ import (

var log = corelog.NewLogger("node")

// Option is a generic option that applies to any subsystem.
//
// Invalid option types will be silently ignored. Valid option types are:
// - `ACPOpt`
// - `NodeOpt`
// - `StoreOpt`
// - `db.Option`
// - `http.ServerOpt`
// - `net.NodeOpt`
type Option any

// Options contains start configuration values.
type Options struct {
storeOpts []StoreOpt
dbOpts []db.Option
netOpts []net.NodeOpt
serverOpts []http.ServerOpt
acpOpts []ACPOpt
peers []peer.AddrInfo
disableP2P bool
disableAPI bool
Expand All @@ -47,41 +53,6 @@ func DefaultOptions() *Options {
// NodeOpt is a function for setting configuration values.
type NodeOpt func(*Options)

// WithStoreOpts sets the store options.
func WithStoreOpts(opts ...StoreOpt) NodeOpt {
return func(o *Options) {
o.storeOpts = opts
}
}

// WithACPOpts sets the ACP options.
func WithACPOpts(opts ...ACPOpt) NodeOpt {
return func(o *Options) {
o.acpOpts = opts
}
}

// WithDatabaseOpts sets the database options.
func WithDatabaseOpts(opts ...db.Option) NodeOpt {
return func(o *Options) {
o.dbOpts = opts
}
}

// WithNetOpts sets the net / p2p options.
func WithNetOpts(opts ...net.NodeOpt) NodeOpt {
return func(o *Options) {
o.netOpts = opts
}
}

// WithServerOpts sets the api server options.
func WithServerOpts(opts ...http.ServerOpt) NodeOpt {
return func(o *Options) {
o.serverOpts = opts
}
}

// WithDisableP2P sets the disable p2p flag.
func WithDisableP2P(disable bool) NodeOpt {
return func(o *Options) {
Expand Down Expand Up @@ -111,31 +82,57 @@ type Node struct {
}

// NewNode returns a new node instance configured with the given options.
func NewNode(ctx context.Context, opts ...NodeOpt) (*Node, error) {
func NewNode(ctx context.Context, opts ...Option) (*Node, error) {
var (
dbOpts []db.Option
acpOpts []ACPOpt
netOpts []net.NodeOpt
storeOpts []StoreOpt
serverOpts []http.ServerOpt
)

options := DefaultOptions()
for _, opt := range opts {
opt(options)
switch t := opt.(type) {
case ACPOpt:
acpOpts = append(acpOpts, t)

case NodeOpt:
t(options)

case StoreOpt:
storeOpts = append(storeOpts, t)

case db.Option:
dbOpts = append(dbOpts, t)

case http.ServerOpt:
serverOpts = append(serverOpts, t)

case net.NodeOpt:
netOpts = append(netOpts, t)
}
}

rootstore, err := NewStore(ctx, options.storeOpts...)
rootstore, err := NewStore(ctx, storeOpts...)
if err != nil {
return nil, err
}

acp, err := NewACP(ctx, options.acpOpts...)
acp, err := NewACP(ctx, acpOpts...)
if err != nil {
return nil, err
}

db, err := db.NewDB(ctx, rootstore, acp, options.dbOpts...)
db, err := db.NewDB(ctx, rootstore, acp, dbOpts...)
if err != nil {
return nil, err
}

var node *net.Node
if !options.disableP2P {
// setup net node
node, err = net.NewNode(ctx, db, options.netOpts...)
node, err = net.NewNode(ctx, db, netOpts...)
if err != nil {
return nil, err
}
Expand All @@ -156,7 +153,7 @@ func NewNode(ctx context.Context, opts ...NodeOpt) (*Node, error) {
if err != nil {
return nil, err
}
server, err = http.NewServer(handler, options.serverOpts...)
server, err = http.NewServer(handler, serverOpts...)
if err != nil {
return nil, err
}
Expand Down
59 changes: 0 additions & 59 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,13 @@
package node

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/internal/db"
"github.com/sourcenetwork/defradb/net"
)

func TestWithStoreOpts(t *testing.T) {
storeOpts := []StoreOpt{WithPath("test")}

options := &Options{}
WithStoreOpts(storeOpts...)(options)
assert.Equal(t, storeOpts, options.storeOpts)
}

func TestWithDatabaseOpts(t *testing.T) {
dbOpts := []db.Option{db.WithMaxRetries(10)}

options := &Options{}
WithDatabaseOpts(dbOpts...)(options)
assert.Equal(t, dbOpts, options.dbOpts)
}

func TestWithNetOpts(t *testing.T) {
netOpts := []net.NodeOpt{net.WithEnablePubSub(true)}

options := &Options{}
WithNetOpts(netOpts...)(options)
assert.Equal(t, netOpts, options.netOpts)
}

func TestWithServerOpts(t *testing.T) {
serverOpts := []http.ServerOpt{http.WithAddress("127.0.0.1:8080")}

options := &Options{}
WithServerOpts(serverOpts...)(options)
assert.Equal(t, serverOpts, options.serverOpts)
}

func TestWithDisableP2P(t *testing.T) {
options := &Options{}
WithDisableP2P(true)(options)
Expand All @@ -78,24 +40,3 @@ func TestWithPeers(t *testing.T) {
require.Len(t, options.peers, 1)
assert.Equal(t, *peer, options.peers[0])
}

func TestNodeStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := []NodeOpt{
WithStoreOpts(WithPath(t.TempDir())),
WithDatabaseOpts(db.WithUpdateEvents()),
}

node, err := NewNode(ctx, opts...)
require.NoError(t, err)

err = node.Start(ctx)
require.NoError(t, err)

<-time.After(5 * time.Second)

err = node.Close(ctx)
require.NoError(t, err)
}
29 changes: 10 additions & 19 deletions tests/integration/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func init() {
}

func NewBadgerMemoryDB(ctx context.Context) (client.DB, error) {
opts := []node.NodeOpt{
node.WithStoreOpts(node.WithInMemory(true)),
node.WithDatabaseOpts(db.WithUpdateEvents()),
opts := []node.Option{
node.WithInMemory(true),
db.WithUpdateEvents(),
}

node, err := node.NewNode(ctx, opts...)
Expand All @@ -87,8 +87,8 @@ func NewBadgerMemoryDB(ctx context.Context) (client.DB, error) {
func NewBadgerFileDB(ctx context.Context, t testing.TB) (client.DB, error) {
path := t.TempDir()

opts := []node.NodeOpt{
node.WithStoreOpts(node.WithPath(path)),
opts := []node.Option{
node.WithPath(path),
}

node, err := node.NewNode(ctx, opts...)
Expand All @@ -103,13 +103,9 @@ func NewBadgerFileDB(ctx context.Context, t testing.TB) (client.DB, error) {
// testing state. The database type on the test state is used to
// select the datastore implementation to use.
func setupDatabase(s *state) (client.DB, string, error) {
dbOpts := []db.Option{
opts := []node.Option{
db.WithUpdateEvents(),
db.WithLensPoolSize(lensPoolSize),
}
storeOpts := []node.StoreOpt{}
acpOpts := []node.ACPOpt{}
opts := []node.NodeOpt{
// The test framework sets this up elsewhere when required so that it may be wrapped
// into a [client.DB].
node.WithDisableAPI(true),
Expand All @@ -127,13 +123,13 @@ func setupDatabase(s *state) (client.DB, string, error) {
}

if encryptionKey != nil {
storeOpts = append(storeOpts, node.WithEncryptionKey(encryptionKey))
opts = append(opts, node.WithEncryptionKey(encryptionKey))
}

var path string
switch s.dbt {
case badgerIMType:
storeOpts = append(storeOpts, node.WithInMemory(true))
opts = append(opts, node.WithInMemory(true))

case badgerFileType:
switch {
Expand All @@ -150,20 +146,15 @@ func setupDatabase(s *state) (client.DB, string, error) {
path = s.t.TempDir()
}

storeOpts = append(storeOpts, node.WithPath(path))
acpOpts = append(acpOpts, node.WithACPPath(path))
opts = append(opts, node.WithPath(path), node.WithACPPath(path))

case defraIMType:
storeOpts = append(storeOpts, node.WithDefraStore(true))
opts = append(opts, node.WithDefraStore(true))

default:
return nil, "", fmt.Errorf("invalid database type: %v", s.dbt)
}

opts = append(opts, node.WithDatabaseOpts(dbOpts...))
opts = append(opts, node.WithStoreOpts(storeOpts...))
opts = append(opts, node.WithACPOpts(acpOpts...))

node, err := node.NewNode(s.ctx, opts...)
if err != nil {
return nil, "", err
Expand Down

0 comments on commit 150eb3f

Please sign in to comment.