Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lp api handler for stop #11473

Merged
merged 1 commit into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/api_lp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

import "context"

type LotusProvider interface {
Version(context.Context) (Version, error) //perm:admin

// Trigger shutdown
Shutdown(context.Context) error //perm:admin
}
10 changes: 10 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ import (
"github.com/filecoin-project/lotus/lib/rpcenc"
)

// NewProviderRpc creates a new http jsonrpc client.
func NewProviderRpc(ctx context.Context, addr string, requestHeader http.Header) (api.LotusProvider, jsonrpc.ClientCloser, error) {
var res v1api.LotusProviderStruct

closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, jsonrpc.WithErrors(api.RPCErrors))

return &res, closer, err
}

// NewCommonRPCV0 creates a new http jsonrpc client.
func NewCommonRPCV0(ctx context.Context, addr string, requestHeader http.Header) (api.CommonNet, jsonrpc.ClientCloser, error) {
var res v0api.CommonNetStruct
Expand Down
6 changes: 6 additions & 0 deletions api/permissioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func PermissionedWorkerAPI(a Worker) Worker {
return &out
}

func PermissionedAPI[T any](a T) T {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay generics exist now; We could drop the other ones now, but that's a separate PR, not now

var out T
permissionedProxies(a, &out)
return out
}

func PermissionedWalletAPI(a Wallet) Wallet {
var out WalletStruct
permissionedProxies(a, &out)
Expand Down
36 changes: 36 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/v1api/latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ type RawFullNodeAPI FullNode
func PermissionedFullAPI(a FullNode) FullNode {
return api.PermissionedFullAPI(a)
}

type LotusProviderStruct = api.LotusProviderStruct
2 changes: 2 additions & 0 deletions api/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (

MinerAPIVersion0 = newVer(1, 5, 0)
WorkerAPIVersion0 = newVer(1, 7, 0)

ProviderAPIVersion0 = newVer(1, 0, 0)
)

//nolint:varcheck,deadcode
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
51 changes: 51 additions & 0 deletions cmd/lotus-provider/rpc/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rpc

import (
"context"
"net/http"

"github.com/gorilla/mux"

// logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics/proxy"
)

//var log = logging.Logger("lp/rpc")

func LotusProviderHandler(
authv func(ctx context.Context, token string) ([]auth.Permission, error),
remote http.HandlerFunc,
a api.LotusProvider,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(jsonrpc.WithServerErrors(api.RPCErrors), readerServerOpt)

wapi := proxy.MetricedAPI(a)
if permissioned {
wapi = api.PermissionedAPI(wapi)
}

rpcServer.Register("Filecoin", wapi)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")

mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

if !permissioned {
return mux
}

ah := &auth.Handler{
Verify: authv,
Next: mux.ServeHTTP,
}
return ah
}
82 changes: 59 additions & 23 deletions cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/cmd/lotus-provider/rpc"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal"
Expand Down Expand Up @@ -118,11 +119,14 @@ var runCmd = &cli.Command{
tag.Insert(metrics.NodeType, "provider"),
)
shutdownChan := make(chan struct{})
ctx, ctxclose := context.WithCancel(ctx)
go func() {
<-shutdownChan
ctxclose()
}()
{
var ctxclose func()
ctx, ctxclose = context.WithCancel(ctx)
go func() {
<-shutdownChan
ctxclose()
}()
}
// Register all metric views
/*
if err := view.Register(
Expand Down Expand Up @@ -206,26 +210,43 @@ var runCmd = &cli.Command{

}

// Serve the RPC.
/*
srv := &http.Server{
Handler: sealworker.WorkerHandler(nodeApi.AuthVerify, remoteHandler, workerApi, true),
ReadHeaderTimeout: timeout,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
var authVerify func(context.Context, string) ([]auth.Permission, error)
{
privateKey, err := base64.RawStdEncoding.DecodeString(deps.cfg.Apis.StorageRPCSecret)
if err != nil {
return err
}

go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) {
var payload jwtPayload
if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil {
return nil, xerrors.Errorf("JWT Verification failed: %w", err)
}
log.Warn("Graceful shutdown successful")
}()
*/

return payload.Allow, nil
}
}
// Serve the RPC.
srv := &http.Server{
Handler: rpc.LotusProviderHandler(
authVerify,
remoteHandler,
&ProviderAPI{deps, shutdownChan},
true),
ReadHeaderTimeout: time.Minute * 3,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
}

go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()

// Monitor for shutdown.
// TODO provide a graceful shutdown API on shutdownChan
Expand Down Expand Up @@ -421,3 +442,18 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
}, nil

}

type ProviderAPI struct {
*Deps
ShutdownChan chan struct{}
}

func (p *ProviderAPI) Version(context.Context) (api.Version, error) {
return api.Version(api.ProviderAPIVersion0), nil
}

// Trigger shutdown
func (p *ProviderAPI) Shutdown(context.Context) error {
close(p.ShutdownChan)
return nil
}
6 changes: 5 additions & 1 deletion cmd/lotus-worker/sealworker/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import (

var log = logging.Logger("sealworker")

func WorkerHandler(authv func(ctx context.Context, token string) ([]auth.Permission, error), remote http.HandlerFunc, a api.Worker, permissioned bool) http.Handler {
func WorkerHandler(
authv func(ctx context.Context, token string) ([]auth.Permission, error),
remote http.HandlerFunc,
a api.Worker,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(jsonrpc.WithServerErrors(api.RPCErrors), readerServerOpt)
Expand Down
6 changes: 6 additions & 0 deletions metrics/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"github.com/filecoin-project/lotus/metrics"
)

func MetricedAPI[T any](a T) T {
var out T
proxy(a, &out)
return out
}

func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
var out api.StorageMinerStruct
proxy(a, &out)
Expand Down