Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve deal making efficiency & Lotus v1.1.3 update & fixes #705

Merged
merged 16 commits into from
Nov 13, 2020
2 changes: 1 addition & 1 deletion api/client/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func setupAdmin(t *testing.T, adminAuthToken string) (*admin.Admin, func()) {
defConfig.FFSAdminToken = adminAuthToken
}
serverDone := setupServer(t, defConfig)
conn, done := setupConnection(t)
conn, done := setupConnection(t, defConfig.GrpcHostAddress)
return admin.NewAdmin(adminPb.NewAdminServiceClient(conn)), func() {
done()
serverDone()
Expand Down
28 changes: 18 additions & 10 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package client

import (
"fmt"
"io/ioutil"
"math/big"
"testing"
"time"

"github.com/multiformats/go-multiaddr"
"github.com/phayes/freeport"
"github.com/stretchr/testify/require"
"github.com/textileio/powergate/api/server"
"github.com/textileio/powergate/tests"
"github.com/textileio/powergate/util"
"google.golang.org/grpc"
)

var (
grpcHostNetwork = "tcp"
grpcHostAddress = "/ip4/127.0.0.1/tcp/5002"
grpcWebProxyAddress = "127.0.0.1:6002"
gatewayHostAddr = "0.0.0.0:7000"
)
var ()

func defaultServerConfig(t *testing.T) server.Config {
grpcHostNetwork := "tcp"
grpcHostAddress := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", freePort(t))
grpcWebProxyAddress := fmt.Sprintf("127.0.0.1:%d", freePort(t))
gatewayHostAddr := fmt.Sprintf("0.0.0.0:%d", freePort(t))
indexRawJSONHostAddr := fmt.Sprintf("0.0.0.0:%d", freePort(t))

repoPath, err := ioutil.TempDir("/tmp/powergate", ".powergate-*")
require.NoError(t, err)

Expand All @@ -48,6 +51,7 @@ func defaultServerConfig(t *testing.T) server.Config {
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
Expand All @@ -71,11 +75,15 @@ func setupServer(t *testing.T, conf server.Config) func() {
}
}

func setupConnection(t *testing.T) (*grpc.ClientConn, func()) {
auth := TokenAuth{}
ma, err := multiaddr.NewMultiaddr(grpcHostAddress)
func freePort(t *testing.T) int {
fp, err := freeport.GetFreePort()
require.NoError(t, err)
addr, err := util.TCPAddrFromMultiAddr(ma)
return fp
}

func setupConnection(t *testing.T, grpcHostAddress multiaddr.Multiaddr) (*grpc.ClientConn, func()) {
auth := TokenAuth{}
addr, err := util.TCPAddrFromMultiAddr(grpcHostAddress)
require.NoError(t, err)
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithPerRPCCredentials(auth))
require.NoError(t, err)
Expand Down
13 changes: 7 additions & 6 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ type Config struct {
GrpcServerOpts []grpc.ServerOption
GrpcWebProxyAddress string

GatewayBasePath string
GatewayHostAddr string
GatewayBasePath string
GatewayHostAddr string
IndexRawJSONHostAddr string

MongoURI string
MongoDB string
Expand Down Expand Up @@ -309,7 +310,7 @@ func NewServer(conf Config) (*Server, error) {
return nil, fmt.Errorf("starting GRPC services: %s", err)
}

s.indexServer = startIndexHTTPServer(s)
s.indexServer = startIndexHTTPServer(s, conf.IndexRawJSONHostAddr)

log.Info("Starting finished, serving requests")

Expand Down Expand Up @@ -412,13 +413,13 @@ func startGRPCServices(server *grpc.Server, webProxy *http.Server, s *Server, ho

go func() {
if err := webProxy.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("error starting proxy: %v", err)
log.Errorf("starting proxy: %v", err)
}
}()
return nil
}

func startIndexHTTPServer(s *Server) *http.Server {
func startIndexHTTPServer(s *Server, addr string) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/index/ask", func(w http.ResponseWriter, r *http.Request) {
index := s.ai.Get()
Expand Down Expand Up @@ -454,7 +455,7 @@ func startIndexHTTPServer(s *Server) *http.Server {
}
})

srv := &http.Server{Addr: ":8889", Handler: mux}
srv := &http.Server{Addr: addr, Handler: mux}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("serving index http: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
const (
hcApply = "apply"
// For completeness:
// hcRevert = "revert"
// hcCurrent = "current"
// hcRevert = "revert".
// hcCurrent = "current".
)

// ChainSync provides methods to resolve chain syncing situations.
Expand Down
7 changes: 5 additions & 2 deletions cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func configFromFlags() (server.Config, error) {
grpcWebProxyAddr := config.GetString("grpcwebproxyaddr")
gatewayHostAddr := config.GetString("gatewayhostaddr")
gatewayBasePath := config.GetString("gatewaybasepath")
indexRawJSONHostAddr := config.GetString("indexrawjsonhostaddr")
maxminddbfolder := config.GetString("maxminddbfolder")
mongoURI := config.GetString("mongouri")
mongoDB := config.GetString("mongodb")
Expand Down Expand Up @@ -154,8 +155,9 @@ func configFromFlags() (server.Config, error) {
GrpcHostAddress: grpcHostMaddr,
GrpcWebProxyAddress: grpcWebProxyAddr,

GatewayHostAddr: gatewayHostAddr,
GatewayBasePath: gatewayBasePath,
GatewayHostAddr: gatewayHostAddr,
GatewayBasePath: gatewayBasePath,
IndexRawJSONHostAddr: indexRawJSONHostAddr,

MongoURI: mongoURI,
MongoDB: mongoDB,
Expand Down Expand Up @@ -334,6 +336,7 @@ func setupFlags() error {

pflag.String("grpchostaddr", "/ip4/0.0.0.0/tcp/5002", "gRPC host listening address.")
pflag.String("grpcwebproxyaddr", "0.0.0.0:6002", "gRPC webproxy listening address.")
pflag.String("indexrawjsonhostaddr", "0.0.0.0:8889", "Indexes raw json output listening address")

pflag.String("lotushost", "/ip4/127.0.0.1/tcp/1234", "Lotus client API endpoint multiaddress.")
pflag.String("lotustoken", "", "Lotus API authorization token. This flag or --lotustoken file are mandatory.")
Expand Down
17 changes: 9 additions & 8 deletions deals/module/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *Module) Import(ctx context.Context, data io.Reader, isCAR bool) (cid.Ci
// is automatically calculated considering each miner epoch price and piece size.
// The data of dataCid should be already imported to the Filecoin Client or should be
// accessible to it. (e.g: is integrated with an IPFS node).
func (m *Module) Store(ctx context.Context, waddr string, dataCid cid.Cid, pieceSize uint64, dcfgs []deals.StorageDealConfig, minDuration uint64) ([]deals.StoreResult, error) {
func (m *Module) Store(ctx context.Context, waddr string, dataCid cid.Cid, pieceSize abi.PaddedPieceSize, pieceCid cid.Cid, dcfgs []deals.StorageDealConfig, minDuration uint64) ([]deals.StoreResult, error) {
if minDuration < util.MinDealDuration {
return nil, fmt.Errorf("duration %d should be greater or equal to %d", minDuration, util.MinDealDuration)
}
Expand Down Expand Up @@ -146,9 +146,11 @@ func (m *Module) Store(ctx context.Context, waddr string, dataCid cid.Cid, piece
Data: &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync,
Root: dataCid,
PieceCid: &pieceCid,
PieceSize: pieceSize.Unpadded(),
Comment on lines +149 to +150
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We delegate to the client to provide pieceSize and pieceCid. This allows Lotus to avoid calculating these values, and just trusting they're correct. This is important for clients such as FFS, which create multiple deals with the same data.

},
MinBlocksDuration: minDuration,
EpochPrice: big.Div(big.Mul(big.NewIntUnsigned(c.EpochPrice), big.NewIntUnsigned(pieceSize)), abi.NewTokenAmount(1<<30)),
EpochPrice: big.Div(big.Mul(big.NewIntUnsigned(c.EpochPrice), big.NewIntUnsigned(uint64(pieceSize))), abi.NewTokenAmount(1<<30)),
Miner: maddr,
Wallet: addr,
FastRetrieval: c.FastRetrieval,
Expand All @@ -173,18 +175,17 @@ func (m *Module) Store(ctx context.Context, waddr string, dataCid cid.Cid, piece
return res, nil
}

// CalculatePieceSize calculates the data and piece size of a Cid accesible
// by the underlying Lotus node.
func (m *Module) CalculatePieceSize(ctx context.Context, c cid.Cid) (api.DataSize, error) {
// CalculateDealPiece calculates the size and CommP for a data cid.
func (m *Module) CalculateDealPiece(ctx context.Context, c cid.Cid) (api.DataCIDSize, error) {
lapi, cls, err := m.clientBuilder(ctx)
if err != nil {
return api.DataSize{}, fmt.Errorf("creating lotus client: %s", err)
return api.DataCIDSize{}, fmt.Errorf("creating lotus client: %s", err)
}
defer cls()

dsz, err := lapi.ClientDealSize(ctx, c)
dsz, err := lapi.ClientDealPieceCID(ctx, c)
Comment on lines -185 to +186
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use a new API that calculates both size and PieceCID.

if err != nil {
return api.DataSize{}, fmt.Errorf("calculating data size: %s", err)
return api.DataCIDSize{}, fmt.Errorf("calculating data size: %s", err)
}
return dsz, nil
}
Expand Down
10 changes: 7 additions & 3 deletions deals/module/deals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,18 @@ func storeMultiMiner(m *Module, client *apistruct.FullNodeStruct, numMiners int,
EpochPrice: 500000000,
}
}
dataCid, size, err := m.Import(ctx, bytes.NewReader(data), false)
dataCid, _, err := m.Import(ctx, bytes.NewReader(data), false)
if err != nil {
return cid.Undef, nil, err
}
if !dataCid.Defined() {
return cid.Undef, nil, fmt.Errorf("data cid is undefined")
}
srs, err := m.Store(ctx, addr.String(), dataCid, 2*uint64(size), cfgs, util.MinDealDuration)
piece, err := m.CalculateDealPiece(ctx, dataCid)
if err != nil {
return cid.Undef, nil, err
}
srs, err := m.Store(ctx, addr.String(), dataCid, piece.PieceSize, piece.PieceCID, cfgs, util.MinDealDuration)
if err != nil {
return cid.Undef, nil, fmt.Errorf("calling Store(): %s", err)
}
Expand Down Expand Up @@ -199,8 +203,8 @@ func waitForDealComplete(client *apistruct.FullNodeStruct, deals []cid.Cid) erro
storagemarket.StorageDealStaged,
storagemarket.StorageDealValidating,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealFundsEnsured,
storagemarket.StorageDealCheckForAcceptance,
storagemarket.StorageDealReserveClientFunds,
storagemarket.StorageDealClientFunding,
storagemarket.StorageDealPublish,
storagemarket.StorageDealPublishing,
Expand Down
4 changes: 2 additions & 2 deletions docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ localnet:


up: down
LOTUS_IMAGE_TAG=v1.1.2 \
LOTUS_IMAGE_TAG=v1.1.3 \
docker-compose \
-p mainnet \
-f docker-compose.yaml \
Expand All @@ -26,7 +26,7 @@ up: down
.PHONY: up

down:
LOTUS_IMAGE_TAG=v1.1.2 \
LOTUS_IMAGE_TAG=v1.1.3 \
docker-compose \
-p mainnet \
-f docker-compose.yaml \
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-localnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
- 5001:5001

lotus:
image: textile/lotus-devnet:sha-d57ac07
image: textile/lotus-devnet:v1.1.3
ports:
- 7777:7777
environment:
Expand Down
Loading