Skip to content

Commit

Permalink
feat: curio ffiselect: Isolate gpu calls in a subprocess (#11994)
Browse files Browse the repository at this point in the history
* started

* so far so good

* builds and looks good

* changing level of abstration. some work remains

* it builds

* betterment

* import order

* 2

* stupid linter - you can cast a nil

* build commit and date

* nicer

* tmp and nide makefile

* comments handled

* oops

* added debug and reg

* ffiselect: change err encode to strings, fix some bugs

* ffiselect: Wrap rust logs into go-log

* ffiselect: Make the linter happy

* verification tests

* ffiselect: Fix startup

---------

Co-authored-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
snadrus and magik6k authored May 23, 2024
1 parent 9391548 commit c1f99c5
Show file tree
Hide file tree
Showing 17 changed files with 695 additions and 50 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ BINS+=lotus-miner

curio: $(BUILD_DEPS)
rm -f curio
$(GOCC) build $(GOFLAGS) -o curio ./cmd/curio
$(GOCC) build $(GOFLAGS) -o curio -ldflags " \
-X github.com/filecoin-project/lotus/curiosrc/build.IsOpencl=$(FFI_USE_OPENCL) \
-X github.com/filecoin-project/lotus/curiosrc/build.Commit=`git log -1 --format=%h_%cI`" \
./cmd/curio
.PHONY: curio
BINS+=curio

Expand Down
13 changes: 0 additions & 13 deletions cmd/curio/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

"github.com/BurntSushi/toml"
"github.com/gbrlsnchs/jwt/v3"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
Expand All @@ -28,7 +26,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
Expand Down Expand Up @@ -170,7 +167,6 @@ type Deps struct {
DB *harmonydb.DB // has itest capability
Full api.FullNode
Verif storiface.Verifier
LW *sealer.LocalWorker
As *multictladdr.MultiAddressSelector
Maddrs map[dtypes.MinerAddress]bool
ProofTypes map[abi.RegisteredSealProof]bool
Expand Down Expand Up @@ -311,16 +307,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
if deps.Stor == nil {
deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
}
if deps.LW == nil {
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))

// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a curio specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{
MaxParallelChallengeReads: deps.Cfg.Proving.ParallelCheckLimit,
}, deps.Stor, deps.LocalStore, deps.Si, nil, wstates)
}
if deps.Maddrs == nil {
deps.Maddrs = map[dtypes.MinerAddress]bool{}
}
Expand Down
71 changes: 71 additions & 0 deletions cmd/curio/ffi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"encoding/gob"
"fmt"
"os"
"reflect"

"github.com/ipfs/go-cid"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/lib/ffiselect"
ffidirect "github.com/filecoin-project/lotus/lib/ffiselect/ffidirect"
"github.com/filecoin-project/lotus/lib/must"
)

var ffiCmd = &cli.Command{
Name: "ffi",
Hidden: true,
Flags: []cli.Flag{
layersFlag,
},
Action: func(cctx *cli.Context) (err error) {
output := os.NewFile(uintptr(3), "out")

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
if err != nil {
err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: nil, Err: err.Error()})
if err != nil {
panic(err)
}
}
}()
var callInfo ffiselect.FFICall
if err := gob.NewDecoder(os.Stdin).Decode(&callInfo); err != nil {
return xerrors.Errorf("ffi subprocess can not decode: %w", err)
}

args := lo.Map(callInfo.Args, func(arg any, i int) reflect.Value {
return reflect.ValueOf(arg)
})

resAry := reflect.ValueOf(ffidirect.FFI{}).MethodByName(callInfo.Fn).Call(args)
res := lo.Map(resAry, func(res reflect.Value, i int) any {
return res.Interface()
})

err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: res, Err: ""})
if err != nil {
return xerrors.Errorf("ffi subprocess can not encode: %w", err)
}

return output.Close()
},
}

func ffiSelfTest() {
val1, val2 := 12345678, must.One(cid.Parse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"))
ret1, ret2, err := ffiselect.FFISelect{}.SelfTest(val1, val2)
if err != nil {
panic("ffi self test failed:" + err.Error())
}
if ret1 != val1 || !val2.Equals(ret2) {
panic(fmt.Sprint("ffi self test failed: values do not match: ", val1, val2, ret1, ret2))
}
}
1 change: 1 addition & 0 deletions cmd/curio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
sealCmd,
marketCmd,
fetchParamCmd,
ffiCmd,
}

jaeger := tracing.SetupJaegerTracing("curio")
Expand Down
2 changes: 1 addition & 1 deletion cmd/curio/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
}

wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler(
ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, nil,
ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, nil, nil,
deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
Expand Down
20 changes: 9 additions & 11 deletions cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/cmd/curio/rpc"
Expand Down Expand Up @@ -94,11 +92,7 @@ var runCmd = &cli.Command{
log.Errorf("ensuring tempdir exists: %s", err)
}

ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "curio"),
)
ctx := lcli.DaemonContext(cctx)
shutdownChan := make(chan struct{})
{
var ctxclose func()
Expand Down Expand Up @@ -131,6 +125,8 @@ var runCmd = &cli.Command{
return err
}

go ffiSelfTest() // Panics on failure

taskEngine, err := tasks.StartTasks(ctx, dependencies)

if err != nil {
Expand All @@ -155,6 +151,11 @@ var runCmd = &cli.Command{
},
}

var layersFlag = &cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
}

var webCmd = &cli.Command{
Name: "web",
Usage: "Start Curio web interface",
Expand All @@ -170,10 +171,7 @@ var webCmd = &cli.Command{
Name: "nosync",
Usage: "don't check full-node sync status",
},
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
},
layersFlag,
},
Action: func(cctx *cli.Context) error {

Expand Down
6 changes: 3 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
db := dependencies.DB
full := dependencies.Full
verif := dependencies.Verif
lw := dependencies.LW
as := dependencies.As
maddrs := dependencies.Maddrs
stor := dependencies.Stor
Expand All @@ -61,7 +60,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task

if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler(
ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, chainSched,
ctx, cfg.Fees, cfg.Proving, full, verif, sender, chainSched,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)

if err != nil {
Expand All @@ -72,7 +71,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}

if cfg.Subsystems.EnableWinningPost {
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs)
pl := dependencies.LocalStore
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, pl, verif, full, maddrs)
activeTasks = append(activeTasks, winPoStTask)
needProofParams = true
}
Expand Down
9 changes: 9 additions & 0 deletions curiosrc/build/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package build

// IsOpencl is set to the value of FFI_USE_OPENCL
var IsOpencl string

// Format: 8 HEX then underscore then ISO8701 date
// Ex: 4c5e98f28_2024-05-17T18:42:27-04:00
// NOTE: git date for repeatabile builds.
var Commit string
5 changes: 2 additions & 3 deletions curiosrc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ import (
"github.com/filecoin-project/lotus/node/config"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

//var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.CurioProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *message.Sender, chainSched *chainsched.CurioChainSched,
api api.FullNode, verif storiface.Verifier, sender *message.Sender, chainSched *chainsched.CurioChainSched,
as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*window.WdPostTask, *window.WdPostSubmitTask, *window.WdPostRecoverDeclareTask, error) {

// todo config
ft := window.NewSimpleFaultTracker(stor, idx, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout), time.Duration(pc.PartitionCheckTimeout))

computeTask, err := window.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
computeTask, err := window.NewWdPostTask(db, api, ft, stor, verif, chainSched, addresses, max, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout))
if err != nil {
return nil, nil, nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions curiosrc/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"github.com/puzpuzpuz/xsync/v2"
"golang.org/x/xerrors"

// TODO everywhere here that we call this we should call our proxy instead.
ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
proof2 "github.com/filecoin-project/go-state-types/proof"

"github.com/filecoin-project/lotus/curiosrc/proof"
"github.com/filecoin-project/lotus/lib/ffiselect"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
Expand Down Expand Up @@ -256,7 +258,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
}
}

sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed)
sl, uns, err := ffiselect.FFISelect{}.SealPreCommitPhase2(sector.ID, p1o, fspaths.Cache, fspaths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err)
}
Expand Down Expand Up @@ -307,7 +309,7 @@ func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sea
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
}

proof, err := ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner)
proof, err := ffiselect.FFISelect{}.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner)
if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err)
}
Expand Down
Loading

0 comments on commit c1f99c5

Please sign in to comment.