Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate indexer provider #7622

Merged
merged 5 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ require (
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.13.3-0.20211117072527-8713155662ff
github.com/filecoin-project/go-indexer-core v0.2.4
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.2
github.com/filecoin-project/go-state-types v0.1.1
github.com/filecoin-project/go-statemachine v1.0.1
github.com/filecoin-project/go-statestore v0.1.1
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b
github.com/filecoin-project/specs-actors v0.9.14
github.com/filecoin-project/specs-actors/v2 v2.3.5
github.com/filecoin-project/specs-actors/v3 v3.1.1
Expand Down
8 changes: 3 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI=
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
github.com/filecoin-project/go-indexer-core v0.2.2/go.mod h1:wV+NmrF8fHG6Xii3ecoZf2JW3laGTe5xtsWz609jo+Y=
github.com/filecoin-project/go-indexer-core v0.2.3 h1:kaUL2r8CuihK53lhmtCScffb7Bzs+N1yRGpwvxzCN+U=
github.com/filecoin-project/go-indexer-core v0.2.3/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc=
github.com/filecoin-project/go-indexer-core v0.2.4 h1:90vvxoBeNZN+h4W+vZ+VsoxKaDBr/bfZJJNByapGeM0=
github.com/filecoin-project/go-indexer-core v0.2.4/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc=
github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk=
github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb/go.mod h1:lKwBnslfNGG7JnsP9uQZl3yK7f74fit1MyHcwuuOP3k=
Expand Down Expand Up @@ -383,8 +382,9 @@ github.com/filecoin-project/go-statestore v0.1.1 h1:ufMFq00VqnT2CAuDpcGnwLnCX1I/
github.com/filecoin-project/go-statestore v0.1.1/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89 h1:QwKK+eB+7jKbdtQxkBcoiWF2z3LusoPIj2N5UcZsV0w=
github.com/filecoin-project/index-provider v0.0.0-20211116211010-ae6b83454d89/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44=
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b h1:qVQpqoguf9+vPONSMQZ3xYVzxzwAITyBHjM238zAr6c=
github.com/filecoin-project/index-provider v0.0.0-20211117103856-70cd9b7ab68b/go.mod h1:wu0yi7NbT3VzYr3s0n2zheg3mpdSP09A0hBFIQfUs44=
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
github.com/filecoin-project/specs-actors v0.9.12/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
Expand Down Expand Up @@ -427,7 +427,6 @@ github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/gammazero/keymutex v0.0.2 h1:cmpLBJHdEwn+WlR5Z/o9/BN92znSZTp5AKPQDpu1QcI=
github.com/gammazero/keymutex v0.0.2/go.mod h1:qtzWCCLMisQUmVa4dvqHVgwfh4BP2YB7JxNDGXnsKrs=
github.com/gammazero/radixtree v0.2.5 h1:muPQ4eEgCkUymFWPiVQRuXOQv4IhWg8YXH2r71MoqPM=
github.com/gammazero/radixtree v0.2.5/go.mod h1:VPqqCDZ3YZZxAzUUsIF/ytFBigVWV7JIV1Stld8hri0=
Expand Down Expand Up @@ -910,7 +909,6 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA=
github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c h1:izfvqCuEqk2V7BRkh7GCm7lyKC2ItyAbzUu4WgNmggc=
github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c/go.mod h1:PwE6iq8TiWJRI3zMGA1RtkFAnrDMK93dLA5SUeu0lH8=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
Expand Down
17 changes: 11 additions & 6 deletions itests/deals_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"testing"
"time"

"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/stretchr/testify/require"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-state-types/abi"
provider "github.com/filecoin-project/index-provider"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
Expand Down Expand Up @@ -46,15 +48,18 @@ func TestDealWithMarketAndMinerNode(t *testing.T) {
runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) {
api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me

client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC())
idxProv := shared_testutil.NewMockIndexProvider()
idxProvOpt := kit.ConstructorOpts(node.Override(new(provider.Interface), idxProv))
client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC(), idxProvOpt)

dh := kit.NewDealHarness(t, client, main, market)

dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
N: n,
FastRetrieval: fastRetrieval,
CarExport: carExport,
StartEpoch: startEpoch,
N: n,
FastRetrieval: fastRetrieval,
CarExport: carExport,
StartEpoch: startEpoch,
IndexerProvider: idxProv,
})
}

Expand Down
9 changes: 9 additions & 0 deletions itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -374,6 +375,7 @@ type RunConcurrentDealsOpts struct {
CarExport bool
StartEpoch abi.ChainEpoch
UseCARFileForStorageDeal bool
IndexerProvider *shared_testutil.MockIndexProvider
}

func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
Expand All @@ -400,6 +402,13 @@ func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {
UseCARFileForStorageDeal: opts.UseCARFileForStorageDeal,
})

// Check that the storage provider announced the deal to indexers
if opts.IndexerProvider != nil {
notifs := opts.IndexerProvider.GetNotifs()
_, ok := notifs[string(deal.Bytes())]
require.True(dh.t, ok)
}

dh.t.Logf("retrieving deal %d/%d", i, opts.N)

outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, opts.CarExport)
Expand Down
23 changes: 10 additions & 13 deletions markets/dagstore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

carindex "github.com/ipld/go-car/v2/index"

"github.com/filecoin-project/go-indexer-core/store/storethehash"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
levelds "github.com/ipfs/go-ds-leveldb"
Expand Down Expand Up @@ -87,11 +85,11 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto
return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo: %w", err)
}

store, err := storethehash.New(indexDir)
if err != nil {
return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err)
}
topIndex := index.NewInverted(store)
//store, err := storethehash.New(indexDir)
//if err != nil {
//return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err)
//}
//topIndex := index.NewInverted(store)

dcfg := dagstore.Config{
TransientsDir: transientsDir,
Expand All @@ -100,7 +98,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto
MountRegistry: registry,
FailureCh: failureCh,
TraceCh: traceCh,
TopLevelIndex: topIndex,
//TopLevelIndex: topIndex,
// not limiting fetches globally, as the Lotus mount does
// conditional throttling.
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
Expand Down Expand Up @@ -281,11 +279,6 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s
return nil
}

func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) {
key := shard.KeyFromCID(pieceCid)
return w.dagst.GetIterableIndex(key)
}

func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) {
log := log.Named("migrator")

Expand Down Expand Up @@ -438,6 +431,10 @@ func (w *Wrapper) GetPiecesContainingBlock(blockCID cid.Cid) ([]cid.Cid, error)
return pieceCids, nil
}

func (w *Wrapper) GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error) {
return w.dagst.GetIterableIndex(shard.KeyFromCID(pieceCid))
}

func (w *Wrapper) Close() error {
// Cancel the context
w.cancel()
Expand Down
12 changes: 12 additions & 0 deletions markets/dagstore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"testing"
"time"

mh "github.com/multiformats/go-multihash"

carindex "github.com/ipld/go-car/v2/index"

"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/node/config"
Expand Down Expand Up @@ -132,6 +136,14 @@ type mockDagStore struct {
close chan struct{}
}

func (m *mockDagStore) GetIterableIndex(key shard.Key) (carindex.IterableIndex, error) {
return nil, nil
}

func (m *mockDagStore) ShardsContainingMultihash(h mh.Multihash) ([]shard.Key, error) {
return nil, nil
}

func (m *mockDagStore) GetShardKeysForCid(c cid.Cid) ([]shard.Key, error) {
panic("implement me")
}
Expand Down
2 changes: 2 additions & 0 deletions node/builder_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
"github.com/filecoin-project/go-state-types/abi"
provider "github.com/filecoin-project/index-provider"
storage2 "github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -163,6 +164,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(HandleRetrievalKey, modules.HandleRetrieval),

// Markets (storage)
Override(new(provider.Interface), modules.IndexerProvider(cfg.IndexerProvider)),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
Expand Down
9 changes: 9 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ func DefaultStorageMiner() *StorageMiner {
},
},

IndexerProvider: IndexerProviderConfig{
ListenAddresses: []string{
"/ip4/0.0.0.0/tcp/0",
"/ip6/::/tcp/0",
},

MaxSimultaneousTransfers: DefaultSimultaneousTransfers,
},

Subsystems: MinerSubsystemConfig{
EnableMining: true,
EnableSealing: true,
Expand Down
22 changes: 22 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 22 additions & 7 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/index-provider/config"

"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
)
Expand Down Expand Up @@ -43,13 +45,14 @@ type Backup struct {
type StorageMiner struct {
Common

Subsystems MinerSubsystemConfig
Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
Subsystems MinerSubsystemConfig
Dealmaking DealmakingConfig
IndexerProvider IndexerProviderConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
}

type DAGStoreConfig struct {
Expand Down Expand Up @@ -146,6 +149,18 @@ type DealmakingConfig struct {
RetrievalPricing *RetrievalPricing
}

type IndexerProviderConfig struct {
config.Ingest

// Binding address for the libp2p host - 0 means random port.
// Format: multiaddress; see https://multiformats.io/multiaddr/
ListenAddresses []string

// The maximum number of simultaneous data transfers between the indexers
// and the indexer provider
MaxSimultaneousTransfers uint64
}

type RetrievalPricing struct {
Strategy string // possible values: "default", "external"

Expand Down
3 changes: 3 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
provider "github.com/filecoin-project/index-provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -581,6 +582,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
h host.Host, ds dtypes.MetadataDS,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
indexer provider.Interface,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter,
Expand Down Expand Up @@ -609,6 +611,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
namespace.Wrap(ds, datastore.NewKey("/deals/provider")),
store,
dsw,
indexer,
pieceStore,
dataTransfer,
spn,
Expand Down
Loading