diff --git a/api/api_full.go b/api/api_full.go index ca219dd27c0..9a909635981 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -35,6 +35,7 @@ import ( // FullNode API is a low-level interface to the Filecoin network full node type FullNode interface { Common + Sentinel // MethodGroup: Chain // The Chain method group contains methods for interacting with the diff --git a/api/api_sentinel.go b/api/api_sentinel.go new file mode 100644 index 00000000000..4aa4e8657e1 --- /dev/null +++ b/api/api_sentinel.go @@ -0,0 +1,10 @@ +package api + +import ( + "context" +) + +type Sentinel interface { + WatchStop(context.Context) error + WatchStart(context.Context) error +} diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 3da39ef5628..bcc84497db0 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -43,6 +43,21 @@ import ( // All permissions are listed in permissioned.go var _ = AllPermissions +type SentinelStruct struct { + Internal struct { + WatchStart func(ctx context.Context) error `perm:"admin"` + WatchStop func(ctx context.Context) error `perm:"admin"` + } +} + +func (s *SentinelStruct) WatchStart(ctx context.Context) error { + return s.Internal.WatchStart(ctx) +} + +func (s *SentinelStruct) WatchStop(ctx context.Context) error { + return s.Internal.WatchStop(ctx) +} + type CommonStruct struct { Internal struct { AuthVerify func(ctx context.Context, token string) ([]auth.Permission, error) `perm:"read"` @@ -79,6 +94,7 @@ type CommonStruct struct { // FullNodeStruct implements API passing calls to user-provided function values. type FullNodeStruct struct { CommonStruct + SentinelStruct Internal struct { ChainNotify func(context.Context) (<-chan []*api.HeadChange, error) `perm:"read"` @@ -1835,3 +1851,4 @@ var _ api.StorageMiner = &StorageMinerStruct{} var _ api.WorkerAPI = &WorkerStruct{} var _ api.GatewayAPI = &GatewayStruct{} var _ api.WalletAPI = &WalletStruct{} +var _ api.Sentinel = &SentinelStruct{} diff --git a/api/client/client.go b/api/client/client.go index 7d8a466d333..0a6e162e874 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -33,6 +33,7 @@ func NewFullNodeRPC(ctx context.Context, addr string, requestHeader http.Header) closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin", []interface{}{ &res.CommonStruct.Internal, + &res.SentinelStruct.Internal, &res.Internal, }, requestHeader) diff --git a/chain/events/events.go b/chain/events/events.go index 1dcf634231c..c47652f48a0 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -55,11 +55,15 @@ type Events struct { heightEvents *hcEvents + + observers []TipSetObserver } func NewEvents(ctx context.Context, api eventAPI) *Events { - gcConfidence := 2 * build.ForkLengthThreshold + return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold) +} +func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events { tsc := newTSCache(gcConfidence, api) e := &Events{ @@ -77,8 +81,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), - ready: make(chan struct{}), + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + ready: make(chan struct{}), + observers: []TipSetObserver{}, } go e.listenHeadChanges(ctx) @@ -90,6 +95,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { } return e + } func (e *Events) listenHeadChanges(ctx context.Context) { @@ -162,7 +168,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } } - if err := e.headChange(rev, app); err != nil { + if err := e.headChange(ctx, rev, app); err != nil { log.Warnf("headChange failed: %s", err) } @@ -175,7 +181,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { return nil } -func (e *Events) headChange(rev, app []*types.TipSet) error { +func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error { if len(app) == 0 { return xerrors.New("events.headChange expected at least one applied tipset") } @@ -187,5 +193,40 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } + if err := e.observeChanges(ctx, rev, app); err != nil { + return err + } + return e.processHeadChangeEvent(rev, app) } + +// A TipSetObserver receives notifications of tipsets +type TipSetObserver interface { + Apply(ctx context.Context, ts *types.TipSet) error + Revert(ctx context.Context, ts *types.TipSet) error +} + +// TODO: add a confidence level so we can have observers with difference levels of confidence +func (e *Events) Observe(obs TipSetObserver) error { + e.lk.Lock() + defer e.lk.Unlock() + e.observers = append(e.observers, obs) + return nil +} + +// observeChanges expects caller to hold e.lk +func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error { + for _, ts := range rev { + for _, o := range e.observers { + _ = o.Revert(ctx, ts) + } + } + + for _, ts := range app { + for _, o := range e.observers { + _ = o.Apply(ctx, ts) + } + } + + return nil +} diff --git a/cli/cmd.go b/cli/cmd.go index 02ef06002af..0a7445feec2 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -319,6 +319,7 @@ var Commands = []*cli.Command{ WithCategory("developer", fetchParamCmd), WithCategory("network", netCmd), WithCategory("network", syncCmd), + WithCategory("sentinel", sentinelCmd), pprofCmd, VersionCmd, } diff --git a/cli/sentinel.go b/cli/sentinel.go new file mode 100644 index 00000000000..04efa742111 --- /dev/null +++ b/cli/sentinel.go @@ -0,0 +1,39 @@ +package cli + +import ( + "github.com/urfave/cli/v2" +) + +var sentinelCmd = &cli.Command{ + Name: "sentinel", + Usage: "Interact with the sentinel module", + Subcommands: []*cli.Command{ + sentinelStartWatchCmd, + }, +} + +var sentinelStartWatchCmd = &cli.Command{ + Name: "watch", + Usage: "start a watch against the chain", + Flags: []cli.Flag{ + &cli.Int64Flag{ + Name: "confidence", + }, + }, + Action: func(cctx *cli.Context) error { + apic, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + //confidence := abi.ChainEpoch(cctx.Int64("confidence")) + + if err := apic.WatchStart(ctx); err != nil { + return err + } + + return nil + }, +} diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 457fb1efb84..95132c47109 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -51,6 +51,15 @@ const ( preTemplateFlag = "genesis-template" ) +type daemonMode int + +const ( + modeUnknown daemonMode = 0 // no valid mode could be determined + modeStandard daemonMode = 1 // standard mode + modeLite daemonMode = 2 // lite mode, backed by gateway + modeSentinel daemonMode = 3 // stats collection mode, analyses chain events +) + var daemonStopCmd = &cli.Command{ Name: "stop", Usage: "Stop a running lotus daemon", @@ -119,6 +128,10 @@ var DaemonCmd = &cli.Command{ Usage: "start lotus in lite mode", Hidden: true, }, + &cli.BoolFlag{ + Name: "sentinel", + Usage: "start lotus in sentinel mode.", + }, &cli.StringFlag{ Name: "pprof", Usage: "specify name of file for writing cpu profile to", @@ -154,13 +167,15 @@ var DaemonCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - isLite := cctx.Bool("lite") + daemonMode, err := getDaemonMode(cctx) + if err != nil { + return err + } - err := runmetrics.Enable(runmetrics.RunMetricOptions{ + if err := runmetrics.Enable(runmetrics.RunMetricOptions{ EnableCPU: true, EnableMemory: true, - }) - if err != nil { + }); err != nil { return xerrors.Errorf("enabling runtime metrics: %w", err) } @@ -217,7 +232,7 @@ var DaemonCmd = &cli.Command{ } freshRepo := err != repo.ErrRepoExists - if !isLite { + if daemonMode != modeLite { if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil { return xerrors.Errorf("fetching proof parameters: %w", err) } @@ -279,7 +294,7 @@ var DaemonCmd = &cli.Command{ // If the daemon is started in "lite mode", provide a GatewayAPI // for RPC calls liteModeDeps := node.Options() - if isLite { + if daemonMode == modeLite { gapi, closer, err := lcli.GetGatewayAPI(cctx) if err != nil { return err @@ -298,7 +313,7 @@ var DaemonCmd = &cli.Command{ var api api.FullNode stop, err := node.New(ctx, - node.FullAPI(&api, node.Lite(isLite)), + node.FullAPI(&api, node.Lite(daemonMode == modeLite), node.Sentinel(daemonMode == modeSentinel)), node.Override(new(dtypes.Bootstrapper), isBootstrapper), node.Override(new(dtypes.ShutdownChan), shutdownChan), @@ -499,3 +514,19 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { return nil } + +func getDaemonMode(cctx *cli.Context) (daemonMode, error) { + isLite := cctx.Bool("lite") + isSentinel := cctx.Bool("sentinel") + + switch { + case !isLite && !isSentinel: + return modeStandard, nil + case isLite && !isSentinel: + return modeLite, nil + case !isLite && isSentinel: + return modeSentinel, nil + default: + return modeUnknown, xerrors.Errorf("cannot specify more than one mode") + } +} diff --git a/go.sum b/go.sum index 7f60cef4ae9..c8f84737ea9 100644 --- a/go.sum +++ b/go.sum @@ -1406,6 +1406,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/node/builder.go b/node/builder.go index 8ee9b367440..f70ad356f41 100644 --- a/node/builder.go +++ b/node/builder.go @@ -10,11 +10,13 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/node/hello" + "github.com/filecoin-project/lotus/node/impl/sentinel" "github.com/filecoin-project/lotus/system" logging "github.com/ipfs/go-log" @@ -166,9 +168,10 @@ type Settings struct { nodeType repo.RepoType - Online bool // Online option applied - Config bool // Config option applied - Lite bool // Start node in "lite" mode + Online bool // Online option applied + Config bool // Config option applied + Lite bool // Start node in "lite" mode + Sentinel bool // Start node in "sentinel" mode } func defaults() []Option { @@ -240,9 +243,9 @@ func isType(t repo.RepoType) func(s *Settings) bool { // Online sets up basic libp2p node func Online() Option { - isFullOrLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode } isFullNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite } isLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite } + isSentinelNode := func(s *Settings) bool { return true /*s.nodeType == repo.FullNode && s.Sentinel*/ } return Options( // make sure that online is applied before Config. @@ -258,7 +261,7 @@ func Online() Option { Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), // Full node or lite node - ApplyIf(isFullOrLiteNode, + ApplyIf(isType(repo.FullNode), // TODO: Fix offline mode Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), @@ -340,6 +343,12 @@ func Online() Option { Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + Override(new(api.Sentinel), From(new(sentinel.SentinelUnavailable))), + ), + + ApplyIf(isSentinelNode, + Override(new(*events.Events), modules.NewEvents), + Override(new(api.Sentinel), From(new(sentinel.SentinelAPI))), ), // miner @@ -578,6 +587,13 @@ func Lite(enable bool) FullOption { } } +func Sentinel(enable bool) FullOption { + return func(s *Settings) error { + s.Sentinel = enable + return nil + } +} + func FullAPI(out *api.FullNode, fopts ...FullOption) Option { return Options( func(s *Settings) error { diff --git a/node/impl/full.go b/node/impl/full.go index add40917c84..e3aaf015a3d 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/market" "github.com/filecoin-project/lotus/node/impl/paych" + "github.com/filecoin-project/lotus/node/impl/sentinel" "github.com/filecoin-project/lotus/node/modules/dtypes" ) @@ -29,6 +30,7 @@ type FullNodeAPI struct { full.WalletAPI full.SyncAPI full.BeaconAPI + sentinel.SentinelAPI DS dtypes.MetadataDS } diff --git a/node/impl/sentinel/sentinel.go b/node/impl/sentinel/sentinel.go new file mode 100644 index 00000000000..eda2bd40d9d --- /dev/null +++ b/node/impl/sentinel/sentinel.go @@ -0,0 +1,46 @@ +package sentinel + +import ( + "context" + + logging "github.com/ipfs/go-log/v2" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/sentinel" +) + +var log = logging.Logger("sentinel-module") + +type SentinelAPI struct { + fx.In + + Events *events.Events +} + +func (m *SentinelAPI) WatchStart(ctx context.Context) error { + log.Info("starting sentinel watch") + return m.Events.Observe(&sentinel.LoggingTipSetObserver{}) +} + +func (m *SentinelAPI) WatchStop(ctx context.Context) error { + log.Info("stopping sentinel watch") + return nil +} + +// SentinelUnavailable is an implementation of the sentinel api that returns an unavailable error for every request +type SentinelUnavailable struct { + fx.In +} + +func (SentinelUnavailable) WatchStart(ctx context.Context) error { + return xerrors.Errorf("sentinel unavailable") +} + +func (SentinelUnavailable) WatchStop(ctx context.Context) error { + return xerrors.Errorf("sentinel unavailable") +} + +var _ api.Sentinel = &SentinelAPI{} diff --git a/node/modules/sentinel.go b/node/modules/sentinel.go new file mode 100644 index 00000000000..08932284a60 --- /dev/null +++ b/node/modules/sentinel.go @@ -0,0 +1,21 @@ +package modules + +import ( + "go.uber.org/fx" + + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/node/modules/helpers" +) + +func NewEvents(mctx helpers.MetricsCtx, lc fx.Lifecycle, chainAPI full.ChainModuleAPI, stateAPI full.StateModuleAPI) *events.Events { + api := struct { + full.ChainModuleAPI + full.StateModuleAPI + }{ + ChainModuleAPI: chainAPI, + StateModuleAPI: stateAPI, + } + + return events.NewEventsWithConfidence(mctx, api, 10) +} diff --git a/sentinel/observer.go b/sentinel/observer.go new file mode 100644 index 00000000000..da4525488f2 --- /dev/null +++ b/sentinel/observer.go @@ -0,0 +1,27 @@ +package sentinel + +import ( + "context" + + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/types" +) + +var log = logging.Logger("sentinel-observer") + +var _ events.TipSetObserver = (*LoggingTipSetObserver)(nil) + +type LoggingTipSetObserver struct { +} + +func (o *LoggingTipSetObserver) Apply(ctx context.Context, ts *types.TipSet) error { + log.Infof("TipSetObserver.Apply(%q)", ts.Key()) + return nil +} + +func (o *LoggingTipSetObserver) Revert(ctx context.Context, ts *types.TipSet) error { + log.Infof("TipSetObserver.Revert(%q)", ts.Key()) + return nil +}