Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Market LibP2P improvements #280

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 0 additions & 122 deletions cmd/curio/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"bufio"
"crypto/rand"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -11,8 +10,6 @@ import (
"strings"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
Expand All @@ -37,7 +34,6 @@ var marketCmd = &cli.Command{
marketSealCmd,
marketAddOfflineURLCmd,
marketMoveToEscrowCmd,
marketLibp2pCmd,
},
}

Expand Down Expand Up @@ -339,121 +335,3 @@ var marketMoveToEscrowCmd = &cli.Command{
return merr
},
}

var marketLibp2pCmd = &cli.Command{
Name: "libp2p",
Usage: "Libp2p key operations",
Subcommands: []*cli.Command{
libp2pShowCmd,
libp2pGenerateCmd,
},
}

var libp2pShowCmd = &cli.Command{
Name: "peerID",
Usage: "Show Libp2p peer id for the provided miner actor",
ArgsUsage: "<Miner ID>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("incorrect number of agruments")
}

act, err := address.NewFromString(cctx.Args().First())
if err != nil {
return xerrors.Errorf("parsing miner ID: %w", err)
}

ctx := cctx.Context
dep, err := deps.GetDepsCLI(ctx, cctx)
if err != nil {
return err
}

mid, err := address.IDFromAddress(act)
if err != nil {
return err
}

var pb []byte
err = dep.DB.QueryRow(ctx, `SELECT priv_key FROM libp2p where sp_id = $1`, mid).Scan(&pb)
if err != nil {
return xerrors.Errorf("failed to get the private key from db: %w", err)
}

pkey, err := crypto.UnmarshalPrivateKey(pb)
if err != nil {
return fmt.Errorf("failed to unmarshal private key: %w", err)
}

id, err := peer.IDFromPublicKey(pkey.GetPublic())
if err != nil {
return fmt.Errorf("getting peer ID: %w", err)
}

fmt.Println("PeerID for Miner", act.String(), id.String())

return nil

},
}

var libp2pGenerateCmd = &cli.Command{
Name: "generate-key",
Usage: "Generates a new Libp2p key for the miner ID",
ArgsUsage: "<Miner ID>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("incorrect number of agruments")
}

act, err := address.NewFromString(cctx.Args().First())
if err != nil {
return xerrors.Errorf("parsing miner ID: %w", err)
}

ctx := cctx.Context
dep, err := deps.GetDepsCLI(ctx, cctx)
if err != nil {
return err
}

pk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return xerrors.Errorf("failed to generate a new key: %w", err)
}

kbytes, err := crypto.MarshalPrivateKey(pk)
if err != nil {
return xerrors.Errorf("failed to marshal the private key: %w", err)
}

mid, err := address.IDFromAddress(act)
if err != nil {
return err
}

n, err := dep.DB.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes)
if err != nil {
return xerrors.Errorf("failed to to insert the key into DB: %w", err)
}

if n == 0 {
return xerrors.Errorf("No new was created. Check if key already exists")
}

if n > 1 {
return xerrors.Errorf("%d rows affected in DB when 1 was expected", n)
}

fmt.Println("New Key created for the miner", act)

id, err := peer.IDFromPublicKey(pk.GetPublic())
if err != nil {
return xerrors.Errorf("getting peer ID: %w", err)
}

fmt.Println("PeerID for Miner", act.String(), id.String())

return nil
},
}
6 changes: 2 additions & 4 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, sealingTasks...)
}

minerAddresses := make([]string, 0, len(maddrs))
miners := make([]address.Address, 0, len(maddrs))
for k := range maddrs {
miners = append(miners, address.Address(k))
minerAddresses = append(minerAddresses, address.Address(k).String())
}

{
Expand Down Expand Up @@ -235,7 +233,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, psdTask, dealFindTask)

// Start libp2p hosts and handle streams
err = libp2p.NewDealProvider(ctx, db, cfg, dm.MK12Handler, full, machine)
err = libp2p.NewDealProvider(ctx, db, cfg, dm.MK12Handler, full, sender, miners, machine)
if err != nil {
return nil, err
}
Expand All @@ -257,7 +255,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, amTask)

log.Infow("This Curio instance handles",
"miner_addresses", minerAddresses,
"miner_addresses", miners,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

// harmony treats the first task as highest priority, so reverse the order
Expand Down
41 changes: 39 additions & 2 deletions cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
ipni_provider "github.com/filecoin-project/curio/market/ipni/ipni-provider"
"github.com/filecoin-project/curio/market/libp2p"
"github.com/filecoin-project/curio/market/retrieval"
)

Expand Down Expand Up @@ -68,6 +69,38 @@ func compressionMiddleware(config *config.CompressionConfig) (func(http.Handler)
return adapter, nil
}

// libp2pConnMiddleware intercepts WebSocket upgrade requests to "/" and rewrites the path to "/libp2p"
func libp2pConnMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request path is "/"
if r.URL.Path == "/" {
// Check if the request is a WebSocket upgrade request
if isWebSocketUpgrade(r) {
// Rewrite the path to "/libp2p"
r.URL.Path = "/libp2p"
// Update RequestURI in case downstream handlers use it
r.RequestURI = "/libp2p"
}
}
// Call the next handler in the chain
next.ServeHTTP(w, r)
})
}

// isWebSocketUpgrade checks if the request is a WebSocket upgrade request
func isWebSocketUpgrade(r *http.Request) bool {
if r.Method != http.MethodGet {
return false
}
if r.Header.Get("Upgrade") != "websocket" {
return false
}
if r.Header.Get("Connection") != "Upgrade" {
return false
}
return true
}

func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
ch := cache{db: d.DB}
cfg := d.Cfg.HTTP
Expand Down Expand Up @@ -106,7 +139,7 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
})

// Root path handler (simpler routes handled by http.ServeMux)
chiRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
chiRouter.Get("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Hello, World!\n -Curio\n")
})
Expand All @@ -125,7 +158,7 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
// Set up the HTTP server with proper timeouts
server := &http.Server{
Addr: cfg.ListenAddress,
Handler: loggingMiddleware(compressionMw(chiRouter)), // Attach middlewares
Handler: libp2pConnMiddleware(loggingMiddleware(compressionMw(chiRouter))), // Attach middlewares
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
IdleTimeout: cfg.IdleTimeout,
Expand Down Expand Up @@ -210,5 +243,9 @@ func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps) (*chi.Mux, err

go ipp.StartPublishing(ctx)

// Attach LibP2P redirector
rd := libp2p.NewRedirector(d.DB)
libp2p.Router(r, rd)

return r, nil
}
92 changes: 28 additions & 64 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading