Skip to content

Commit

Permalink
Merge pull request #807 from textileio/jsign/stagecid
Browse files Browse the repository at this point in the history
Eagerly fetch data in hot-storage
  • Loading branch information
jsign authored Mar 22, 2021
2 parents 1726c2a + 90a20cf commit 9c642ba
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 96 deletions.
51 changes: 26 additions & 25 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,32 @@ func defaultServerConfig(t *testing.T) server.Config {

grpcMaddr := util.MustParseAddr(grpcHostAddress)
conf := server.Config{
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
}
return conf
}
Expand Down
29 changes: 15 additions & 14 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,20 @@ type Config struct {
MongoURI string
MongoDB string

FFSAdminToken string
FFSUseMasterAddr bool
FFSDealFinalityTimeout time.Duration
FFSMinimumPieceSize uint64
FFSMaxParallelDealPreparing int
FFSGCAutomaticGCInterval time.Duration
FFSGCStageGracePeriod time.Duration
SchedMaxParallel int
MinerSelector string
MinerSelectorParams string
DealWatchPollDuration time.Duration
AutocreateMasterAddr bool
WalletInitialFunds big.Int
FFSAdminToken string
FFSUseMasterAddr bool
FFSDealFinalityTimeout time.Duration
FFSMinimumPieceSize uint64
FFSRetrievalNextEventTimeout time.Duration
FFSMaxParallelDealPreparing int
FFSGCAutomaticGCInterval time.Duration
FFSGCStageGracePeriod time.Duration
SchedMaxParallel int
MinerSelector string
MinerSelectorParams string
DealWatchPollDuration time.Duration
AutocreateMasterAddr bool
WalletInitialFunds big.Int

AskIndexQueryAskTimeout time.Duration
AskindexMaxParallel int
Expand Down Expand Up @@ -261,7 +262,7 @@ func NewServer(conf Config) (*Server, error) {
if conf.Devnet {
conf.FFSMinimumPieceSize = 0
}
cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing)
cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing, conf.FFSRetrievalNextEventTimeout)
hs, err := coreipfs.New(txndstr.Wrap(ds, "ffs/coreipfs"), ipfs, l)
if err != nil {
return nil, fmt.Errorf("creating coreipfs: %s", err)
Expand Down
27 changes: 15 additions & 12 deletions cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func configFromFlags() (server.Config, error) {
ffsSchedMaxParallel := config.GetInt("ffsschedmaxparallel")
ffsDealWatchFinalityTimeout := time.Minute * time.Duration(config.GetInt("ffsdealfinalitytimeout"))
ffsMinimumPieceSize := config.GetUint64("ffsminimumpiecesize")
ffsRetrievalNextEventTimeout := config.GetDuration("ffsretrievalnexteventtimeout")
ffsMaxParallelDealPreparing := config.GetInt("ffsmaxparalleldealpreparing")
ffsGCInterval := time.Minute * time.Duration(config.GetInt("ffsgcinterval"))
ffsGCStagedGracePeriod := time.Minute * time.Duration(config.GetInt("ffsgcstagedgraceperiod"))
Expand Down Expand Up @@ -166,18 +167,19 @@ func configFromFlags() (server.Config, error) {
MongoURI: mongoURI,
MongoDB: mongoDB,

FFSAdminToken: ffsAdminToken,
FFSUseMasterAddr: ffsUseMasterAddr,
FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
FFSMinimumPieceSize: ffsMinimumPieceSize,
FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
FFSGCAutomaticGCInterval: ffsGCInterval,
FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
AutocreateMasterAddr: autocreateMasterAddr,
MinerSelector: minerSelector,
MinerSelectorParams: minerSelectorParams,
SchedMaxParallel: ffsSchedMaxParallel,
DealWatchPollDuration: dealWatchPollDuration,
FFSAdminToken: ffsAdminToken,
FFSUseMasterAddr: ffsUseMasterAddr,
FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
FFSMinimumPieceSize: ffsMinimumPieceSize,
FFSRetrievalNextEventTimeout: ffsRetrievalNextEventTimeout,
FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
FFSGCAutomaticGCInterval: ffsGCInterval,
FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
AutocreateMasterAddr: autocreateMasterAddr,
MinerSelector: minerSelector,
MinerSelectorParams: minerSelectorParams,
SchedMaxParallel: ffsSchedMaxParallel,
DealWatchPollDuration: dealWatchPollDuration,

AskIndexQueryAskTimeout: askIndexQueryAskTimeout,
AskIndexRefreshInterval: askIndexRefreshInterval,
Expand Down Expand Up @@ -387,6 +389,7 @@ func setupFlags() error {
pflag.String("ffsminerselector", "reputation", "Miner selector to be used by FFS: 'sr2', 'reputation'.")
pflag.String("ffsminerselectorparams", "", "Miner selector configuration parameter, depends on --ffsminerselector.")
pflag.String("ffsminimumpiecesize", "67108864", "Minimum piece size in bytes allowed to be stored in Filecoin.")
pflag.Duration("ffsretrievalnexteventtimeout", time.Hour, "Maximum amount of time to wait for the next retrieval event before erroring it.")
pflag.String("ffsschedmaxparallel", "1000", "Maximum amount of Jobs executed in parallel.")
pflag.String("ffsdealfinalitytimeout", "4320", "Deadline in minutes in which a deal must prove liveness changing status before considered abandoned.")
pflag.String("ffsmaxparalleldealpreparing", "2", "Max parallel deal preparing tasks.")
Expand Down
2 changes: 1 addition & 1 deletion deals/module/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (m *Module) recordRetrieval(addr string, offer api.QueryOffer, bytesReceive
RootCid: offer.Root,
Size: offer.Size,
MinPrice: offer.MinPrice.Uint64(),
Miner: offer.Miner.String(),
Miner: offer.MinerPeer.Address.String(),
MinerPeerID: offer.MinerPeer.ID.String(),
PaymentInterval: offer.PaymentInterval,
PaymentIntervalIncrease: offer.PaymentIntervalIncrease,
Expand Down
8 changes: 4 additions & 4 deletions deals/module/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
break Loop
}
if e.Err != "" {
log.Infof("in progress retrieval errored: %s", err)
log.Infof("in progress retrieval errored: %s", e.Err)
errMsg = e.Err
}
if dtStart.IsZero() && e.Event == retrievalmarket.ClientEventBlocksReceived {
Expand All @@ -146,23 +146,23 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
// payment channel creation. This isn't ideal, but
// it's better than missing the data.
// We WARN just to signal this might be happening.
if dtStart.IsZero() {
if dtStart.IsZero() && errMsg == "" {
dtStart = retrievalStartTime
log.Warnf("retrieval data-transfer start fallback to retrieval start")
}
// This is a fallback to not receiving an expected
// event in the retrieval. We just fallback to Now(),
// which should always be pretty close to the real
// event. We WARN just to signal this is happening.
if dtEnd.IsZero() {
if dtEnd.IsZero() && errMsg == "" {
dtEnd = time.Now()
log.Warnf("retrieval data-transfer end fallback to retrieval end")
}
m.recordRetrieval(waddr, o, bytesReceived, dtStart, dtEnd, errMsg)
}
}()

return o.Miner.String(), out, nil
return o.MinerPeer.Address.String(), out, nil
}

func getRetrievalOffers(ctx context.Context, lapi *apistruct.FullNodeStruct, payloadCid cid.Cid, pieceCid *cid.Cid, miners []string) []api.QueryOffer {
Expand Down
12 changes: 7 additions & 5 deletions ffs/coreipfs/coreipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ func New(ds datastore.TxnDatastore, ipfs iface.CoreAPI, l ffs.JobLogger) (*CoreI

// Stage adds the data of io.Reader in the storage, and creates a stage-pin on the resulting cid.
func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.Cid, error) {
ci.lock.Lock()
defer ci.lock.Unlock()

p, err := ci.ipfs.Unixfs().Add(ctx, ipfsfiles.NewReaderFile(r), options.Unixfs.Pin(true))
if err != nil {
return cid.Undef, fmt.Errorf("adding data to ipfs: %s", err)
}
ci.lock.Lock()
defer ci.lock.Unlock()

if err := ci.ps.AddStaged(iid, p.Cid()); err != nil {
return cid.Undef, fmt.Errorf("saving new pin in pinstore: %s", err)
Expand All @@ -72,8 +71,11 @@ func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.
return p.Cid(), nil
}

// StageCid stage-pin a Cid.
// StageCid pull the Cid data and stage-pin it.
func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
if err := ci.ipfs.Pin().Add(ctx, path.IpfsPath(c), options.Pin.Recursive(true)); err != nil {
return fmt.Errorf("adding data to ipfs: %s", err)
}
ci.lock.Lock()
defer ci.lock.Unlock()

Expand Down Expand Up @@ -263,7 +265,7 @@ Loop:

// Skip Cids that are excluded.
if _, ok := excludeMap[stagedPin.Cid]; ok {
log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin)
log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin.Cid)
continue Loop
}
// A Cid is only safe to GC if all existing stage-pin are older than
Expand Down
85 changes: 53 additions & 32 deletions ffs/filcold/filcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
iface "github.com/ipfs/interface-go-ipfs-core"
Expand All @@ -35,15 +36,16 @@ var (
// FilCold is a ColdStorage implementation which saves data in the Filecoin network.
// It assumes the underlying Filecoin client has access to an IPFS node where data is stored.
type FilCold struct {
ms ffs.MinerSelector
dm *dealsModule.Module
wm wallet.Module
ipfs iface.CoreAPI
chain FilChain
l ffs.JobLogger
lsm *lotus.SyncMonitor
minPieceSize uint64
semaphDealPrep chan struct{}
ms ffs.MinerSelector
dm *dealsModule.Module
wm wallet.Module
ipfs iface.CoreAPI
chain FilChain
l ffs.JobLogger
lsm *lotus.SyncMonitor
minPieceSize uint64
retrNextEventTimeout time.Duration
semaphDealPrep chan struct{}
}

var _ ffs.ColdStorage = (*FilCold)(nil)
Expand All @@ -54,17 +56,18 @@ type FilChain interface {
}

// New returns a new FilCold instance.
func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int) *FilCold {
func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int, retrievalNextEventTimeout time.Duration) *FilCold {
return &FilCold{
ms: ms,
dm: dm,
wm: wm,
ipfs: ipfs,
chain: chain,
l: l,
lsm: lsm,
minPieceSize: minPieceSize,
semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
ms: ms,
dm: dm,
wm: wm,
ipfs: ipfs,
chain: chain,
l: l,
lsm: lsm,
minPieceSize: minPieceSize,
retrNextEventTimeout: retrievalNextEventTimeout,
semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
}
}

Expand All @@ -76,21 +79,39 @@ func (fc *FilCold) Fetch(ctx context.Context, pyCid cid.Cid, piCid *cid.Cid, wad
return ffs.FetchInfo{}, fmt.Errorf("fetching from deal module: %s", err)
}
fc.l.Log(ctx, "Fetching from %s...", miner)
var fundsSpent uint64
var lastMsg string
for e := range events {
if e.Err != "" {
return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
}
strEvent := retrievalmarket.ClientEvents[e.Event]
strDealStatus := retrievalmarket.DealStatuses[e.Status]
fundsSpent = e.FundsSpent.Uint64()
newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
if newMsg != lastMsg {
fc.l.Log(ctx, newMsg)
lastMsg = newMsg

var (
fundsSpent uint64
lastMsg string
lastEvent marketevents.RetrievalEvent
)
Loop:
for {
select {
case <-time.After(fc.retrNextEventTimeout):
return ffs.FetchInfo{}, fmt.Errorf("didn't receive events for %d minutes", int64(fc.retrNextEventTimeout.Minutes()))
case e, ok := <-events:
if !ok {
break Loop
}
if e.Err != "" {
return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
}
strEvent := retrievalmarket.ClientEvents[e.Event]
strDealStatus := retrievalmarket.DealStatuses[e.Status]
fundsSpent = e.FundsSpent.Uint64()
newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
if newMsg != lastMsg {
fc.l.Log(ctx, newMsg)
lastMsg = newMsg
}
lastEvent = e
}
}
if lastEvent.Status != retrievalmarket.DealStatusCompleted {
return ffs.FetchInfo{}, fmt.Errorf("retrieval failed with status %s and message %s", retrievalmarket.DealStatuses[lastEvent.Status], lastMsg)
}

return ffs.FetchInfo{RetrievedMiner: miner, FundsSpent: fundsSpent}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion ffs/integrationtest/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewCustomFFSManager(t require.TestingT, ds datastore.TxnDatastore, cb lotus
l := joblogger.New(txndstr.Wrap(ds, "ffs/joblogger"))
lsm, err := lotus.NewSyncMonitor(cb)
require.NoError(t, err)
cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1)
cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1, time.Hour)
hl, err := coreipfs.New(ds, ipfsClient, l)
require.NoError(t, err)
sched, err := scheduler.New(txndstr.Wrap(ds, "ffs/scheduler"), l, hl, cl, 10, time.Minute*10, nil, scheduler.GCConfig{AutoGCInterval: 0})
Expand Down
2 changes: 1 addition & 1 deletion ffs/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type HotStorage interface {
// Stage adds io.Reader and stage-pins it.
Stage(context.Context, APIID, io.Reader) (cid.Cid, error)

// StageCid stage-pins a cid.
// StageCid pulls Cid data and stage-pin it.
StageCid(context.Context, APIID, cid.Cid) error

// Unpin unpins a Cid.
Expand Down
2 changes: 1 addition & 1 deletion ffs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
Hot: ffs.HotConfig{
Enabled: false,
Ipfs: ffs.IpfsConfig{
AddTimeout: 480, // 8 min
AddTimeout: 15 * 60, // 15min
},
},
Cold: ffs.ColdConfig{
Expand Down
2 changes: 2 additions & 0 deletions ffs/scheduler/internal/sjstore/sjstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (s *Store) Enqueue(j ffs.StorageJob) error {
// GetExecutingJob returns a JobID that is currently executing for
// data with cid c in iid. If there's not such job, it returns nil.
func (s *Store) GetExecutingJob(iid ffs.APIID, c cid.Cid) *ffs.JobID {
s.lock.Lock()
defer s.lock.Unlock()
j, ok := s.executingJobs[iid][c]
if !ok {
return nil
Expand Down
Loading

0 comments on commit 9c642ba

Please sign in to comment.