Skip to content

Commit

Permalink
curio: Command for generating market tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 27, 2024
1 parent 26b190c commit b8a996d
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/curio/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.CurioConfig, error)
for _, k := range meta.Keys() {
have = append(have, strings.Join(k, " "))
}
log.Infow("Using layer", "layer", layer, "config", curioConfig)
log.Debugw("Using layer", "layer", layer, "config", curioConfig)
}
_ = have // FUTURE: verify that required fields are here.
// If config includes 3rd-party config, consider JSONSchema as a way that
Expand Down
1 change: 1 addition & 0 deletions cmd/curio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func main() {
guidedsetup.GuidedsetupCmd,
configMigrateCmd,
sealCmd,
marketCmd,
}

jaeger := tracing.SetupJaegerTracing("curio")
Expand Down
70 changes: 70 additions & 0 deletions cmd/curio/market.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"fmt"
"sort"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/curiosrc/market/lmrpc"
)

var marketCmd = &cli.Command{
Name: "market",
Subcommands: []*cli.Command{
marketRPCInfoCmd,
},
}

var marketRPCInfoCmd = &cli.Command{
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
},
},
Action: func(cctx *cli.Context) error {
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}

cfg, err := deps.GetConfig(cctx, db)
if err != nil {
return xerrors.Errorf("get config: %w", err)
}

ts, err := lmrpc.MakeTokens(cfg)
if err != nil {
return xerrors.Errorf("make tokens: %w", err)
}

var addrTokens []struct {
Address string
Token string
}

for address, s := range ts {
addrTokens = append(addrTokens, struct {
Address string
Token string
}{
Address: address.String(),
Token: s,
})
}

sort.Slice(addrTokens, func(i, j int) bool {
return addrTokens[i].Address < addrTokens[j].Address
})

for _, at := range addrTokens {
fmt.Printf("%s %s\n", at.Address, at.Token)
}

return nil
},
Name: "rpc-info",
}
99 changes: 71 additions & 28 deletions curiosrc/market/lmrpc/lmrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync"
"time"

"github.com/fatih/color"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
Expand All @@ -39,6 +38,68 @@ import (
var log = logging.Logger("lmrpc")

func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
return forEachMarketRPC(cfg, func(maddr string, listen string) error {
addr, err := address.NewFromString(maddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}

go func() {
err := ServeCurioMarketRPC(db, full, addr, cfg, listen)
if err != nil {
log.Errorf("failed to serve market rpc: %s", err)
}
}()

return nil
})
}

func MakeTokens(cfg *config.CurioConfig) (map[address.Address]string, error) {
out := map[address.Address]string{}

err := forEachMarketRPC(cfg, func(smaddr string, listen string) error {
ctx := context.Background()

laddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}

if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}

// need minimal provider with just the config
lp := fakelm.NewLMRPCProvider(nil, nil, address.Undef, 0, 0, nil, nil, cfg)

tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}

// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}

maddr, err := address.NewFromString(smaddr)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}

token := fmt.Sprintf("%s:%s", tok, ma)
out[maddr] = token

return nil
})

return out, err
}

func forEachMarketRPC(cfg *config.CurioConfig, cb func(string, string) error) error {
for n, server := range cfg.Subsystems.MarketRPCServers {
n := n

Expand Down Expand Up @@ -76,13 +137,9 @@ func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *con

log.Infow("Starting market RPC server", "actor", maddr, "listen", strListen)

// serve the market rpc
go func() {
err := ServeCurioMarketRPC(db, full, maddr, cfg, strListen)
if err != nil {
log.Errorf("failed to serve market rpc %d for %s: %s", n, maddr, err)
}
}()
if err := cb(strMaddr, strListen); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -125,7 +182,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

ast.CommonStruct.Internal.Version = func(ctx context.Context) (api.APIVersion, error) {
return api.APIVersion{
Version: "lp-proxy-v0",
Version: "curio-proxy-v0",
APIVersion: api.MinerAPIVersion0,
BlockDelay: build.BlockDelaySecs,
}, nil
Expand Down Expand Up @@ -174,7 +231,8 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

pieceUUID := uuid.New()

color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
//color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
log.Infow("piece assign request", "piece_cid", deal.DealProposal.PieceCID, "provider", deal.DealProposal.Provider, "piece_uuid", pieceUUID)

pieceInfoLk.Lock()
pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
Expand Down Expand Up @@ -267,7 +325,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
return api.SectorOffset{}, err
}

color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
log.Infow("piece assigned to sector", "piece_cid", deal.DealProposal.PieceCID, "sector", so.Sector, "offset", so.Offset)

return so, nil
}
Expand Down Expand Up @@ -305,7 +363,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
pis, ok := pieceInfos[pu]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
color.Red("%s not found", pu)
log.Warnw("piece not found", "piece_uuid", pu)
pieceInfoLk.Unlock()
return
}
Expand Down Expand Up @@ -339,7 +397,7 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
return
}

color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps)
log.Infow("piece served", "piece_uuid", pu, "size", float64(n)/(1024*1024), "duration", took, "speed", mbps)
}

finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
Expand All @@ -353,21 +411,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
mux.Handle("/piece", pieceHandler)
mux.Handle("/", mh)

{
tok, err := lp.AuthNew(ctx, api.AllPermissions)
if err != nil {
return err
}

// parse listen into multiaddr
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}

fmt.Printf("Token: %s:%s\n", tok, ma)
}

server := &http.Server{
Addr: listen,
Handler: mux,
Expand Down
30 changes: 30 additions & 0 deletions documentation/en/cli-curio.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ COMMANDS:
guided-setup Run the guided setup for migrating from lotus-miner to Curio
from-miner Express a database config (for curio) from an existing miner.
seal Manage the sealing pipeline
market
version Print version
help, h Shows a list of commands or help for one command
DEVELOPER:
Expand Down Expand Up @@ -386,6 +387,35 @@ OPTIONS:
--help, -h show help
```

## curio market
```
NAME:
curio market
USAGE:
curio market command [command options] [arguments...]
COMMANDS:
rpc-info
help, h Shows a list of commands or help for one command
OPTIONS:
--help, -h show help
```

### curio market rpc-info
```
NAME:
curio market rpc-info
USAGE:
curio market rpc-info [command options] [arguments...]
OPTIONS:
--layers value [ --layers value ] list of layers to be interpreted (atop defaults). Default: base
--help, -h show help
```

## curio version
```
NAME:
Expand Down
21 changes: 21 additions & 0 deletions documentation/en/default-curio-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,27 @@
# type: int
#MoveStorageMaxTasks = 0

# MarketRPCServers is a list of tuples of miner address and port/ip to listen for market (e.g. boost) requests.
# This interface is compatible with the lotus-miner RPC, implementing a subset needed for storage market operations.
# Strings should be in the format "actor:port" or "actor:ip:port". Default listen address is 0.0.0.0
# Example: "f0123:32100", "f0123:127.0.0.1:32100". Multiple addresses can be specified.
#
# When a market node like boost gives Curio's market RPC a deal to placing into a sector, Curio will first store the
# deal data in a temporary location "Piece Park" before assigning it to a sector. This requires that at least one
# node in the cluster has the EnableParkPiece option enabled and has sufficient scratch space to store the deal data.
# This is different from lotus-miner which stored the deal data into an "unsealed" sector as soon as the deal was
# received. Deal data in PiecePark is accessed when the sector TreeD and TreeR are computed, but isn't needed for
# the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
# sealed.
#
# To get API info for boost configuration run 'curio market rpc-info'
#
# NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
# a machine which handles ParkPiece tasks.
#
# type: []string
#MarketRPCServers = []

# EnableWebGui enables the web GUI on this lotus-provider instance. The UI has minimal local overhead, but it should
# only need to be run on a single machine in the cluster.
#
Expand Down
3 changes: 2 additions & 1 deletion node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ const (
func DefaultCurioConfig() *CurioConfig {
return &CurioConfig{
Subsystems: CurioSubsystemsConfig{
GuiAddress: ":4701",
GuiAddress: ":4701",
MarketRPCServers: []string{},
},
Fees: CurioFees{
DefaultMaxFee: DefaultDefaultMaxFee,
Expand Down
2 changes: 1 addition & 1 deletion node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type CurioSubsystemsConfig struct {
// the initial SDR layers computation. Pieces in PiecePark are removed after all sectors referencing the piece are
// sealed.
//
// To get API info for boost configuration run 'curio cli market rpc-info'
// To get API info for boost configuration run 'curio market rpc-info'
//
// NOTE: All deal data will flow through this service, so it should be placed on a machine running boost or on
// a machine which handles ParkPiece tasks.
Expand Down

0 comments on commit b8a996d

Please sign in to comment.