From 26b190c7b2e2803287ba300945c1d6f486e26689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 Mar 2024 13:08:49 +0100 Subject: [PATCH] feat: curio: Move boost proxy into the main binary --- cmd/curio/run.go | 7 + cmd/lotus-shed/deal.go | 375 ------------------------------ curiosrc/market/fakelm/lmimpl.go | 45 ++-- curiosrc/market/lmrpc/lmrpc.go | 379 +++++++++++++++++++++++++++++++ node/config/doc_gen.go | 22 ++ node/config/types.go | 19 ++ node/modules/storageminer_svc.go | 101 -------- 7 files changed, 440 insertions(+), 508 deletions(-) create mode 100644 curiosrc/market/lmrpc/lmrpc.go diff --git a/cmd/curio/run.go b/cmd/curio/run.go index 35fdf4a4d03..ec9ab45ed78 100644 --- a/cmd/curio/run.go +++ b/cmd/curio/run.go @@ -11,12 +11,14 @@ import ( "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/cmd/curio/rpc" "github.com/filecoin-project/lotus/cmd/curio/tasks" + "github.com/filecoin-project/lotus/curiosrc/market/lmrpc" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" @@ -134,6 +136,11 @@ var runCmd = &cli.Command{ if err != nil { return err } + + if err := lmrpc.ServeCurioMarketRPCFromConfig(dependencies.DB, dependencies.Full, dependencies.Cfg); err != nil { + return xerrors.Errorf("starting market RPCs: %w", err) + } + finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, //node.ShutdownHandler{Component: "curio", StopFunc: stop}, diff --git a/cmd/lotus-shed/deal.go b/cmd/lotus-shed/deal.go index ba6979a140b..029e2966643 100644 --- a/cmd/lotus-shed/deal.go +++ b/cmd/lotus-shed/deal.go @@ -2,22 +2,15 @@ package main import ( "bytes" - "context" "fmt" "io" - "net" "net/http" "net/http/httptest" "net/url" "os" - "sync" - "time" "github.com/fatih/color" - "github.com/google/uuid" - "github.com/jackc/pgx/v5" "github.com/mitchellh/go-homedir" - manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -32,19 +25,9 @@ import ( "github.com/filecoin-project/go-state-types/builtin/v9/market" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/cmd/curio/deps" - cumarket "github.com/filecoin-project/lotus/curiosrc/market" - "github.com/filecoin-project/lotus/curiosrc/market/fakelm" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/must" - "github.com/filecoin-project/lotus/lib/nullreader" - "github.com/filecoin-project/lotus/metrics/proxy" - "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var lpUtilCmd = &cli.Command{ @@ -52,7 +35,6 @@ var lpUtilCmd = &cli.Command{ Usage: "lotus provider utility commands", Subcommands: []*cli.Command{ lpUtilStartDealCmd, - lpBoostProxyCmd, }, } @@ -300,360 +282,3 @@ var lpUtilStartDealCmd = &cli.Command{ return nil }, } - -var lpBoostProxyCmd = &cli.Command{ - Name: "boost-proxy", - Usage: "Start a legacy lotus-miner rpc proxy", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "actor-address", - Usage: "Address of the miner actor", - Required: true, - }, - - &cli.StringFlag{ - Name: "db-host", - EnvVars: []string{"LOTUS_DB_HOST"}, - Usage: "Command separated list of hostnames for yugabyte cluster", - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-name", - EnvVars: []string{"LOTUS_DB_NAME", "LOTUS_HARMONYDB_HOSTS"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-user", - EnvVars: []string{"LOTUS_DB_USER", "LOTUS_HARMONYDB_USERNAME"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-password", - EnvVars: []string{"LOTUS_DB_PASSWORD", "LOTUS_HARMONYDB_PASSWORD"}, - Value: "yugabyte", - }, - &cli.StringFlag{ - Name: "db-port", - EnvVars: []string{"LOTUS_DB_PORT", "LOTUS_HARMONYDB_PORT"}, - Hidden: true, - Value: "5433", - }, - &cli.StringFlag{ - Name: "layers", - EnvVars: []string{"LOTUS_LAYERS", "LOTUS_CONFIG_LAYERS"}, - Value: "base", - }, - - &cli.StringFlag{ - Name: "listen", - Usage: "Address to listen on", - Value: ":32100", - }, - }, - Action: func(cctx *cli.Context) error { - ctx := lcli.ReqContext(cctx) - - db, err := deps.MakeDB(cctx) - if err != nil { - return err - } - - maddr, err := address.NewFromString(cctx.String("actor-address")) - if err != nil { - return xerrors.Errorf("parsing miner address: %w", err) - } - - full, closer, err := lcli.GetFullNodeAPIV1(cctx) - if err != nil { - return err - } - - defer closer() - - pin := cumarket.NewPieceIngester(db, full) - - si := paths.NewDBIndex(nil, db) - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return xerrors.Errorf("getting miner id: %w", err) - } - - mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting miner info: %w", err) - } - - lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, cctx.String("layers")) - - laddr, err := net.ResolveTCPAddr("tcp", cctx.String("listen")) - if err != nil { - return xerrors.Errorf("net resolve: %w", err) - } - - if len(laddr.IP) == 0 { - // set localhost - laddr.IP = net.IPv4(127, 0, 0, 1) - } - rootUrl := url.URL{ - Scheme: "http", - Host: laddr.String(), - } - - ast := api.StorageMinerStruct{} - - ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) { - return api.APIVersion{ - Version: "lp-proxy-v0", - APIVersion: api.MinerAPIVersion0, - BlockDelay: build.BlockDelaySecs, - }, nil - } - - ast.CommonStruct.Internal.AuthNew = lp.AuthNew - - ast.Internal.ActorAddress = lp.ActorAddress - ast.Internal.WorkerJobs = lp.WorkerJobs - ast.Internal.SectorsStatus = lp.SectorsStatus - ast.Internal.SectorsList = lp.SectorsList - ast.Internal.SectorsSummary = lp.SectorsSummary - ast.Internal.SectorsListInStates = lp.SectorsListInStates - ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal - ast.Internal.ComputeDataCid = lp.ComputeDataCid - - type pieceInfo struct { - data storiface.Data - size abi.UnpaddedPieceSize - - done chan struct{} - } - - pieceInfoLk := new(sync.Mutex) - pieceInfos := map[uuid.UUID][]pieceInfo{} - - ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { - origPieceData := pieceData - defer func() { - closer, ok := origPieceData.(io.Closer) - if !ok { - log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData) - return - } - if err := closer.Close(); err != nil { - log.Warnw("closing pieceData in DataCid", "error", err) - } - }() - - pi := pieceInfo{ - data: pieceData, - size: pieceSize, - - done: make(chan struct{}), - } - - pieceUUID := uuid.New() - - color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) - - pieceInfoLk.Lock() - pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) - pieceInfoLk.Unlock() - - // /piece?piece_cid=xxxx - dataUrl := rootUrl - dataUrl.Path = "/piece" - dataUrl.RawQuery = "piece_id=" + pieceUUID.String() - - // add piece entry - - var refID int64 - var pieceWasCreated bool - - comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - var pieceID int64 - // Attempt to select the piece ID first - err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) - - if err != nil { - if err == pgx.ErrNoRows { - // Piece does not exist, attempt to insert - err = tx.QueryRow(` - INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) - VALUES ($1, $2, $3) - ON CONFLICT (piece_cid) DO NOTHING - RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) - if err != nil { - return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) - } - pieceWasCreated = true // New piece was created - } else { - // Some other error occurred during select - return false, xerrors.Errorf("checking existing parked piece: %w", err) - } - } else { - pieceWasCreated = false // Piece already exists, no new piece was created - } - - // Add parked_piece_ref - err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) - VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) - if err != nil { - return false, xerrors.Errorf("inserting parked piece ref: %w", err) - } - - // If everything went well, commit the transaction - return true, nil // This will commit the transaction - }, harmonydb.OptionRetry()) - if err != nil { - return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) - } - if !comm { - return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") - } - - // wait for piece to be parked - if pieceWasCreated { - <-pi.done - } else { - // If the piece was not created, we need to close the done channel - close(pi.done) - - go func() { - // close the data reader (drain to eof if it's not a closer) - if closer, ok := pieceData.(io.Closer); ok { - if err := closer.Close(); err != nil { - log.Warnw("closing pieceData in DataCid", "error", err) - } - } else { - log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) - - _, err := io.Copy(io.Discard, pieceData) - if err != nil { - log.Warnw("draining pieceData in DataCid", "error", err) - } - } - }() - } - - pieceIDUrl := url.URL{ - Scheme: "pieceref", - Opaque: fmt.Sprintf("%d", refID), - } - - // make a sector - so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil) - if err != nil { - return api.SectorOffset{}, err - } - - color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset) - - return so, nil - } - - ast.Internal.StorageList = si.StorageList - ast.Internal.StorageDetach = si.StorageDetach - ast.Internal.StorageReportHealth = si.StorageReportHealth - ast.Internal.StorageDeclareSector = si.StorageDeclareSector - ast.Internal.StorageDropSector = si.StorageDropSector - ast.Internal.StorageFindSector = si.StorageFindSector - ast.Internal.StorageInfo = si.StorageInfo - ast.Internal.StorageBestAlloc = si.StorageBestAlloc - ast.Internal.StorageLock = si.StorageLock - ast.Internal.StorageTryLock = si.StorageTryLock - ast.Internal.StorageGetLocks = si.StorageGetLocks - - var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { - // /piece?piece_id=xxxx - pieceUUID := r.URL.Query().Get("piece_id") - - pu, err := uuid.Parse(pieceUUID) - if err != nil { - http.Error(w, "bad piece id", http.StatusBadRequest) - return - } - - if r.Method != http.MethodGet { - http.Error(w, "bad method", http.StatusMethodNotAllowed) - return - } - - fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr) - - pieceInfoLk.Lock() - pis, ok := pieceInfos[pu] - if !ok { - http.Error(w, "piece not found", http.StatusNotFound) - color.Red("%s not found", pu) - pieceInfoLk.Unlock() - return - } - - // pop - pi := pis[0] - pis = pis[1:] - - pieceInfos[pu] = pis - if len(pis) == 0 { - delete(pieceInfos, pu) - } - - pieceInfoLk.Unlock() - - start := time.Now() - - pieceData := io.LimitReader(io.MultiReader( - pi.data, - nullreader.Reader{}, - ), int64(pi.size)) - - n, err := io.Copy(w, pieceData) - close(pi.done) - - took := time.Since(start) - mbps := float64(n) / (1024 * 1024) / took.Seconds() - - if err != nil { - log.Errorf("copying piece data: %s", err) - return - } - - color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps) - } - - finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast) - - mh, err := node.MinerHandler(finalApi, false) // todo permissioned - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle("/piece", pieceHandler) - mux.Handle("/", mh) - - { - tok, err := lp.AuthNew(ctx, api.AllPermissions) - if err != nil { - return err - } - - // parse listen into multiaddr - ma, err := manet.FromNetAddr(laddr) - if err != nil { - return xerrors.Errorf("net from addr (%v): %w", laddr, err) - } - - fmt.Printf("Token: %s:%s\n", tok, ma) - } - - server := &http.Server{ - Addr: cctx.String("listen"), - Handler: mux, - ReadTimeout: 48 * time.Hour, - WriteTimeout: 48 * time.Hour, // really high because we block until TreeD - } - - return server.ListenAndServe() - }, -} diff --git a/curiosrc/market/fakelm/lmimpl.go b/curiosrc/market/fakelm/lmimpl.go index 1f6c5b91dee..9dc19e627d8 100644 --- a/curiosrc/market/fakelm/lmimpl.go +++ b/curiosrc/market/fakelm/lmimpl.go @@ -6,7 +6,6 @@ import ( "net/http" "net/url" - "github.com/BurntSushi/toml" "github.com/gbrlsnchs/jwt/v3" "github.com/google/uuid" "golang.org/x/xerrors" @@ -37,21 +36,21 @@ type LMRPCProvider struct { ssize abi.SectorSize - pi market.Ingester - db *harmonydb.DB - confLayer string + pi market.Ingester + db *harmonydb.DB + conf *config.CurioConfig } -func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, confLayer string) *LMRPCProvider { +func NewLMRPCProvider(si paths.SectorIndex, full api.FullNode, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi market.Ingester, db *harmonydb.DB, conf *config.CurioConfig) *LMRPCProvider { return &LMRPCProvider{ - si: si, - full: full, - maddr: maddr, - minerID: minerID, - ssize: ssize, - pi: pi, - db: db, - confLayer: confLayer, + si: si, + full: full, + maddr: maddr, + minerID: minerID, + ssize: ssize, + pi: pi, + db: db, + conf: conf, } } @@ -330,24 +329,6 @@ func (l *LMRPCProvider) AllocatePieceToSector(ctx context.Context, maddr address } func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([]byte, error) { - var cs []struct { - Config string - } - - err := l.db.Select(ctx, &cs, "select config from harmony_config where title = $1", l.confLayer) - if err != nil { - return nil, err - } - - if len(cs) == 0 { - return nil, xerrors.Errorf("no harmony config found") - } - - lp := config.DefaultCurioConfig() - if _, err := toml.Decode(cs[0].Config, lp); err != nil { - return nil, xerrors.Errorf("decode harmony config: %w", err) - } - type jwtPayload struct { Allow []auth.Permission } @@ -356,7 +337,7 @@ func (l *LMRPCProvider) AuthNew(ctx context.Context, perms []auth.Permission) ([ Allow: perms, } - sk, err := base64.StdEncoding.DecodeString(lp.Apis.StorageRPCSecret) + sk, err := base64.StdEncoding.DecodeString(l.conf.Apis.StorageRPCSecret) if err != nil { return nil, xerrors.Errorf("decode secret: %w", err) } diff --git a/curiosrc/market/lmrpc/lmrpc.go b/curiosrc/market/lmrpc/lmrpc.go new file mode 100644 index 00000000000..5c4c9039c16 --- /dev/null +++ b/curiosrc/market/lmrpc/lmrpc.go @@ -0,0 +1,379 @@ +package lmrpc + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/fatih/color" + "github.com/google/uuid" + logging "github.com/ipfs/go-log/v2" + "github.com/jackc/pgx/v5" + manet "github.com/multiformats/go-multiaddr/net" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + cumarket "github.com/filecoin-project/lotus/curiosrc/market" + "github.com/filecoin-project/lotus/curiosrc/market/fakelm" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/nullreader" + "github.com/filecoin-project/lotus/metrics/proxy" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("lmrpc") + +func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error { + for n, server := range cfg.Subsystems.MarketRPCServers { + n := n + + // server: [f0.. actor address]:[bind address] + // bind address is either a numeric port or a full address + + // first split at first : to get the actor address and the bind address + split := strings.SplitN(server, ":", 2) + + // if the split length is not 2, return an error + if len(split) != 2 { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + + // get the actor address and the bind address + strMaddr, strListen := split[0], split[1] + + maddr, err := address.NewFromString(strMaddr) + if err != nil { + return xerrors.Errorf("parsing actor address: %w", err) + } + + // check the listen address + if strListen == "" { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + // if listen address is numeric, prepend the default host + if _, err := strconv.Atoi(strListen); err == nil { + strListen = "0.0.0.0:" + strListen + } + // check if the listen address is a valid address + if _, _, err := net.SplitHostPort(strListen); err != nil { + return fmt.Errorf("bad market rpc server config %d %s, expected [f0.. actor address]:[bind address]", n, server) + } + + log.Infow("Starting market RPC server", "actor", maddr, "listen", strListen) + + // serve the market rpc + go func() { + err := ServeCurioMarketRPC(db, full, maddr, cfg, strListen) + if err != nil { + log.Errorf("failed to serve market rpc %d for %s: %s", n, maddr, err) + } + }() + } + + return nil +} + +func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Address, conf *config.CurioConfig, listen string) error { + ctx := context.Background() + + pin := cumarket.NewPieceIngester(db, full) + + si := paths.NewDBIndex(nil, db) + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return xerrors.Errorf("getting miner id: %w", err) + } + + mi, err := full.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting miner info: %w", err) + } + + lp := fakelm.NewLMRPCProvider(si, full, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, conf) + + laddr, err := net.ResolveTCPAddr("tcp", listen) + if err != nil { + return xerrors.Errorf("net resolve: %w", err) + } + + if len(laddr.IP) == 0 { + // set localhost + laddr.IP = net.IPv4(127, 0, 0, 1) + } + rootUrl := url.URL{ + Scheme: "http", + Host: laddr.String(), + } + + ast := api.StorageMinerStruct{} + + ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) { + return api.APIVersion{ + Version: "lp-proxy-v0", + APIVersion: api.MinerAPIVersion0, + BlockDelay: build.BlockDelaySecs, + }, nil + } + + ast.CommonStruct.Internal.AuthNew = lp.AuthNew + + ast.Internal.ActorAddress = lp.ActorAddress + ast.Internal.WorkerJobs = lp.WorkerJobs + ast.Internal.SectorsStatus = lp.SectorsStatus + ast.Internal.SectorsList = lp.SectorsList + ast.Internal.SectorsSummary = lp.SectorsSummary + ast.Internal.SectorsListInStates = lp.SectorsListInStates + ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal + ast.Internal.ComputeDataCid = lp.ComputeDataCid + + type pieceInfo struct { + data storiface.Data + size abi.UnpaddedPieceSize + + done chan struct{} + } + + pieceInfoLk := new(sync.Mutex) + pieceInfos := map[uuid.UUID][]pieceInfo{} + + ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { + origPieceData := pieceData + defer func() { + closer, ok := origPieceData.(io.Closer) + if !ok { + log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData) + return + } + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + }() + + pi := pieceInfo{ + data: pieceData, + size: pieceSize, + + done: make(chan struct{}), + } + + pieceUUID := uuid.New() + + color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) + + pieceInfoLk.Lock() + pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) + pieceInfoLk.Unlock() + + // /piece?piece_cid=xxxx + dataUrl := rootUrl + dataUrl.Path = "/piece" + dataUrl.RawQuery = "piece_id=" + pieceUUID.String() + + // add piece entry + + var refID int64 + var pieceWasCreated bool + + comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + var pieceID int64 + // Attempt to select the piece ID first + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + + if err != nil { + if err == pgx.ErrNoRows { + // Piece does not exist, attempt to insert + err = tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) + VALUES ($1, $2, $3) + ON CONFLICT (piece_cid) DO NOTHING + RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + } + pieceWasCreated = true // New piece was created + } else { + // Some other error occurred during select + return false, xerrors.Errorf("checking existing parked piece: %w", err) + } + } else { + pieceWasCreated = false // Piece already exists, no new piece was created + } + + // Add parked_piece_ref + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) + VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + // If everything went well, commit the transaction + return true, nil // This will commit the transaction + }, harmonydb.OptionRetry()) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) + } + if !comm { + return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") + } + + // wait for piece to be parked + if pieceWasCreated { + <-pi.done + } else { + // If the piece was not created, we need to close the done channel + close(pi.done) + + go func() { + // close the data reader (drain to eof if it's not a closer) + if closer, ok := pieceData.(io.Closer); ok { + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + } else { + log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) + + _, err := io.Copy(io.Discard, pieceData) + if err != nil { + log.Warnw("draining pieceData in DataCid", "error", err) + } + } + }() + } + + pieceIDUrl := url.URL{ + Scheme: "pieceref", + Opaque: fmt.Sprintf("%d", refID), + } + + // make a sector + so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil) + if err != nil { + return api.SectorOffset{}, err + } + + color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset) + + return so, nil + } + + ast.Internal.StorageList = si.StorageList + ast.Internal.StorageDetach = si.StorageDetach + ast.Internal.StorageReportHealth = si.StorageReportHealth + ast.Internal.StorageDeclareSector = si.StorageDeclareSector + ast.Internal.StorageDropSector = si.StorageDropSector + ast.Internal.StorageFindSector = si.StorageFindSector + ast.Internal.StorageInfo = si.StorageInfo + ast.Internal.StorageBestAlloc = si.StorageBestAlloc + ast.Internal.StorageLock = si.StorageLock + ast.Internal.StorageTryLock = si.StorageTryLock + ast.Internal.StorageGetLocks = si.StorageGetLocks + + var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { + // /piece?piece_id=xxxx + pieceUUID := r.URL.Query().Get("piece_id") + + pu, err := uuid.Parse(pieceUUID) + if err != nil { + http.Error(w, "bad piece id", http.StatusBadRequest) + return + } + + if r.Method != http.MethodGet { + http.Error(w, "bad method", http.StatusMethodNotAllowed) + return + } + + fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr) + + pieceInfoLk.Lock() + pis, ok := pieceInfos[pu] + if !ok { + http.Error(w, "piece not found", http.StatusNotFound) + color.Red("%s not found", pu) + pieceInfoLk.Unlock() + return + } + + // pop + pi := pis[0] + pis = pis[1:] + + pieceInfos[pu] = pis + if len(pis) == 0 { + delete(pieceInfos, pu) + } + + pieceInfoLk.Unlock() + + start := time.Now() + + pieceData := io.LimitReader(io.MultiReader( + pi.data, + nullreader.Reader{}, + ), int64(pi.size)) + + n, err := io.Copy(w, pieceData) + close(pi.done) + + took := time.Since(start) + mbps := float64(n) / (1024 * 1024) / took.Seconds() + + if err != nil { + log.Errorf("copying piece data: %s", err) + return + } + + color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps) + } + + finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast) + + mh, err := node.MinerHandler(finalApi, false) // todo permissioned + if err != nil { + return err + } + + mux := http.NewServeMux() + mux.Handle("/piece", pieceHandler) + mux.Handle("/", mh) + + { + tok, err := lp.AuthNew(ctx, api.AllPermissions) + if err != nil { + return err + } + + // parse listen into multiaddr + ma, err := manet.FromNetAddr(laddr) + if err != nil { + return xerrors.Errorf("net from addr (%v): %w", laddr, err) + } + + fmt.Printf("Token: %s:%s\n", tok, ma) + } + + server := &http.Server{ + Addr: listen, + Handler: mux, + ReadTimeout: 48 * time.Hour, + WriteTimeout: 48 * time.Hour, // really high because we block until pieces are saved in PiecePark + } + + return server.ListenAndServe() +} diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index dcb832976dd..e3aeb5e8c95 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -458,6 +458,28 @@ SDRTrees machine into long-term storage. This task runs after the Finalize task. Comment: `The maximum amount of MoveStorage tasks that can run simultaneously. Note that the maximum number of tasks will also be bounded by resources available on the machine. It is recommended that this value is set to a number which uses all available network (or disk) bandwidth on the machine without causing bottlenecks.`, + }, + { + Name: "MarketRPCServers", + Type: "[]string", + + Comment: `MarketRPCServers is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests. +This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations. +Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0 +Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified. + +When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the +deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one +node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data. +This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was +received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for +the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are +sealed. + +To get API info for boost configuration run 'curio cli market rpc-info' + +NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on +a machine which handles ParkPiece tasks.`, }, { Name: "EnableWebGui", diff --git a/node/config/types.go b/node/config/types.go index b86da656933..78007c9a4d6 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -200,6 +200,25 @@ type CurioSubsystemsConfig struct { // uses all available network (or disk) bandwidth on the machine without causing bottlenecks. MoveStorageMaxTasks int + // MarketRPCServers is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests. + // This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations. + // Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0 + // Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified. + // + // When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the + // deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one + // node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data. + // This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was + // received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for + // the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are + // sealed. + // + // To get API info for boost configuration run 'curio cli market rpc-info' + // + // NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on + // a machine which handles ParkPiece tasks. + MarketRPCServers []string + // EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should // only need to be run on a single machine in the cluster. EnableWebGui bool diff --git a/node/modules/storageminer_svc.go b/node/modules/storageminer_svc.go index 1a909b4ec93..17eb987ef39 100644 --- a/node/modules/storageminer_svc.go +++ b/node/modules/storageminer_svc.go @@ -2,26 +2,15 @@ package modules import ( "context" - "strings" "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/chain/types" cliutil "github.com/filecoin-project/lotus/cli/util" - "github.com/filecoin-project/lotus/curiosrc/market" - "github.com/filecoin-project/lotus/curiosrc/market/fakelm" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/helpers" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -30,98 +19,8 @@ type MinerStorageService api.StorageMiner var _ sectorblocks.SectorBuilder = *new(MinerSealingService) -func harmonyApiInfoToConf(apiInfo string) (config.HarmonyDB, error) { - hc := config.HarmonyDB{} - - // apiInfo - harmony:layer:maddr:user:pass:dbname:host:port - - parts := strings.Split(apiInfo, ":") - - if len(parts) != 8 { - return config.HarmonyDB{}, xerrors.Errorf("invalid harmonydb info '%s'", apiInfo) - } - - hc.Username = parts[3] - hc.Password = parts[4] - hc.Database = parts[5] - hc.Hosts = []string{parts[6]} - hc.Port = parts[7] - - return hc, nil -} - -func connectHarmony(apiInfo string, fapi v1api.FullNode, mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { - log.Info("Connecting to harmonydb") - - hc, err := harmonyApiInfoToConf(apiInfo) - if err != nil { - return nil, err - } - - db, err := harmonydb.NewFromConfig(hc) - if err != nil { - return nil, xerrors.Errorf("connecting to harmonydb: %w", err) - } - - parts := strings.Split(apiInfo, ":") - maddr, err := address.NewFromString(parts[2]) - if err != nil { - return nil, xerrors.Errorf("parsing miner address: %w", err) - } - - pin := market.NewPieceIngester(db, fapi) - - si := paths.NewDBIndex(nil, db) - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return nil, xerrors.Errorf("getting miner id: %w", err) - } - - mi, err := fapi.StateMinerInfo(mctx, maddr, types.EmptyTSK) - if err != nil { - return nil, xerrors.Errorf("getting miner info: %w", err) - } - - lp := fakelm.NewLMRPCProvider(si, fapi, maddr, abi.ActorID(mid), mi.SectorSize, pin, db, parts[1]) - - ast := api.StorageMinerStruct{} - - ast.CommonStruct.Internal.AuthNew = lp.AuthNew - - ast.Internal.ActorAddress = lp.ActorAddress - ast.Internal.WorkerJobs = lp.WorkerJobs - ast.Internal.SectorsStatus = lp.SectorsStatus - ast.Internal.SectorsList = lp.SectorsList - ast.Internal.SectorsSummary = lp.SectorsSummary - ast.Internal.SectorsListInStates = lp.SectorsListInStates - ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal - ast.Internal.ComputeDataCid = lp.ComputeDataCid - ast.Internal.SectorAddPieceToAny = func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data, p3 api.PieceDealInfo) (api.SectorOffset, error) { - panic("implement me") - } - - ast.Internal.StorageList = si.StorageList - ast.Internal.StorageDetach = si.StorageDetach - ast.Internal.StorageReportHealth = si.StorageReportHealth - ast.Internal.StorageDeclareSector = si.StorageDeclareSector - ast.Internal.StorageDropSector = si.StorageDropSector - ast.Internal.StorageFindSector = si.StorageFindSector - ast.Internal.StorageInfo = si.StorageInfo - ast.Internal.StorageBestAlloc = si.StorageBestAlloc - ast.Internal.StorageLock = si.StorageLock - ast.Internal.StorageTryLock = si.StorageTryLock - ast.Internal.StorageGetLocks = si.StorageGetLocks - - return &ast, nil -} - func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { - if strings.HasPrefix(apiInfo, "harmony:") { - return connectHarmony(apiInfo, fapi, mctx, lc) - } - ctx := helpers.LifecycleCtx(mctx, lc) info := cliutil.ParseApiInfo(apiInfo) addr, err := info.DialArgs("v0")