Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SR2 miner selector & safer default StorageConfig config #614

Merged
merged 15 commits into from
Sep 26, 2020
1 change: 1 addition & 0 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func setupServer(t *testing.T) func() {
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
}
server, err := server.NewServer(conf)
checkErr(t, err)
Expand Down
28 changes: 27 additions & 1 deletion api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/textileio/powergate/ffs/joblogger"
"github.com/textileio/powergate/ffs/manager"
"github.com/textileio/powergate/ffs/minerselector/reptop"
"github.com/textileio/powergate/ffs/minerselector/sr2"
ffsRpc "github.com/textileio/powergate/ffs/rpc"
"github.com/textileio/powergate/ffs/scheduler"
"github.com/textileio/powergate/filchain"
Expand Down Expand Up @@ -117,6 +118,8 @@ type Config struct {
MaxMindDBFolder string
MongoURI string
MongoDB string
MinerSelector string
MinerSelectorParams string
}

// NewServer starts and returns a new server with the given configuration.
Expand Down Expand Up @@ -212,7 +215,11 @@ func NewServer(conf Config) (*Server, error) {
}

chain := filchain.New(clientBuilder)
ms := reptop.New(rm, ai)

ms, err := getMinerSelector(conf, rm, ai, clientBuilder)
if err != nil {
return nil, fmt.Errorf("creating miner selector: %s", err)
}

l := joblogger.New(txndstr.Wrap(ds, "ffs/joblogger"))
cs := filcold.New(ms, dm, ipfs, chain, l)
Expand Down Expand Up @@ -531,3 +538,22 @@ func createDatastore(conf Config) (datastore.TxnDatastore, error) {
}
return ds, nil
}

func getMinerSelector(conf Config, rm *reputation.Module, ai *ask.Runner, cb lotus.ClientBuilder) (ffs.MinerSelector, error) {
var ms ffs.MinerSelector
var err error

switch conf.MinerSelector {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MinerSelector configuration depending on flags/envs.

case "reputation":
ms = reptop.New(rm, ai)
case "sr2":
ms, err = sr2.New(conf.MinerSelectorParams, ai, cb)
if err != nil {
return nil, fmt.Errorf("creating sr2 miner selector: %s", err)
}
default:
return nil, fmt.Errorf("unknown miner selector: %s", conf.MinerSelector)
}

return ms, nil
}
9 changes: 9 additions & 0 deletions cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func configFromFlags() (server.Config, error) {
maxminddbfolder := config.GetString("maxminddbfolder")
mongoURI := config.GetString("mongouri")
mongoDB := config.GetString("mongodb")
minerSelector := config.GetString("ffsminerselector")
minerSelectorParams := config.GetString("ffsminerselectorparams")

return server.Config{
WalletInitialFunds: walletInitialFunds,
Expand All @@ -141,6 +143,8 @@ func configFromFlags() (server.Config, error) {
MaxMindDBFolder: maxminddbfolder,
MongoURI: mongoURI,
MongoDB: mongoDB,
MinerSelector: minerSelector,
MinerSelectorParams: minerSelectorParams,
}, nil
}

Expand Down Expand Up @@ -211,6 +215,9 @@ func setupLogging(repoPath string) error {
// Wallet Module
"wallet",

// Miner Selectors
"sr2-miner-selector",

// FFS
"ffs-scheduler",
"ffs-manager",
Expand Down Expand Up @@ -301,6 +308,8 @@ func setupFlags() error {
pflag.String("maxminddbfolder", ".", "Path of the folder containing GeoLite2-City.mmdb")
pflag.String("mongouri", "", "Mongo URI to connect to MongoDB database. (Optional: if empty, will use Badger)")
pflag.String("mongodb", "", "Mongo database name. (if --mongouri is used, is mandatory")
pflag.String("ffsminerselector", "reputation", "Miner selector to be used by FFS: 'sr2', 'reputation'")
pflag.String("ffsminerselectorparams", "", "Miner selector configuration parameter, depends on --ffsminerselector")
pflag.Parse()

config.SetEnvPrefix("POWD")
Expand Down
4 changes: 2 additions & 2 deletions docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ localnet:


up: down
LOTUS_IMAGE_TAG=v0.7.0 \
LOTUS_IMAGE_TAG=v0.8.0 \
docker-compose \
-p testnet \
-f docker-compose.yaml \
Expand All @@ -26,7 +26,7 @@ up: down
.PHONY: up

down:
LOTUS_IMAGE_TAG=v0.7.0 \
LOTUS_IMAGE_TAG=v0.8.0 \
docker-compose \
-p testnet \
-f docker-compose.yaml \
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-localnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
- 5001:5001

lotus:
image: textile/lotus-devnet:sha-ce1d788
image: textile/lotus-devnet:sha-35afaa3
ports:
- 7777:7777
environment:
Expand Down
2 changes: 1 addition & 1 deletion ffs/filcold/filcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (fc *FilCold) renewDeal(ctx context.Context, c cid.Cid, size uint64, p ffs.
// that were started successfully, and a slice of DealError with deals that failed to be started.
func (fc *FilCold) makeDeals(ctx context.Context, c cid.Cid, size uint64, cfgs []deals.StorageDealConfig, fcfg ffs.FilConfig) ([]cid.Cid, []ffs.DealError, error) {
for _, cfg := range cfgs {
fc.l.Log(ctx, "Proposing deal to miner %s with %d fil per epoch...", cfg.Miner, cfg.EpochPrice)
fc.l.Log(ctx, "Proposing deal to miner %s with %d attoFIL per epoch...", cfg.Miner, cfg.EpochPrice)
}

sres, err := fc.dm.Store(ctx, fcfg.Addr, c, size, cfgs, uint64(fcfg.DealMinDuration))
Expand Down
3 changes: 3 additions & 0 deletions ffs/integrationtest/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/textileio/powergate/ffs"
"github.com/textileio/powergate/ffs/api"
it "github.com/textileio/powergate/ffs/integrationtest"
"github.com/textileio/powergate/ffs/scheduler"
"github.com/textileio/powergate/tests"
"github.com/textileio/powergate/util"
)
Expand Down Expand Up @@ -231,6 +232,8 @@ func TestFilecoinEnableConfig(t *testing.T) {
}

func TestHotTimeoutConfig(t *testing.T) {
scheduler.HardcodedHotTimeout = time.Second * 10
t.SkipNow()
t.Parallel()
_, _, fapi, cls := it.NewAPI(t, 1)
defer cls()
Expand Down
2 changes: 2 additions & 0 deletions ffs/integrationtest/unfreeze/unfreeze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/textileio/powergate/ffs"
"github.com/textileio/powergate/ffs/api"
it "github.com/textileio/powergate/ffs/integrationtest"
"github.com/textileio/powergate/ffs/scheduler"
"github.com/textileio/powergate/tests"
"github.com/textileio/powergate/util"
)
Expand All @@ -26,6 +27,7 @@ func TestMain(m *testing.M) {

func TestUnfreeze(t *testing.T) {
t.Parallel()
scheduler.HardcodedHotTimeout = time.Second * 10
tests.RunFlaky(t, func(t *tests.FlakyT) {
ipfsAPI, _, fapi, cls := it.NewAPI(t, 1)
defer cls()
Expand Down
6 changes: 3 additions & 3 deletions ffs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ var (
// recover its last configured default StorageConfig from the datastore.
zeroConfig = ffs.StorageConfig{
Hot: ffs.HotConfig{
Enabled: true,
Enabled: false,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided with @asutula to avoid disk usage explosion.
If hte user want's HotStorage enabled, should be set explicitly on pushing a config.

Ipfs: ffs.IpfsConfig{
AddTimeout: 30,
AddTimeout: 300, // 5 min
},
},
Cold: ffs.ColdConfig{
Enabled: true,
Filecoin: ffs.FilConfig{
RepFactor: 5,
TrustedMiners: []string{"t016303", "t016304", "t016305", "t016306", "t016309"},
DealMinDuration: util.MinDealDuration * 2,
DealMinDuration: util.MinDealDuration,
FastRetrieval: true,
DealStartOffset: 72 * 60 * 60 / util.EpochDurationSeconds, // 72hs
},
Expand Down
149 changes: 149 additions & 0 deletions ffs/minerselector/sr2/sr2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package sr2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of the selector:

  • Fetch the json file
  • Parse it
  • Get covered on some possibly weird values (amount < 0, etc)
  • Use the ask cache index as much as possible. Some miners might not be there if they have 0 power. So instead of changing the ask index to include also miners with 0 power (which are a lot and would affect index building times), I decided that if a miner in this trusted list hasn't a cached ask, we'd do that directly here.


import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/types"
logger "github.com/ipfs/go-log/v2"
"github.com/textileio/powergate/ffs"
askindex "github.com/textileio/powergate/index/ask/runner"
"github.com/textileio/powergate/lotus"
)

var (
log = logger.Logger("sr2-miner-selector")
)

// MinerSelector chooses miner under SR2 strategy.
type MinerSelector struct {
url string
ai *askindex.Runner
cb lotus.ClientBuilder
}

var _ ffs.MinerSelector = (*MinerSelector)(nil)

type minersBuckets struct {
Buckets []bucket
}

type bucket struct {
Amount int
MinerAddresses []string
}

// New returns a new SR2 miner selector.
func New(url string, ai *askindex.Runner, cb lotus.ClientBuilder) (*MinerSelector, error) {
ms := &MinerSelector{url: url, ai: ai, cb: cb}

_, err := ms.getMiners()
if err != nil {
return nil, fmt.Errorf("verifying sr2 url content: %s", err)
}

return ms, nil
}

// GetMiners returns miners from SR2.
func (ms *MinerSelector) GetMiners(n int, f ffs.MinerSelectorFilter) ([]ffs.MinerProposal, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important fact. This MinerSelector will ignore n and f.
The replication factor will be the sum of Amount of buckets, and not filtering is done.

So this is a very aggressive miner selector kind of forcing what was decided in SR2. No custom StorageConfig will change this fact, unless the user runs with --ffsminerselector=reputation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember, but I suppose the replication factor provided in the storage config is passed into a miner selector via ffs.MinerSelectorFilter? So this means that user provided replication factor is ignored? That is fine, just want to be sure I understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, is ignored. Now will be controled in this remote file now that --ffsminerselector=sr2 default miner selector.
If someone wants to come back to not being ignored, they could specify --ffsminerselector=reputation and things will work as before.

asks := ms.ai.Get()
mb, err := ms.getMiners()
if err != nil {
return nil, fmt.Errorf("getting miners from url: %s", err)
}

c, cls, err := ms.cb()
if err != nil {
return nil, fmt.Errorf("creating lotus client: %s", err)
}
defer cls()

var selected []string
for _, bucket := range mb.Buckets {
rand.Seed(time.Now().UnixNano())
miners := bucket.MinerAddresses
rand.Shuffle(len(miners), func(i, j int) { miners[i], miners[j] = miners[j], miners[i] })

// Stay safe.
if bucket.Amount < 0 {
bucket.Amount = 0
}
if bucket.Amount > len(miners) {
bucket.Amount = len(miners)
}
selected = append(selected, miners[:bucket.Amount]...)
}

if len(selected) == 0 {
return nil, fmt.Errorf("no SR2 miners are available")
}

res := make([]ffs.MinerProposal, 0, len(selected))
for _, miner := range selected {
sa, ok := asks.Storage[miner]
if !ok {
sask, err := getMinerQueryAsk(c, miner)
if err != nil {
log.Warnf("miner %s not in ask cache and query-ask errored: %s", miner, err)
continue
}

log.Infof("miner %s not in ask-cache, direct query-ask price: %d", miner, sask)
sa.Price = sask
}
res = append(res, ffs.MinerProposal{
Addr: miner,
EpochPrice: sa.Price,
})
}

return res, nil
}

func (ms *MinerSelector) getMiners() (minersBuckets, error) {
r, err := http.DefaultClient.Get(ms.url)
if err != nil {
return minersBuckets{}, fmt.Errorf("getting miners list from url: %s", err)
}
defer func() {
if err := r.Body.Close(); err != nil {
log.Warnf("closing request body from sr2 file: %s", err)
}
}()
content, err := ioutil.ReadAll(r.Body)
if err != nil {
return minersBuckets{}, fmt.Errorf("reading body: %s", err)
}
var res minersBuckets
if err := json.Unmarshal(content, &res); err != nil {
return minersBuckets{}, fmt.Errorf("unmarshaling url contents: %s", err)
}
return res, nil
}

func getMinerQueryAsk(c *apistruct.FullNodeStruct, addrStr string) (uint64, error) {
addr, err := address.NewFromString(addrStr)
if err != nil {
return 0, fmt.Errorf("miner address is invalid: %s", err)
}
ctx, cls := context.WithTimeout(context.Background(), time.Second*15)
defer cls()
mi, err := c.StateMinerInfo(ctx, addr, types.EmptyTSK)
if err != nil {
return 0, fmt.Errorf("getting miner %s info: %s", addr, err)
}

sask, err := c.ClientQueryAsk(ctx, *mi.PeerId, addr)
if err != nil {
return 0, fmt.Errorf("query asking: %s", err)
}
return sask.Ask.Price.Uint64(), nil
}
10 changes: 9 additions & 1 deletion ffs/scheduler/scheduler_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
"github.com/textileio/powergate/ffs/scheduler/internal/sjstore"
)

var (
// HardcodedHotTimeout is a temporary override of storage configs
// value for AddTimeout.
HardcodedHotTimeout = time.Second * 300
)

// PushConfig queues the specified StorageConfig to be executed as a new Job. It returns
// the created JobID for further tracking of its state.
func (s *Scheduler) PushConfig(iid ffs.APIID, c cid.Cid, cfg ffs.StorageConfig) (ffs.JobID, error) {
Expand Down Expand Up @@ -202,7 +208,9 @@ func (s *Scheduler) executeHotStorage(ctx context.Context, curr ffs.CidInfo, cfg
return ffs.HotInfo{Enabled: false}, nil
}

sctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(cfg.Ipfs.AddTimeout))
// ToDo: this is a hot-fix to force a big timeout until we have a
// migration tool to make this tunable again.
sctx, cancel := context.WithTimeout(ctx, HardcodedHotTimeout)
defer cancel()

var size int
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ require (
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20200731171407-e559a0579161 // indirect
github.com/filecoin-project/go-fil-markets v0.6.0
github.com/filecoin-project/go-fil-markets v0.6.3
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df
github.com/filecoin-project/lotus v0.6.2
github.com/filecoin-project/specs-actors v0.9.7
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc
github.com/filecoin-project/lotus v0.8.0
github.com/filecoin-project/specs-actors v0.9.11
github.com/gin-contrib/location v0.0.2
github.com/gin-contrib/static v0.0.0-20191128031702-f81c604d8ac2
github.com/gin-gonic/gin v1.6.3
Expand All @@ -35,8 +35,9 @@ require (
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4
github.com/jessevdk/go-assets v0.0.0-20160921144138-4f4301a06e15
github.com/kyokomi/emoji v2.2.4+incompatible // indirect
github.com/libp2p/go-libp2p v0.11.0
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-kad-dht v0.8.3
Expand Down Expand Up @@ -65,6 +66,6 @@ require (
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
)

replace github.com/dgraph-io/badger/v2 => github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200718033852-37ee16d8ad1c
replace github.com/dgraph-io/badger/v2 => github.com/dgraph-io/badger/v2 v2.2007.2

replace github.com/filecoin-project/filecoin-ffi => github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200904205544-405691046034
replace github.com/filecoin-project/filecoin-ffi => github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
Loading