Skip to content

Commit

Permalink
SR2 miner selector price cap & index refactor (#668)
Browse files Browse the repository at this point in the history
* improve sr2 selector & refactor indexes

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* fix msg

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* limit

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* controlled error

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* skip external test

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* lints

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* nit

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* increase defaults

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* less noisy progress

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* nit

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

* use power of two equivalent

Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
  • Loading branch information
jsign authored Oct 12, 2020
1 parent 0121275 commit d1840e2
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 109 deletions.
2 changes: 1 addition & 1 deletion cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func setupFlags() error {
pflag.String("mongodb", "", "Mongo database name. (if --mongouri is used, is mandatory")
pflag.String("ffsminerselector", "sr2", "Miner selector to be used by FFS: 'sr2', 'reputation'")
pflag.String("ffsminerselectorparams", "https://raw.githubusercontent.com/filecoin-project/slingshot/master/miners.json", "Miner selector configuration parameter, depends on --ffsminerselector")
pflag.String("ffsminimumpiecesize", "52428800", "Minimum piece size in bytes allowed to be stored in Filecoin")
pflag.String("ffsminimumpiecesize", "67108864", "Minimum piece size in bytes allowed to be stored in Filecoin")
pflag.String("ffsschedmaxparallel", "1000", "Maximum amount of Jobs executed in parallel")
pflag.String("dealwatchpollduration", "900", "Poll interval in seconds used by Deals Module watch to detect state changes")
pflag.String("ffsdealfinalitytimeout", "4320", "Deadline in minutes in which a deal must prove liveness changing status before considered abandoned")
Expand Down
35 changes: 31 additions & 4 deletions ffs/minerselector/sr2/sr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/types"
logger "github.com/ipfs/go-log/v2"
"github.com/textileio/powergate/ffs"
"github.com/textileio/powergate/lotus"
)

const (
maxSR2Price = 100_000_000_000
)

var (
log = logger.Logger("sr2-miner-selector")
)
Expand Down Expand Up @@ -83,6 +88,10 @@ func (ms *MinerSelector) GetMiners(n int, f ffs.MinerSelectorFilter) ([]ffs.Mine
log.Warnf("sr2 miner %s query-ask errored: %s", miners[i], err)
continue
}
if sask > maxSR2Price {
log.Warnf("skipping miner %s since has price %d above maximum allowed for SR2", miners[i], sask)
continue
}
if f.MaxPrice > 0 && sask > f.MaxPrice {
log.Warnf("skipping miner %s with price %d higher than max-price %d", miners[i], sask, f.MaxPrice)
continue
Expand Down Expand Up @@ -150,9 +159,27 @@ func getMinerQueryAsk(c *apistruct.FullNodeStruct, addrStr string) (uint64, erro
return 0, fmt.Errorf("getting miner %s info: %s", addr, err)
}

sask, err := c.ClientQueryAsk(ctx, *mi.PeerId, addr)
if err != nil {
return 0, fmt.Errorf("query asking: %s", err)
type chAskRes struct {
Error string
Ask *storagemarket.StorageAsk
}
chAsk := make(chan chAskRes)
go func() {
sask, err := c.ClientQueryAsk(ctx, *mi.PeerId, addr)
if err != nil {
chAsk <- chAskRes{Error: err.Error()}
return
}
chAsk <- chAskRes{Ask: sask}
}()

select {
case <-time.After(time.Second * 10):
return 0, fmt.Errorf("query asking timed out")
case r := <-chAsk:
if r.Error != "" {
return 0, fmt.Errorf("query ask had controlled error: %s", r.Error)
}
return r.Ask.Price.Uint64(), nil
}
return sask.Price.Uint64(), nil
}
54 changes: 54 additions & 0 deletions ffs/minerselector/sr2/sr2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package sr2

import (
"context"
"fmt"
"testing"

"github.com/filecoin-project/go-address"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/textileio/powergate/ffs"
"github.com/textileio/powergate/lotus"
)

// TestMS is meant to be run locally since it needs a fully
// synced Lotus node. It loops forever querying the SR2 miner
// selector, failing the test on the first selection error.
func TestMS(t *testing.T) {
	t.SkipNow()
	maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5555")
	require.NoError(t, err)
	// Local-node admin token; only valid against a developer's own Lotus instance.
	authToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBbGxvdyI6WyJyZWFkIiwid3JpdGUiLCJzaWduIiwiYWRtaW4iXX0.4KpuySIvV4n6kBEXQOle-hi1Ec3lyUmRYCknz4NQyLM"

	clientBuilder, err := lotus.NewBuilder(maddr, authToken, 1)
	require.NoError(t, err)

	minersURL := "https://raw.githubusercontent.com/filecoin-project/slingshot/master/miners.json"
	selector, err := New(minersURL, clientBuilder)
	require.NoError(t, err)

	// Intentional endless loop: exercises GetMiners repeatedly until the
	// developer interrupts the test or a selection error occurs.
	for {
		_, err := selector.GetMiners(1, ffs.MinerSelectorFilter{})
		require.NoError(t, err)
	}
}

// TestCustom is a local-only helper test: it connects to a local Lotus
// node and prints the balance of a hard-coded wallet address.
func TestCustom(t *testing.T) {
	t.SkipNow()
	maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5555")
	require.NoError(t, err)
	// Local-node admin token; only valid against a developer's own Lotus instance.
	authToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBbGxvdyI6WyJyZWFkIiwid3JpdGUiLCJzaWduIiwiYWRtaW4iXX0.4KpuySIvV4n6kBEXQOle-hi1Ec3lyUmRYCknz4NQyLM"

	clientBuilder, err := lotus.NewBuilder(maddr, authToken, 1)
	require.NoError(t, err)

	client, closeClient, err := clientBuilder()
	require.NoError(t, err)
	defer closeClient()

	walletAddr, err := address.NewFromString("t3rvsbv5yj5lil74o33bfn5mjsdlnnogukgqua5cnumtid3kgibqeer2uaipcm57iil2ndzykzq34ebp2xajwq")
	require.NoError(t, err)

	balance, err := client.WalletBalance(context.Background(), walletAddr)
	require.NoError(t, err)
	fmt.Printf("Balance: %d\n", balance)
}
2 changes: 1 addition & 1 deletion ffs/scheduler/scheduler_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Scheduler) push(iid ffs.APIID, c cid.Cid, cfg ffs.StorageConfig, oldCid
return ffs.EmptyJobID, fmt.Errorf("enqueuing job: %s", err)
}
if jid := s.sjs.GetExecutingJob(c); jid != nil {
s.l.Log(ctx, "Job %s is already being executed for the same data, this job will be queued until it finishes or is canceled.")
s.l.Log(ctx, "Job %s is already being executed for the same data, this job will be queued until it finishes or is canceled.", jid)
}

select {
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblf
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.4.0 h1:zgVt4UpGxcqVOw97aRGxT4svlcmdK35fynLNctY32zI=
github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand All @@ -405,6 +406,7 @@ github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/status v1.0.3/go.mod h1:SavQ51ycCLnc7dGyJxp8YAmudx8xqiVrRf+6IXRsugc=
github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA=
github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down
70 changes: 36 additions & 34 deletions index/miner/module/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,53 @@ import (
)

var (
metadataRefreshInterval = time.Hour
pingTimeout = time.Second * 5
pingRateLim = 1
metaRefreshInterval = time.Hour * 6
pingTimeout = time.Second * 5
pingRateLim = 1
)

var (
dsKeyMetaIndex = dsBase.ChildString("meta")
)

// metaWorker makes a pass on refreshing metadata information about known miners.
func (mi *Index) metaWorker() {
defer func() { mi.finished <- struct{}{} }()
mi.chMeta <- struct{}{}
for {
select {
case <-mi.ctx.Done():
log.Info("graceful shutdown of meta updater")
func (mi *Index) startMetaWorker(disabled bool) {
mi.wg.Add(1)
go func() {
defer mi.wg.Done()
if disabled {
log.Infof("meta index worker disabled")
return
case _, ok := <-mi.chMeta:
if !ok {
log.Info("meta worker channel closed")
}

for {
select {
case <-mi.ctx.Done():
log.Info("graceful shutdown of meta updater")
return
}
log.Info("updating meta index...")
// ToDo: could have smarter ways of selecting which addrs to refresh, and then
// doing a merge. Will depend on whether this is too slow, but that might not be the case
mi.lock.Lock()
addrs := make([]string, 0, len(mi.index.OnChain.Miners))
for addr, miner := range mi.index.OnChain.Miners {
if miner.Power > 0 {
addrs = append(addrs, addr)
case <-time.After(metaRefreshInterval):
log.Info("updating meta index...")
// ToDo: could have smarter ways of selecting which addrs to refresh, and then
// doing a merge. Will depend on whether this is too slow, but that might not be the case
mi.lock.Lock()
addrs := make([]string, 0, len(mi.index.OnChain.Miners))
for addr, miner := range mi.index.OnChain.Miners {
if miner.Power > 0 {
addrs = append(addrs, addr)
}
}
mi.lock.Unlock()
newIndex := updateMetaIndex(mi.ctx, mi.clientBuilder, mi.h, mi.lr, addrs)
if err := mi.persistMetaIndex(newIndex); err != nil {
log.Errorf("persisting meta index: %s", err)
}
mi.lock.Lock()
mi.index.Meta = newIndex
mi.lock.Unlock()
mi.signaler.Signal() // ToDo: consider a finer-grained signaling
log.Info("meta index updated")
}
mi.lock.Unlock()
newIndex := updateMetaIndex(mi.ctx, mi.clientBuilder, mi.h, mi.lr, addrs)
if err := mi.persistMetaIndex(newIndex); err != nil {
log.Errorf("persisting meta index: %s", err)
}
mi.lock.Lock()
mi.index.Meta = newIndex
mi.lock.Unlock()
mi.signaler.Signal() // ToDo: consider a finer-grained signaling
log.Info("meta index updated")
}
}
}()
}

// updateMetaIndex generates a new index that contains fresh metadata information
Expand Down
99 changes: 33 additions & 66 deletions index/miner/module/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (
txndstr "github.com/textileio/powergate/txndstransform"
)

const (
goroutinesCount = 2
)

var (
minersRefreshInterval = time.Minute * 30
minersRefreshInterval = time.Hour * 6
maxParallelism = 1
dsBase = datastore.NewKey("index")

Expand All @@ -48,17 +44,13 @@ type Index struct {
lr iplocation.LocationResolver
signaler *signaler.Signaler

chMeta chan struct{}
metaTicker *time.Ticker
minerTicker *time.Ticker
lock sync.Mutex
index miner.IndexSnapshot

ctx context.Context
cancel context.CancelFunc
finished chan struct{}
clsLock sync.Mutex
closed bool
lock sync.Mutex
index miner.IndexSnapshot

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
closed bool
}

// New returns a new MinerIndex. It loads from ds any previous state and starts
Expand All @@ -78,19 +70,16 @@ func New(ds datastore.TxnDatastore, clientBuilder lotus.ClientBuilder, h P2PHost
signaler: signaler.New(),
h: h,
lr: lr,
chMeta: make(chan struct{}, 1),
metaTicker: time.NewTicker(metadataRefreshInterval),
minerTicker: time.NewTicker(minersRefreshInterval),

ctx: ctx,
cancel: cancel,
finished: make(chan struct{}, 2),
ctx: ctx,
cancel: cancel,
}
if err := mi.loadFromDS(); err != nil {
return nil, err
}
go mi.start(disable)
go mi.metaWorker()

mi.startMinerWorker(disable)
mi.startMetaWorker(disable)
return mi, nil
}

Expand Down Expand Up @@ -133,64 +122,42 @@ func (mi *Index) Unregister(c chan struct{}) {
func (mi *Index) Close() error {
log.Info("closing...")
defer log.Info("closed")
mi.clsLock.Lock()
defer mi.clsLock.Unlock()
mi.lock.Lock()
defer mi.lock.Unlock()
if mi.closed {
return nil
}
mi.cancel()
for i := 0; i < goroutinesCount; i++ {
<-mi.finished
}
close(mi.finished)
mi.wg.Wait()
mi.signaler.Close()
mi.minerTicker.Stop()
mi.metaTicker.Stop()

mi.closed = true
return nil
}

// start is a long running job that keep the index up to date. It separates
// updating tasks in two components. Updating on-chain information whenever
// a new potential tipset is notified by the full node. And a metadata updater
// which do best-efforts to gather/update off-chain information about known
// miners.
func (mi *Index) start(disabled bool) {
defer func() { mi.finished <- struct{}{} }()

if !disabled {
func (mi *Index) startMinerWorker(disabled bool) {
mi.wg.Add(1)
go func() {
defer mi.wg.Done()
if disabled {
log.Infof("miners index worker disabled")
return
}
if err := mi.updateOnChainIndex(); err != nil {
log.Errorf("initial updating miner index: %s", err)
}
}
mi.chMeta <- struct{}{}
for {
select {
case <-mi.ctx.Done():
log.Info("graceful shutdown of background miner index")
return
case <-mi.metaTicker.C:
if disabled {
log.Infof("skipping meta index update since it's disabled")
continue
}
for {
select {
case mi.chMeta <- struct{}{}:
default:
log.Info("skipping meta index update since it's busy")
}
case <-mi.minerTicker.C:
if disabled {
log.Infof("skipping miner index update since it's disabled")
continue
}
if err := mi.updateOnChainIndex(); err != nil {
log.Errorf("updating miner index: %s", err)
continue
case <-mi.ctx.Done():
log.Info("graceful shutdown of background miner index")
return
case <-time.After(minersRefreshInterval):
if err := mi.updateOnChainIndex(); err != nil {
log.Errorf("updating miner index: %s", err)
}
}
}
}
}()
}

// loadFromDS loads persisted indexes to memory datastructures. No locks needed
Expand Down
Loading

0 comments on commit d1840e2

Please sign in to comment.