Skip to content
This repository has been archived by the owner on Dec 20, 2023. It is now read-only.

Commit

Permalink
checkpoint #2
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoniz committed Dec 26, 2022
1 parent 8992928 commit a21388f
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 272 deletions.
12 changes: 5 additions & 7 deletions chain/consensus/mir/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ type Mir struct {
}

func NewConsensus(
ctx context.Context,
sm *stmgr.StateManager,
ds dtypes.MetadataDS,
b beacon.Schedule,
g chain.Genesis,
badBlock *chain.BadBlockCache,
netName dtypes.NetworkName,
) (consensus.Consensus, error) {
) (*Mir, error) {
return &Mir{
beacon: b,
sm: sm,
Expand All @@ -62,7 +60,7 @@ func NewConsensus(
}

// CreateBlock creates a Filecoin block from the block template provided by Mir.
func (bft *Mir) CreateBlock(ctx context.Context, w lapi.Wallet, bt *lapi.BlockTemplate) (*types.FullBlock, error) {
func (bft *Mir) CreateBlock(ctx context.Context, _ lapi.Wallet, bt *lapi.BlockTemplate) (*types.FullBlock, error) {
pts, err := bft.sm.ChainStore().LoadTipSet(ctx, bt.Parents)
if err != nil {
return nil, fmt.Errorf("failed to load parent tipset: %w", err)
Expand Down Expand Up @@ -104,7 +102,7 @@ func (bft *Mir) CreateBlock(ctx context.Context, w lapi.Wallet, bt *lapi.BlockTe
}, nil
}

func (bft *Mir) ValidateBlockHeader(ctx context.Context, b *types.BlockHeader) (rejectReason string, err error) {
func (bft *Mir) ValidateBlockHeader(_ context.Context, b *types.BlockHeader) (rejectReason string, err error) {
if b.IsValidated() {
return "", nil
}
Expand Down Expand Up @@ -304,14 +302,14 @@ func hasCheckpoint(h *types.BlockHeader) bool {
// IsEpochBeyondCurrMax is used in Filcns to detect delayed blocks.
// We are currently using defaults here and not worrying about it.
// We will consider potential changes of Consensus interface in https://github.com/filecoin-project/eudico/issues/143.
func (bft *Mir) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool {
func (bft *Mir) IsEpochBeyondCurrMax(_ abi.ChainEpoch) bool {
return false
}

// Weight in mir uses a default approach where the height determines the weight.
//
// Every tipset in mir has a single block.
func Weight(ctx context.Context, stateBs bstore.Blockstore, ts *types.TipSet) (types.BigInt, error) {
func Weight(_ context.Context, _ bstore.Blockstore, ts *types.TipSet) (types.BigInt, error) {
if ts == nil {
return types.NewInt(0), nil
}
Expand Down
14 changes: 14 additions & 0 deletions chain/consensus/mir/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package mir

import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"go.uber.org/fx"
)

var Module = fx.Module("mirConsensus",
fx.Provide(fx.Annotate(NewConsensus, fx.As(new(consensus.Consensus)))),
fx.Supply(store.WeightFunc(Weight)),
fx.Supply(fx.Annotate(consensus.NewTipSetExecutor(RewardFunc), fx.As(new(stmgr.Executor)))),
)
219 changes: 115 additions & 104 deletions cmd/lotus/daemon_refactored.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ import (
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/lp2p"
"github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
metricsprom "github.com/ipfs/go-metrics-prometheus"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opencensus.io/plugin/runmetrics"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/fx"
"golang.org/x/xerrors"
"net/http"
"os"
"runtime/pprof"
)
Expand All @@ -44,39 +47,6 @@ var RefactoredDaemonCmd = func() *cli.Command {
}()

func refactoredDaemonAction(cctx *cli.Context) error {

/*
// node.defaults() - originally called from node.New
options := node.Defaults()
// node.FullAPI()
var api lapi.FullNode
options = append(
options,
node.FullAPI(&api, node.Lite(false), node.MirValidator(false)),
)
fxOptions, _ := node.ConvertToFxOptions(options...)
*/
// node.Base()
// node.Repo()

// dtypes.Bootstrapper
// dtypes.ShutdownChan

// genesis
// liteModeDeps

// Mir Consensus

// SetApiEndpointKey

// Unset node.RunPeerMgrKey and peermgr.PeerMgr is not bootstrap

return legacyDaemonAction(cctx)
}

func legacyDaemonAction(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
isMirValidator := cctx.Bool("mir-validator")
log.Warnf("mir-validator = %v", isMirValidator)
Expand Down Expand Up @@ -243,43 +213,7 @@ func legacyDaemonAction(cctx *cli.Context) error {
fxInvokes[i] = fx.Options()
}

// node.defaults
fxProviders = append(fxProviders, node.FxDefaultsProviders)
mergeInvokes(fxInvokes, node.FxDefaultsInvokers())

// node.FullAPI (settings are set further below)
var api lapi.FullNode
var resAPI = &impl.FullNodeAPI{}
fxInvokes[node.ExtractApiKey] = fx.Populate(resAPI)
api = resAPI

// node.Base()
fxProviders = append(
fxProviders,
node.FxLibP2PProviders,
node.FxChainNodeProviders)
mergeInvokes(fxInvokes, node.FxLibP2PInvokers())
mergeInvokes(fxInvokes, node.FxChainNodeInvokers())

//// consensus conditional providers
if build.IsMirConsensus() {
fxProviders = append(
fxProviders,
fx.Provide(mir.NewConsensus),
fx.Supply(store.WeightFunc(mir.Weight)),
fx.Supply(fx.Annotate(consensus.NewTipSetExecutor(mir.RewardFunc), fx.As(new(stmgr.Executor)))),
)
} else {
fxProviders = append(
fxProviders,
fx.Provide(filcns.NewFilecoinExpectedConsensus),
fx.Supply(store.WeightFunc(filcns.Weight)),
fx.Supply(fx.Annotate(consensus.NewTipSetExecutor(filcns.RewardFunc), fx.As(new(stmgr.Executor)))),
)
}

// node.Repo()
//// setup
// repo setup
lockedRepo, err := r.Lock(repo.FullNode)
if err != nil {
panic(err)
Expand All @@ -293,10 +227,45 @@ func legacyDaemonAction(cctx *cli.Context) error {
panic("invalid config from repo")
}

// Refactored into modules
var consensusModule fx.Option
if build.IsMirConsensus() {
consensusModule = mir.Module
} else {
consensusModule = fx.Provide(
fx.Provide(filcns.NewFilecoinExpectedConsensus),
fx.Supply(store.WeightFunc(filcns.Weight)),
fx.Supply(fx.Annotate(consensus.NewTipSetExecutor(filcns.RewardFunc), fx.As(new(stmgr.Executor)))),
)
}

fxProviders = append(fxProviders,
lp2p.Module(&cfg.Common),
repoModule(lockedRepo),
consensusModule,
// orphaned, but likely belongs in some repo/store module
fx.Provide(modules.Datastore(cfg.Backup.DisableMetadataLog)),
)

// node.defaults
fxProviders = append(fxProviders, node.FxDefaultsProviders)
mergeInvokes(fxInvokes, node.FxDefaultsInvokers())

// node.Base()
fxProviders = append(
fxProviders,
//node.FxLibP2PProviders,
node.FxChainNodeProviders)
//mergeInvokes(fxInvokes, node.FxLibP2PInvokers())
mergeInvokes(fxInvokes, node.FxChainNodeInvokers())

// node.Repo()
//// providers and invokes
fxProviders = append(
fxProviders,
node.FxRepoProviders(lockedRepo, cfg))
//node.FxRepoProviders(lockedRepo, cfg),
node.FxConfigFullNodeProviders(cfg),
)
mergeInvokes(fxInvokes, node.FxConfigCommonInvokers(&cfg.Common))

// misc providers
Expand All @@ -323,7 +292,22 @@ func legacyDaemonAction(cctx *cli.Context) error {
// node.Unset(new(*peermgr.PeerMgr)),
//),

app := fx.New(fx.Options(fxProviders...), fx.Options(fxInvokes...))
// Debugging of the dependency graph
fxInvokes = append(fxInvokes,
fx.Invoke(
func(dotGraph fx.DotGraph) {
os.WriteFile("fx.dot", []byte(dotGraph), 0660)
}),
)

fxProviders = append(fxProviders, startupModule(cctx, r, lockedRepo, cfg))

var rpcStopper node.StopFunc
app := fx.New(
fx.Options(fxProviders...),
fx.Options(fxInvokes...),
fx.Populate(&rpcStopper),
)

// TODO: we probably should have a 'firewall' for Closing signal
// on this context, and implement closing logic through lifecycles
Expand All @@ -335,38 +319,11 @@ func legacyDaemonAction(cctx *cli.Context) error {
//// END REFACTORED NODE CONSTRUCTION //
////////////////////////////////////////

if cctx.String("import-key") != "" {
if err := importKey(ctx, api, cctx.String("import-key")); err != nil {
log.Errorf("importing key failed: %+v", err)
}
}

endpoint, err := r.APIEndpoint()
if err != nil {
return xerrors.Errorf("getting api endpoint: %w", err)
}

//
// Instantiate JSON-RPC endpoint.
// ----

// Populate JSON-RPC options.
serverOptions := []jsonrpc.ServerOption{jsonrpc.WithServerErrors(lapi.RPCErrors)}
if maxRequestSize := cctx.Int("api-max-req-size"); maxRequestSize != 0 {
serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(int64(maxRequestSize)))
}

// Instantiate the full node handler.
h, err := node.FullNodeHandler(api, true, serverOptions...)
if err != nil {
return fmt.Errorf("failed to instantiate rpc handler: %s", err)
}

// Serve the RPC.
rpcStopper, err := node.ServeRPC(h, "lotus-daemon", endpoint)
if err != nil {
return fmt.Errorf("failed to start json-rpc endpoint: %s", err)
}
//if cctx.String("import-key") != "" {
// if err := importKey(ctx, api, cctx.String("import-key")); err != nil {
// log.Errorf("importing key failed: %+v", err)
// }
//}

// Monitor for shutdown.
finishCh := node.MonitorShutdown(shutdownChan,
Expand All @@ -379,6 +336,60 @@ func legacyDaemonAction(cctx *cli.Context) error {
return nil
}

var repoModule = func(lr repo.LockedRepo) fx.Option {
return fx.Module("repo",
fx.Provide(
modules.LockedRepo(lr),
modules.KeyStore,
modules.APISecret,
),
)
}

func startupModule(cctx *cli.Context, r *repo.FsRepo, lr repo.LockedRepo, cfg *config.FullNode) fx.Option {
return fx.Module("startup",
fx.Provide(newFullNodeHandler),
fx.Supply(newServerOptions(cctx.Int("api-max-req-size"))),
fx.Supply(r),
fx.Supply(lr),
fx.Provide(
func(api impl.FullNodeAPI, lr repo.LockedRepo, e dtypes.APIEndpoint) lapi.FullNode {
lr.SetAPIEndpoint(e)
return &api
},
),
fx.Provide(
func() (dtypes.APIEndpoint, error) {
return multiaddr.NewMultiaddr(cfg.API.ListenAddress)
},
),
fx.Provide(startRPCServer),
//func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
// return lr.SetAPIEndpoint(e)
//},
)
}

func newServerOptions(maxRequestSize int) []jsonrpc.ServerOption {
serverOptions := []jsonrpc.ServerOption{jsonrpc.WithServerErrors(lapi.RPCErrors)}
if maxRequestSize > 0 {
serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(int64(maxRequestSize)))
}
return serverOptions
}

func newFullNodeHandler(api lapi.FullNode, serverOptions []jsonrpc.ServerOption) (http.Handler, error) {
return node.FullNodeHandler(api, true, serverOptions...)
}

func startRPCServer(h http.Handler, repo *repo.FsRepo) (node.StopFunc, error) {
endpoint, err := repo.APIEndpoint()
if err != nil {
return nil, xerrors.Errorf("getting api endpoint: %w", err)
}
return node.ServeRPC(h, "lotus-daemon", endpoint)
}

func mergeInvokes(into []fx.Option, from []fx.Option) {
if len(into) != len(from) {
panic("failed to merge invokes due to different lengths")
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ require (
go.opentelemetry.io/otel/bridge/opencensus v0.25.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.uber.org/fx v1.15.0
go.uber.org/fx v0.0.0-20221215155203-3a86277ab0e5
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.22.0
go.uber.org/zap v1.23.0
golang.org/x/net v0.0.0-20220812174116-3211cb980234
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
Expand Down Expand Up @@ -322,7 +322,7 @@ require (
go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.12.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
Expand Down
Loading

0 comments on commit a21388f

Please sign in to comment.