Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

market: Backport fixes from the PDP branch #275

Merged
merged 11 commits into from
Oct 14, 2024
Merged
8 changes: 7 additions & 1 deletion cmd/curio/guidedsetup/guidedsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,13 @@ func completeInit(d *MigrationData) {
os.Exit(1)
}

n, err := d.DB.Exec(d.ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, pk)
kbytes, err := crypto.MarshalPrivateKey(pk)
if err != nil {
d.say(notice, "Failed to marshal libp2p private key.", err.Error())
os.Exit(1)
}

n, err := d.DB.Exec(d.ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes)
if err != nil {
d.say(notice, "Failed to insert libp2p private key into database. Please run 'curio market libp2p generate-key minerID' to complete the migration.", err.Error())
os.Exit(1)
Expand Down
7 changes: 6 additions & 1 deletion cmd/curio/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,17 @@ var libp2pGenerateCmd = &cli.Command{
return xerrors.Errorf("failed to generate a new key: %w", err)
}

kbytes, err := crypto.MarshalPrivateKey(pk)
if err != nil {
return xerrors.Errorf("failed to marshal the private key: %w", err)
}

mid, err := address.IDFromAddress(act)
if err != nil {
return err
}

n, err := dep.DB.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, pk)
n, err := dep.DB.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes)
if err != nil {
return xerrors.Errorf("failed to to insert the key into DB: %w", err)
}
Expand Down
20 changes: 12 additions & 8 deletions cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/go-chi/chi/v5/middleware"
"github.com/gorilla/handlers"
logging "github.com/ipfs/go-log/v2"
"github.com/yugabyte/pgx/v5"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/deps"
Expand Down Expand Up @@ -108,7 +108,7 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
// Root path handler (simpler routes handled by http.ServeMux)
chiRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Hello, World!\n -Curio")
fmt.Fprintf(w, "Hello, World!\n -Curio\n")
})

// Status endpoint to check the health of the service
Expand Down Expand Up @@ -138,15 +138,14 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
// We don't need to run an HTTP server. Any HTTP request should simply be handled as HTTPS.

// Start the server with TLS
eg := errgroup.Group{}
eg.Go(func() error {
go func() {
log.Infof("Starting HTTPS server for https://%s on %s", cfg.DomainName, cfg.ListenAddress)
serr := server.ListenAndServeTLS("", "")
if serr != nil {
return xerrors.Errorf("failed to start listening: %w", serr)
log.Errorf("Failed to start HTTPS server: %s", serr)
panic(serr)
}
return nil
})
}()

go func() {
<-ctx.Done()
Expand All @@ -157,7 +156,7 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
log.Warn("HTTP Server graceful shutdown successful")
}()

return eg.Wait()
return nil
}

type cache struct {
Expand All @@ -168,6 +167,11 @@ func (c cache) Get(ctx context.Context, key string) ([]byte, error) {
var ret []byte
err := c.db.QueryRow(ctx, `SELECT v FROM autocert_cache WHERE k = $1`, key).Scan(&ret)
if err != nil {
if err == pgx.ErrNoRows {
return nil, autocert.ErrCacheMiss
}

log.Warnf("failed to get the value from DB for key %s: %s", key, err)
return nil, xerrors.Errorf("failed to get the value from DB for key %s: %w", key, err)
}
return ret, nil
Expand Down
4 changes: 2 additions & 2 deletions harmony/harmonydb/sql/20240731-market-migration.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ CREATE TABLE market_mk12_deal_pipeline (
started BOOLEAN DEFAULT FALSE,

piece_cid TEXT NOT NULL,
piece_size BIGINT NOT NULL,
piece_size BIGINT NOT NULL, -- padded size
raw_size BIGINT DEFAULT NULL,

offline BOOLEAN NOT NULL,
Expand All @@ -169,7 +169,7 @@ CREATE TABLE market_mk12_deal_pipeline (

sector BIGINT DEFAULT NULL,
reg_seal_proof INT DEFAULT NULL,
sector_offset BIGINT DEFAULT NULL,
sector_offset BIGINT DEFAULT NULL, -- padded offset

sealed BOOLEAN DEFAULT FALSE,

Expand Down
15 changes: 14 additions & 1 deletion lib/cachedreader/cachedreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -209,5 +210,17 @@ func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCid
return nil, 0, r.err
}

return r.reader, r.pieceSize, nil
rs := io.NewSectionReader(r.reader, 0, int64(r.pieceSize))

return struct {
io.Closer
io.Reader
io.ReaderAt
io.Seeker
}{
Closer: r,
Reader: rs,
Seeker: rs,
ReaderAt: r.reader,
}, r.pieceSize, nil
}
93 changes: 39 additions & 54 deletions lib/dealdata/urlpiecereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"io"
"net/http"
"net/url"
"os"

"golang.org/x/xerrors"
)
Expand All @@ -27,6 +26,40 @@ func NewUrlReader(p string, h http.Header, rs int64) *UrlPieceReader {
}
}

func (u *UrlPieceReader) initiateRequest() error {
goUrl, err := url.Parse(u.Url)
if err != nil {
return xerrors.Errorf("failed to parse the URL: %w", err)
}

if goUrl.Scheme != "https" && goUrl.Scheme != "http" {
return xerrors.Errorf("URL scheme %s not supported", goUrl.Scheme)
}

req, err := http.NewRequest(http.MethodGet, goUrl.String(), nil)
if err != nil {
return xerrors.Errorf("error creating request: %w", err)
}

// Add custom headers for security and authentication
req.Header = u.Headers

// Create a client and make the request
client := &http.Client{}

resp, err := client.Do(req)
if err != nil {
return xerrors.Errorf("error making GET request: %w", err)
}
if resp.StatusCode != 200 {
return xerrors.Errorf("a non 200 response code: %s", resp.Status)
}

// Set 'active' to the response body
u.active = resp.Body
return nil
}

func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
// Check if we have already read the required amount of data
if u.readSoFar >= u.RawSize {
Expand All @@ -35,60 +68,9 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) {

// If 'active' is nil, initiate the HTTP request
if u.active == nil {
goUrl, err := url.Parse(u.Url)
err := u.initiateRequest()
if err != nil {
return 0, xerrors.Errorf("failed to parse the URL: %w", err)
}

if goUrl.Scheme != "https" && goUrl.Scheme != "http" {
return 0, xerrors.Errorf("URL scheme %s not supported", goUrl.Scheme)
}

req, err := http.NewRequest(http.MethodGet, goUrl.String(), nil)
if err != nil {
return 0, xerrors.Errorf("error creating request: %w", err)
}

// Add custom headers for security and authentication
req.Header = u.Headers

// Create a client and make the request
client := &http.Client{}

resp, err := client.Do(req)
if err != nil {
return 0, xerrors.Errorf("error making GET request: %w", err)
}
if resp.StatusCode != 200 {
return 0, xerrors.Errorf("a non 200 response code: %s", resp.Status)
}

if goUrl.Scheme == "file" {
fileUrl := goUrl.Path
file, err := os.Open(fileUrl)
if err != nil {
return 0, xerrors.Errorf("error opening file: %w", err)
}
u.active = file
} else {
req, err := http.NewRequest(http.MethodGet, u.Url, nil)
if err != nil {
return 0, xerrors.Errorf("error creating request: %w", err)
}
// Add custom headers for security and authentication
req.Header = u.Headers
// Create a client and make the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return 0, xerrors.Errorf("error making GET request: %w", err)
}
if resp.StatusCode != 200 {
return 0, xerrors.Errorf("a non 200 response code: %s", resp.Status)
}

// Set 'active' to the response body
u.active = resp.Body
return 0, err
}
}

Expand Down Expand Up @@ -129,6 +111,9 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
func (u *UrlPieceReader) Close() error {
if !u.closed {
u.closed = true
if u.active == nil {
return nil
}
return u.active.Close()
}

Expand Down
4 changes: 2 additions & 2 deletions market/ipni/chunker/initial-chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *InitialChunker) finishDB(ctx context.Context, db *harmonydb.DB, pieceCi
// Prepare the insert statement
batch.Queue(`
INSERT INTO ipni_chunks (cid, piece_cid, chunk_num, first_cid, start_offset, num_blocks, from_car)
VALUES ($1, $2, $3, $4, $5, $6, $7)
VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING
`, link.String(), pieceCid.String(), i, firstCID.HexString(), startOffset, numBlocks, false)
}

Expand Down Expand Up @@ -235,7 +235,7 @@ func (c *InitialChunker) finishCAR(ctx context.Context, db *harmonydb.DB, pieceC
// Prepare the insert statement
batch.Queue(`
INSERT INTO ipni_chunks (cid, piece_cid, chunk_num, first_cid, start_offset, num_blocks, from_car)
VALUES ($1, $2, $3, $4, $5, $6, $7)
VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING
`, link.String(), pieceCid.String(), i, nil, startOffset, numBlocks, true)
}

Expand Down
2 changes: 1 addition & 1 deletion market/ipni/ipni-provider/ipni-provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
return nil, err
}

announceURLs := make([]*url.URL, 0, len(d.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs))
announceURLs := make([]*url.URL, len(d.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs))

for i, us := range d.Cfg.Market.StorageMarketConfig.IPNI.DirectAnnounceURLs {
u, err := url.Parse(us)
Expand Down
4 changes: 2 additions & 2 deletions market/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func NewRetrievalProvider(ctx context.Context, db *harmonydb.DB, idxStore *index
}

func Router(mux *chi.Mux, rp *Provider) {
mux.Get(piecePrefix, rp.handleByPieceCid)
mux.Get(ipfsPrefix, rp.fr.ServeHTTP)
mux.Get(piecePrefix+"{cid}", rp.handleByPieceCid)
mux.Get(ipfsPrefix+"{cid}", rp.fr.ServeHTTP)
}
16 changes: 9 additions & 7 deletions tasks/indexing/task_indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/passcall"
"github.com/filecoin-project/curio/lib/pieceprovider"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do

// Check if piece is already indexed
var indexed bool
err = i.db.Select(ctx, &indexed, `SELECT indexed FROM market_piece_metadata WHERE piece_cid = $1`, task.PieceCid)
err = i.db.QueryRow(ctx, `SELECT indexed FROM market_piece_metadata WHERE piece_cid = $1`, task.PieceCid).Scan(&indexed)
if err != nil {
return false, xerrors.Errorf("checking if piece is already indexed: %w", err)
}
Expand Down Expand Up @@ -137,7 +138,7 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
Number: task.Sector,
},
ProofType: task.Proof,
}, storiface.UnpaddedByteIndex(task.Offset), abi.UnpaddedPieceSize(task.Size), pieceCid)
}, storiface.UnpaddedByteIndex(abi.PaddedPieceSize(task.Offset).Unpadded()), task.Size.Unpadded(), pieceCid)

if err != nil {
return false, xerrors.Errorf("getting piece reader: %w", err)
Expand Down Expand Up @@ -193,7 +194,7 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
return false, xerrors.Errorf("adding index to DB: %w", err)
}

log.Infof("Indexing deal %s took %d seconds", task.UUID, time.Since(startTime).Seconds())
log.Infof("Indexing deal %s took %0.3f seconds", task.UUID, time.Since(startTime).Seconds())

err = i.recordCompletion(ctx, task, taskID, true)
if err != nil {
Expand Down Expand Up @@ -236,9 +237,9 @@ func (i *IndexingTask) recordCompletion(ctx context.Context, task itask, taskID

func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
var tasks []struct {
TaskID harmonytask.TaskID `db:"task_id"`
TaskID harmonytask.TaskID `db:"indexing_task_id"`
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
SectorNumber int64 `db:"sector"`
StorageID string `db:"storage_id"`
}

Expand All @@ -254,8 +255,8 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T
}

err := i.db.Select(ctx, &tasks, `
SELECT dp.indexing_task_id, dp.sp_id, dp.sector_number, l.storage_id FROM market_mk12_deal_pipeline dp
INNER JOIN sector_location l ON dp.sp_id = l.miner_id AND dp.sector_number = l.sector_num
SELECT dp.indexing_task_id, dp.sp_id, dp.sector, l.storage_id FROM market_mk12_deal_pipeline dp
INNER JOIN sector_location l ON dp.sp_id = l.miner_id AND dp.sector = l.sector_num
WHERE dp.indexing_task_id = ANY ($1) AND l.sector_filetype = 1
`, indIDs)
if err != nil {
Expand Down Expand Up @@ -298,6 +299,7 @@ func (i *IndexingTask) TypeDetails() harmonytask.TaskTypeDetails {
Cpu: 1,
Ram: uint64(i.insertBatchSize * i.insertConcurrency * 56 * 2),
},
Max: taskhelp.Max(4),
MaxFailures: 3,
IAmBored: passcall.Every(10*time.Second, func(taskFunc harmonytask.AddTaskFunc) error {
return i.schedule(context.Background(), taskFunc)
Expand Down
Loading