Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement scaffolding for sentinel integration #5672

Closed
wants to merge 7 commits into from
Closed
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,11 @@ docsgen:

print-%:
@echo $*=$($*)

# Sentinel build mode
lotus-sentinel: $(BUILD_DEPS)
rm -f lotus
go build $(GOFLAGS) -ldflags="-X=github.com/filecoin-project/lotus/build.SentinelMode=true" -o lotus ./cmd/lotus
go run github.com/GeertJohan/go.rice/rice append --exec lotus -i ./build

.PHONY: lotus
1 change: 1 addition & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// FullNode API is a low-level interface to the Filecoin network full node
type FullNode interface {
Common
Sentinel

// MethodGroup: Chain
// The Chain method group contains methods for interacting with the
Expand Down
12 changes: 12 additions & 0 deletions api/api_sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package api

import (
"context"
)

type Sentinel interface {
// MethodGroup: Sentinel

// WatchStart start a watch against the chain
WatchStart(context.Context) error
frrist marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions api/apistruct/permissioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func PermissionedFullAPI(a api.FullNode) api.FullNode {
var out FullNodeStruct
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.Internal)
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.CommonStruct.Internal)
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.SentinelStruct.Internal)
return &out
}

Expand Down
12 changes: 12 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type CommonStruct struct {
// FullNodeStruct implements API passing calls to user-provided function values.
type FullNodeStruct struct {
CommonStruct
SentinelStruct
frrist marked this conversation as resolved.
Show resolved Hide resolved

Internal struct {
ChainNotify func(context.Context) (<-chan []*api.HeadChange, error) `perm:"read"`
Expand Down Expand Up @@ -470,6 +471,12 @@ type WalletStruct struct {
}
}

type SentinelStruct struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be easier to have the sentinel methods be defined on the FullNodeStruct struct, so it doesn't have to be registered in all the random places

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree it would be easier, I believe separation will make future work simpler to land as changes will only occur in sentinel-related structures and files. I think this is a bit cleaner too. But, if you feel strongly about defining methods on the FullNodeStruct, we can do it that way instead.

Internal struct {
WatchStart func(ctx context.Context) error `perm:"admin"`
}
}

// CommonStruct

func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]auth.Permission, error) {
Expand Down Expand Up @@ -1854,9 +1861,14 @@ func (c *WalletStruct) WalletDelete(ctx context.Context, addr address.Address) e
return c.Internal.WalletDelete(ctx, addr)
}

func (s *SentinelStruct) WatchStart(ctx context.Context) error {
return s.Internal.WatchStart(ctx)
}

var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{}
var _ api.WorkerAPI = &WorkerStruct{}
var _ api.GatewayAPI = &GatewayStruct{}
var _ api.WalletAPI = &WalletStruct{}
var _ api.Sentinel = &SentinelStruct{}
1 change: 1 addition & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewFullNodeRPC(ctx context.Context, addr string, requestHeader http.Header)
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
[]interface{}{
&res.CommonStruct.Internal,
&res.SentinelStruct.Internal,
&res.Internal,
}, requestHeader)

Expand Down
3 changes: 3 additions & 0 deletions build/sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package build

var SentinelMode string = "false"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting and parsing this seems really clunky. Is this the pattern already in use by Lotus? If so then let's leave it (but it's still clunky!)

In theory the Go compiler could optimize away code that isn't needed for sentinel mode but I think this would have to be a const bool for that to work.

I feel that fx must have better support for compile time switching of modules.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only prior art I saw for this was passing the version via a build flag. @magik6k wants to avoid adding more flags to the daemon and suggested a build flag or a go plugin. If, as you've already suggested, I can pull the required bits into a separate binary, then I think this flag can be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless separate binary, using build tags might be cleaner: go build -tags sentinel. It also forces to separate everything sentinel-specific to its own files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented a standalone binary here: #5693

51 changes: 46 additions & 5 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ type Events struct {

heightEvents
*hcEvents

observers []TipSetObserver
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold)
}

func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)

e := &Events{
Expand All @@ -77,8 +81,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}

go e.listenHeadChanges(ctx)
Expand All @@ -90,6 +95,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
}

return e

}

func (e *Events) listenHeadChanges(ctx context.Context) {
Expand Down Expand Up @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}

if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

Expand All @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}

func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
Expand All @@ -189,5 +195,40 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}

return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
1 change: 1 addition & 0 deletions cli/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ var Commands = []*cli.Command{
WithCategory("developer", fetchParamCmd),
WithCategory("network", netCmd),
WithCategory("network", syncCmd),
WithCategory("sentinel", sentinelCmd),
pprofCmd,
VersionCmd,
}
Expand Down
39 changes: 39 additions & 0 deletions cli/sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cli

import (
"github.com/urfave/cli/v2"
)

var sentinelCmd = &cli.Command{
Name: "sentinel",
Usage: "Interact with the sentinel module",
Subcommands: []*cli.Command{
sentinelStartWatchCmd,
},
}

var sentinelStartWatchCmd = &cli.Command{
Name: "watch",
Usage: "start a watch against the chain",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "confidence",
},
},
Action: func(cctx *cli.Context) error {
apic, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

//confidence := abi.ChainEpoch(cctx.Int64("confidence"))

if err := apic.WatchStart(ctx); err != nil {
return err
}

return nil
},
}
45 changes: 38 additions & 7 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/http"
"os"
"runtime/pprof"
"strconv"
"strings"

paramfetch "github.com/filecoin-project/go-paramfetch"
Expand Down Expand Up @@ -51,6 +52,15 @@ const (
preTemplateFlag = "genesis-template"
)

type daemonMode int

const (
modeUnknown daemonMode = 0 // no valid mode could be determined
modeStandard daemonMode = 1 // standard mode
modeLite daemonMode = 2 // lite mode, backed by gateway
modeSentinel daemonMode = 3 // stats collection mode, analyses chain events
)

var daemonStopCmd = &cli.Command{
Name: "stop",
Usage: "Stop a running lotus daemon",
Expand Down Expand Up @@ -154,13 +164,15 @@ var DaemonCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
daemonMode, err := getDaemonMode(cctx)
if err != nil {
return err
}

err := runmetrics.Enable(runmetrics.RunMetricOptions{
if err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true,
EnableMemory: true,
})
if err != nil {
}); err != nil {
return xerrors.Errorf("enabling runtime metrics: %w", err)
}

Expand Down Expand Up @@ -217,7 +229,7 @@ var DaemonCmd = &cli.Command{
}
freshRepo := err != repo.ErrRepoExists

if !isLite {
if daemonMode != modeLite {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
Expand Down Expand Up @@ -279,7 +291,7 @@ var DaemonCmd = &cli.Command{
// If the daemon is started in "lite mode", provide a GatewayAPI
// for RPC calls
liteModeDeps := node.Options()
if isLite {
if daemonMode == modeLite {
gapi, closer, err := lcli.GetGatewayAPI(cctx)
if err != nil {
return err
Expand All @@ -298,7 +310,7 @@ var DaemonCmd = &cli.Command{

var api api.FullNode
stop, err := node.New(ctx,
node.FullAPI(&api, node.Lite(isLite)),
node.FullAPI(&api, node.Lite(daemonMode == modeLite), node.Sentinel(daemonMode == modeSentinel)),

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdownChan),
Expand Down Expand Up @@ -499,3 +511,22 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)

return nil
}

func getDaemonMode(cctx *cli.Context) (daemonMode, error) {
isLite := cctx.Bool("lite")
isSentinel, err := strconv.ParseBool(build.SentinelMode)
if err != nil {
return modeUnknown, err
}

switch {
case !isLite && !isSentinel:
return modeStandard, nil
case isLite && !isSentinel:
return modeLite, nil
case !isLite && isSentinel:
return modeSentinel, nil
default:
return modeUnknown, xerrors.Errorf("cannot specify more than one mode")
}
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions metrics/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func MetricedFullAPI(a api.FullNode) api.FullNode {
var out apistruct.FullNodeStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
proxy(a, &out.SentinelStruct.Internal)
return &out
}

Expand Down
Loading