Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Feb 23, 2021
1 parent 29b076a commit 9b8c48d
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 17 deletions.
1 change: 1 addition & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// FullNode API is a low-level interface to the Filecoin network full node
type FullNode interface {
Common
Sentinel

// MethodGroup: Chain
// The Chain method group contains methods for interacting with the
Expand Down
10 changes: 10 additions & 0 deletions api/api_sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

import (
"context"
)

type Sentinel interface {
WatchStop(context.Context) error
WatchStart(context.Context) error
}
17 changes: 17 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ import (
// All permissions are listed in permissioned.go
var _ = AllPermissions

type SentinelStruct struct {
Internal struct {
WatchStart func(ctx context.Context) error `perm:"admin"`
WatchStop func(ctx context.Context) error `perm:"admin"`
}
}

func (s *SentinelStruct) WatchStart(ctx context.Context) error {
return s.Internal.WatchStart(ctx)
}

func (s *SentinelStruct) WatchStop(ctx context.Context) error {
return s.Internal.WatchStop(ctx)
}

type CommonStruct struct {
Internal struct {
AuthVerify func(ctx context.Context, token string) ([]auth.Permission, error) `perm:"read"`
Expand Down Expand Up @@ -79,6 +94,7 @@ type CommonStruct struct {
// FullNodeStruct implements API passing calls to user-provided function values.
type FullNodeStruct struct {
CommonStruct
SentinelStruct

Internal struct {
ChainNotify func(context.Context) (<-chan []*api.HeadChange, error) `perm:"read"`
Expand Down Expand Up @@ -1835,3 +1851,4 @@ var _ api.StorageMiner = &StorageMinerStruct{}
var _ api.WorkerAPI = &WorkerStruct{}
var _ api.GatewayAPI = &GatewayStruct{}
var _ api.WalletAPI = &WalletStruct{}
var _ api.Sentinel = &SentinelStruct{}
1 change: 1 addition & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewFullNodeRPC(ctx context.Context, addr string, requestHeader http.Header)
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
[]interface{}{
&res.CommonStruct.Internal,
&res.SentinelStruct.Internal,
&res.Internal,
}, requestHeader)

Expand Down
51 changes: 46 additions & 5 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ type Events struct {

heightEvents
*hcEvents

observers []TipSetObserver
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold)
}

func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)

e := &Events{
Expand All @@ -77,8 +81,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}

go e.listenHeadChanges(ctx)
Expand All @@ -90,6 +95,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
}

return e

}

func (e *Events) listenHeadChanges(ctx context.Context) {
Expand Down Expand Up @@ -162,7 +168,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}

if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

Expand All @@ -175,7 +181,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}

func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
Expand All @@ -187,5 +193,40 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}

return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
1 change: 1 addition & 0 deletions cli/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ var Commands = []*cli.Command{
WithCategory("developer", fetchParamCmd),
WithCategory("network", netCmd),
WithCategory("network", syncCmd),
WithCategory("sentinel", sentinelCmd),
pprofCmd,
VersionCmd,
}
Expand Down
39 changes: 39 additions & 0 deletions cli/sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cli

import (
"github.com/urfave/cli/v2"
)

var sentinelCmd = &cli.Command{
Name: "sentinel",
Usage: "Interact with the sentinel module",
Subcommands: []*cli.Command{
sentinelStartWatchCmd,
},
}

var sentinelStartWatchCmd = &cli.Command{
Name: "watch",
Usage: "start a watch against the chain",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "confidence",
},
},
Action: func(cctx *cli.Context) error {
apic, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

//confidence := abi.ChainEpoch(cctx.Int64("confidence"))

if err := apic.WatchStart(ctx); err != nil {
return err
}

return nil
},
}
45 changes: 38 additions & 7 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ const (
preTemplateFlag = "genesis-template"
)

type daemonMode int

const (
modeUnknown daemonMode = 0 // no valid mode could be determined
modeStandard daemonMode = 1 // standard mode
modeLite daemonMode = 2 // lite mode, backed by gateway
modeSentinel daemonMode = 3 // stats collection mode, analyses chain events
)

var daemonStopCmd = &cli.Command{
Name: "stop",
Usage: "Stop a running lotus daemon",
Expand Down Expand Up @@ -119,6 +128,10 @@ var DaemonCmd = &cli.Command{
Usage: "start lotus in lite mode",
Hidden: true,
},
&cli.BoolFlag{
Name: "sentinel",
Usage: "start lotus in sentinel mode.",
},
&cli.StringFlag{
Name: "pprof",
Usage: "specify name of file for writing cpu profile to",
Expand Down Expand Up @@ -154,13 +167,15 @@ var DaemonCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
daemonMode, err := getDaemonMode(cctx)
if err != nil {
return err
}

err := runmetrics.Enable(runmetrics.RunMetricOptions{
if err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true,
EnableMemory: true,
})
if err != nil {
}); err != nil {
return xerrors.Errorf("enabling runtime metrics: %w", err)
}

Expand Down Expand Up @@ -217,7 +232,7 @@ var DaemonCmd = &cli.Command{
}
freshRepo := err != repo.ErrRepoExists

if !isLite {
if daemonMode != modeLite {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
Expand Down Expand Up @@ -279,7 +294,7 @@ var DaemonCmd = &cli.Command{
// If the daemon is started in "lite mode", provide a GatewayAPI
// for RPC calls
liteModeDeps := node.Options()
if isLite {
if daemonMode == modeLite {
gapi, closer, err := lcli.GetGatewayAPI(cctx)
if err != nil {
return err
Expand All @@ -298,7 +313,7 @@ var DaemonCmd = &cli.Command{

var api api.FullNode
stop, err := node.New(ctx,
node.FullAPI(&api, node.Lite(isLite)),
node.FullAPI(&api, node.Lite(daemonMode == modeLite), node.Sentinel(daemonMode == modeSentinel)),

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdownChan),
Expand Down Expand Up @@ -499,3 +514,19 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {

return nil
}

func getDaemonMode(cctx *cli.Context) (daemonMode, error) {
isLite := cctx.Bool("lite")
isSentinel := cctx.Bool("sentinel")

switch {
case !isLite && !isSentinel:
return modeStandard, nil
case isLite && !isSentinel:
return modeLite, nil
case !isLite && isSentinel:
return modeSentinel, nil
default:
return modeUnknown, xerrors.Errorf("cannot specify more than one mode")
}
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
26 changes: 21 additions & 5 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/impl/sentinel"
"github.com/filecoin-project/lotus/system"

logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -166,9 +168,10 @@ type Settings struct {

nodeType repo.RepoType

Online bool // Online option applied
Config bool // Config option applied
Lite bool // Start node in "lite" mode
Online bool // Online option applied
Config bool // Config option applied
Lite bool // Start node in "lite" mode
Sentinel bool // Start node in "sentinel" mode
}

func defaults() []Option {
Expand Down Expand Up @@ -240,9 +243,9 @@ func isType(t repo.RepoType) func(s *Settings) bool {

// Online sets up basic libp2p node
func Online() Option {
isFullOrLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode }
isFullNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite }
isLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite }
isSentinelNode := func(s *Settings) bool { return true /*s.nodeType == repo.FullNode && s.Sentinel*/ }

return Options(
// make sure that online is applied before Config.
Expand All @@ -258,7 +261,7 @@ func Online() Option {
Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter),

// Full node or lite node
ApplyIf(isFullOrLiteNode,
ApplyIf(isType(repo.FullNode),
// TODO: Fix offline mode

Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Expand Down Expand Up @@ -340,6 +343,12 @@ func Online() Option {
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(new(api.Sentinel), From(new(sentinel.SentinelUnavailable))),
),

ApplyIf(isSentinelNode,
Override(new(*events.Events), modules.NewEvents),
Override(new(api.Sentinel), From(new(sentinel.SentinelAPI))),
),

// miner
Expand Down Expand Up @@ -578,6 +587,13 @@ func Lite(enable bool) FullOption {
}
}

func Sentinel(enable bool) FullOption {
return func(s *Settings) error {
s.Sentinel = enable
return nil
}
}

func FullAPI(out *api.FullNode, fopts ...FullOption) Option {
return Options(
func(s *Settings) error {
Expand Down
2 changes: 2 additions & 0 deletions node/impl/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/market"
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/impl/sentinel"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

Expand All @@ -29,6 +30,7 @@ type FullNodeAPI struct {
full.WalletAPI
full.SyncAPI
full.BeaconAPI
sentinel.SentinelAPI

DS dtypes.MetadataDS
}
Expand Down
Loading

0 comments on commit 9b8c48d

Please sign in to comment.