Skip to content

Commit

Permalink
switch deal-making to dagstore + carv2.
Browse files Browse the repository at this point in the history
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.

Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.

When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.

Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.

Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.

Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.

Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.

A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.

Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.

NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:

- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport

At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.

Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
  • Loading branch information
3 people committed Aug 16, 2021
1 parent 2e5b492 commit e3f4136
Show file tree
Hide file tree
Showing 82 changed files with 4,721 additions and 1,020 deletions.
31 changes: 22 additions & 9 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"

apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
Expand All @@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
)

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_full.go -package=mocks . FullNode
Expand Down Expand Up @@ -336,7 +337,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore.
ClientImport(ctx context.Context, ref FileRef) (*ImportRes, error) //perm:admin
// ClientRemoveImport removes file import
ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error //perm:admin
ClientRemoveImport(ctx context.Context, importID imports.ID) error //perm:admin
// ClientStartDeal proposes a deal with a miner.
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error) //perm:admin
// ClientStatelessDeal fire-and-forget-proposes an offline deal to a miner without subsequent tracking.
Expand Down Expand Up @@ -728,16 +729,28 @@ type MinerSectors struct {

type ImportRes struct {
Root cid.Cid
ImportID multistore.StoreID
ImportID imports.ID
}

type Import struct {
Key multistore.StoreID
Key imports.ID
Err string

Root *cid.Cid
Source string
Root *cid.Cid

// Source is the provenance of the import, e.g. "import", "unknown", else.
// Currently useless but may be used in the future.
Source string

// FilePath is the path of the original file. It is important that the file
// is retained at this path, because it will be referenced during
// the transfer (when we do the UnixFS chunking, we don't duplicate the
// leaves, but rather point to chunks of the original data through
// positional references).
FilePath string

// CARPath is the path of the CAR file containing the DAG for this import.
CARPath string
}

type DealInfo struct {
Expand Down Expand Up @@ -920,7 +933,7 @@ type RetrievalOrder struct {
Piece *cid.Cid
Size uint64

LocalStore *multistore.StoreID // if specified, get data from local store
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt
Expand Down
80 changes: 77 additions & 3 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
Expand Down Expand Up @@ -166,6 +167,48 @@ type StorageMiner interface {
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error) //perm:write
MarketPublishPendingDeals(ctx context.Context) error //perm:admin

// DagstoreListShards returns information about all shards known to the
// DAG store. Only available on nodes running the markets subsystem.
DagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read

// DagstoreInitializeShard initializes an uninitialized shard.
//
// Initialization consists of fetching the shard's data (deal payload) from
// the storage subsystem, generating an index, and persisting the index
// to facilitate later retrievals, and/or to publish to external sources.
//
// This operation is intended to complement the initial migration. The
// migration registers a shard for every unique piece CID, with lazy
// initialization. Thus, shards are not initialized immediately to avoid
// IO activity competing with proving. Instead, shard are initialized
// when first accessed. This method forces the initialization of a shard by
// accessing it and immediately releasing it. This is useful to warm up the
// cache to facilitate subsequent retrievals, and to generate the indexes
// to publish them externally.
//
// This operation fails if the shard is not in ShardStateNew state.
// It blocks until initialization finishes.
DagstoreInitializeShard(ctx context.Context, key string) error //perm:write

// DagstoreRecoverShard attempts to recover a failed shard.
//
// This operation fails if the shard is not in ShardStateErrored state.
// It blocks until recovery finishes. If recovery failed, it returns the
// error.
DagstoreRecoverShard(ctx context.Context, key string) error //perm:write

// DagstoreInitializeAll initializes all uninitialized shards in bulk,
// according to the policy passed in the parameters.
//
// It is recommended to set a maximum concurrency to avoid extreme
// IO pressure if the storage subsystem has a large amount of deals.
//
// It returns a stream of events to report progress.
DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:write

// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin

// RuntimeSubsystems returns the subsystems that are enabled
// in this instance.
RuntimeSubsystems(ctx context.Context) (MinerSubsystems, error) //perm:read
Expand Down Expand Up @@ -336,3 +379,34 @@ type DealSchedule struct {
StartEpoch abi.ChainEpoch
EndEpoch abi.ChainEpoch
}

// DagstoreShardInfo is the serialized form of dagstore.DagstoreShardInfo that
// we expose through JSON-RPC to avoid clients having to depend on the
// dagstore lib.
type DagstoreShardInfo struct {
Key string
State string
Error string
}

// DagstoreShardResult enumerates results per shard.
type DagstoreShardResult struct {
Key string
Success bool
Error string
}

type DagstoreInitializeAllParams struct {
MaxConcurrency int
IncludeSealed bool
}

// DagstoreInitializeAllEvent represents an initialization event.
type DagstoreInitializeAllEvent struct {
Key string
Event string // "start", "end"
Success bool
Error string
Total int
Current int
}
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestReturnTypes(t *testing.T) {
seen[typ] = struct{}{}

if typ.Kind() == reflect.Interface && typ != bareIface && !typ.Implements(jmarsh) {
t.Error("methods can't return interfaces", m.Name)
t.Error("methods can't return interfaces or struct types not implementing json.Marshaller", m.Name)
}

switch typ.Kind() {
Expand Down
2 changes: 2 additions & 0 deletions api/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-multistore"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
Expand All @@ -27,7 +28,6 @@ import (
filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
Expand All @@ -43,6 +43,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
)

var ExampleValues = map[reflect.Type]interface{}{
Expand Down Expand Up @@ -90,6 +91,7 @@ func init() {
addExample(&pid)

multistoreIDExample := multistore.StoreID(50)
storeIDExample := imports.ID(50)

addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
Expand Down Expand Up @@ -120,6 +122,8 @@ func init() {
addExample(time.Minute)
addExample(datatransfer.TransferID(3))
addExample(datatransfer.Ongoing)
addExample(storeIDExample)
addExample(&storeIDExample)
addExample(multistoreIDExample)
addExample(&multistoreIDExample)
addExample(retrievalmarket.ClientEventDealAccepted)
Expand Down Expand Up @@ -176,7 +180,7 @@ func init() {

// miner specific
addExample(filestore2.Path(".lotusminer/fstmp123"))
si := multistore.StoreID(12)
si := uint64(12)
addExample(&si)
addExample(retrievalmarket.DealID(5))
addExample(abi.ActorID(1000))
Expand Down Expand Up @@ -271,6 +275,15 @@ func init() {
api.SubsystemSectorStorage,
api.SubsystemMarkets,
})
addExample(api.DagstoreShardResult{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
Error: "<error>",
})
addExample(api.DagstoreShardInfo{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
State: "ShardStateAvailable",
Error: "<error>",
})
}

func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {
Expand Down
4 changes: 2 additions & 2 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e3f4136

Please sign in to comment.