OpenTelemetry metrics & Miner Index fix for document max-size limit & new deal-watcher #811
Conversation
Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
IndexMinersRefreshOnStart:     false,
IndexMinersOnChainMaxParallel: 1,
IndexMinersOnChainFrequency:   time.Minute,
New knobs via env/flags.
- minerIndex "github.com/textileio/powergate/v2/index/miner/module"
+ minerIndex "github.com/textileio/powergate/v2/index/miner/lotusidx"
Switching names, since `module` still feels too abstract. I mention `lotus` in the new name since this implementation of `index/miner` uses Lotus as its Filecoin client.
@@ -74,6 +76,7 @@ var (
 	2: migration.V2StorageInfoDealIDs,
 	3: migration.V3StorageJobsIndexMigration,
 	4: migration.V4RecordsMigration,
+	5: migration.V5DeleteOldMinerIndex,
A simple migration to delete the old Miner Index store, since the new one uses another key namespace.
- mi, err := minerModule.New(txndstr.Wrap(ds, "index/miner"), clientBuilder, fchost, mm, conf.IndexMinersRefreshOnStart, conf.DisableIndices)
+ log.Info("Starting miner index...")
+ minerIdxConf := minerIndex.Config{
+ 	RefreshOnStart:     conf.IndexMinersRefreshOnStart,
+ 	Disable:            conf.DisableIndices,
+ 	OnChainMaxParallel: conf.IndexMinersOnChainMaxParallel,
+ 	OnChainFrequency:   conf.IndexMinersOnChainFrequency,
+ }
To avoid constructor parameter explosion, I introduced a config struct.
 	return nil, fmt.Errorf("opening badger datastore: %s", err)
 }
-return ds, nil
+return measure.New("powergate.datastore", ds), nil
Here we now return a wrapped datastore (whether Badger- or MongoDB-backed) that intercepts go-datastore calls and produces counters and histograms for operations. These are collected by OpenTelemetry and exposed at a Prometheus endpoint.
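The interception idea behind go-ds-measure can be shown with a hand-rolled sketch. The real package instruments the full go-datastore interface and reports to a metrics backend; this tiny KV wrapper (all names here are illustrative) only demonstrates how wrapping lets every call be counted and timed:

```go
package main

import (
	"fmt"
	"time"
)

// KV is a tiny stand-in for the go-datastore interface.
type KV interface {
	Put(key string, value []byte) error
	Get(key string) ([]byte, error)
}

type mapKV struct{ m map[string][]byte }

func (s *mapKV) Put(k string, v []byte) error { s.m[k] = v; return nil }
func (s *mapKV) Get(k string) ([]byte, error) {
	v, ok := s.m[k]
	if !ok {
		return nil, fmt.Errorf("not found: %s", k)
	}
	return v, nil
}

// measured intercepts every call, counting operations and accumulating
// latency, analogous to what the measuring datastore wrapper does
// before handing the numbers to the metrics system.
type measured struct {
	inner     KV
	putCount  int
	getCount  int
	totalTime time.Duration
}

func (m *measured) Put(k string, v []byte) error {
	start := time.Now()
	defer func() { m.putCount++; m.totalTime += time.Since(start) }()
	return m.inner.Put(k, v)
}

func (m *measured) Get(k string) ([]byte, error) {
	start := time.Now()
	defer func() { m.getCount++; m.totalTime += time.Since(start) }()
	return m.inner.Get(k)
}

func main() {
	ds := &measured{inner: &mapKV{m: map[string][]byte{}}}
	_ = ds.Put("miner/f01234", []byte("info"))
	v, _ := ds.Get("miner/f01234")
	fmt.Println(string(v), ds.putCount, ds.getCount) // info 1 1
}
```

Since the wrapper satisfies the same interface, callers never notice the instrumentation, which is what makes the one-line `measure.New(...)` change above sufficient.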
}

// SaveOnChain creates/updates on-chain information of miners.
func (s *Store) SaveOnChain(ctx context.Context, index miner.ChainIndex) error {
The miner index has a lot of miners. I used the `datastore.Batching` interface to persist entries in batches, since single updates were too slow.
if i%1000 == 0 {
	if err := b.Commit(); err != nil {
		return fmt.Errorf("committing batch: %s", err)
	}
	b, err = s.ds.Batch()
	if err != nil {
		return fmt.Errorf("creating batch: %s", err)
	}
}
Every 1000 miner index entries, we commit the batch and start a new one.
@@ -7,19 +7,11 @@ import (
 	"time"
Migrating some old metrics about Lotus health from OpenCensus to OpenTelemetry, plus adding others to track sync lag. (See the screenshot of the Grafana dashboard in the Lotus section.)
Run: func(ds datastoreReaderWriter) error {
	q := query.Query{Prefix: "/index/miner/chainstore"}
	res, err := ds.Query(q)
	if err != nil {
		return fmt.Errorf("querying records: %s", err)
	}
	defer func() { _ = res.Close() }()

	var count int
	for v := range res.Next() {
		if err := ds.Delete(datastore.NewKey(v.Key)); err != nil {
			return fmt.Errorf("deleting miner chainstore key: %s", err)
		}
		count++
	}
	log.Infof("deleted %d chainstore keys", count)

	return nil
},
The new miner-index store saves its data in a new namespace. The only thing this migration does is remove the old persisted miner-index data; if it wasn't removed, it would stay there forever. It isn't really that big, but removing it keeps things clean.
@@ -18,6 +18,7 @@ import (
 	logger "github.com/ipfs/go-log/v2"
Added some metrics in the Wallet Module to count how many new addresses and transfers are happening. This should show interesting info for Powergate as used by the Hub, since we create a new wallet address per new Account/User. It will be cool to see.
Yea very interesting.
So good! Curious if any of the refactoring here makes it easier to eventually add in some rolling averages stuff (textileio/textile#519).
The Hub Miner Index is built by a separate daemon. That daemon is already querying Powergate at regular intervals about prices offered by miners, so it can already do further aggregations (rolling averages, or other things) on its own. It would need to keep some short history of the last ask prices and do more calculation on top, or similar.
Gotcha, that makes sense 👍🏻 (thanks!)
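For illustration, the rolling-average idea discussed above could be sketched like this (the type, the window size, and the method names are all hypothetical, not anything in Powergate or the Hub):

```go
package main

import "fmt"

// askWindow keeps a short history of the last N observed ask prices
// and computes a rolling average over them.
type askWindow struct {
	size   int
	prices []uint64
}

// Observe records a new ask price, dropping the oldest one once the
// window is full.
func (w *askWindow) Observe(price uint64) {
	w.prices = append(w.prices, price)
	if len(w.prices) > w.size {
		w.prices = w.prices[1:]
	}
}

// Average returns the mean of the prices currently in the window.
func (w *askWindow) Average() float64 {
	if len(w.prices) == 0 {
		return 0
	}
	var sum uint64
	for _, p := range w.prices {
		sum += p
	}
	return float64(sum) / float64(len(w.prices))
}

func main() {
	w := &askWindow{size: 3}
	for _, p := range []uint64{100, 200, 300, 400} { // 100 falls out of the window
		w.Observe(p)
	}
	fmt.Println(w.Average()) // average of 200, 300, 400 → 300
}
```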
Looks great. No issues at all, just some general questions for my own curiosity.
@@ -105,6 +106,16 @@ func (m *Module) Watch(ctx context.Context, proposal cid.Cid) (<-chan deals.Stor
 go func() {
 	defer close(updates)

+	watcherUpdates := make(chan struct{}, 20)
Very cool. Glad we're going to try the push based mechanism, and will either have success or file some new lotus issues. Good outcome either way.
@@ -103,6 +103,9 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
 go func() {
 	defer lapiCls()
 	defer close(out)
+	m.metricRetrievalTracking.Add(ctx, 1)
+	defer m.metricRetrievalTracking.Add(ctx, -1)
I like how easy this api is to use. In the new services I've been creating, I've exposed this sort of metrics data through the api thinking that some external monitoring would poll the api. Is that even possible? Either way, this seems like a nicer solution.
Yeah, that could be possible. I'd vote to let the daemon itself publish the metrics instead of relying on an external component, just to make metrics a baked-in feature of the daemon.
mi.metricLock.Lock()
defer mi.metricLock.Unlock()
result.Observe(mi.onchainProgress, onchainSubindex)
result.Observe(mi.metaProgress, metaSubindex)
Awesome you can observe like this. Seems like magic, I guess it's probably simple since it's Go.
But I am really curious: when it comes to actually being notified that the observed value changed... how does that work? You update the observed value directly, not via some func wrapper, right?
This metric is defined as:

_ = metric.Must(meter).NewFloat64ValueObserver("powergate.index.ask.progress", ai.progressValueObserver, metric.WithDescription("Ask index refresh progress"), metric.WithUnit(unit.Dimensionless))

What happens with the `NewXXXXObserver` style of metrics is that the OpenTelemetry library calls the callback (`progressValueObserver`) at defined intervals. This avoids having to record quickly changing metrics, and also avoids registering a ton of observations within the same timestamp. In this case, since the update progress is a gauge, I let this metric be updated on every collection interval.

There are quite a few things to learn and keep improving. I think this is the best reference for understanding how the different measurement kinds work. I still need to give it a full read, but it's the best source of documentation for the metrics API.
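To make the pull model concrete: you register a callback once, and the collector invokes it on its own schedule; the app just updates a plain field under a lock. Here's a simplified, library-free sketch of that flow (this is NOT the actual OpenTelemetry API, just the shape of the pattern):

```go
package main

import (
	"fmt"
	"sync"
)

// meter stands in for the metrics SDK: it holds registered callbacks
// and invokes them on every collection cycle.
type meter struct {
	callbacks []func() float64
}

func (m *meter) RegisterObserver(cb func() float64) {
	m.callbacks = append(m.callbacks, cb)
}

// collect is what the SDK does at each interval: pull the current
// value from every observer callback.
func (m *meter) collect() []float64 {
	out := make([]float64, 0, len(m.callbacks))
	for _, cb := range m.callbacks {
		out = append(out, cb())
	}
	return out
}

// index updates its progress directly under a lock; the callback just
// reads whatever the latest value is when the collector asks.
type index struct {
	mu       sync.Mutex
	progress float64
}

func (i *index) setProgress(p float64) { i.mu.Lock(); i.progress = p; i.mu.Unlock() }
func (i *index) observeProgress() float64 {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.progress
}

func main() {
	m := &meter{}
	idx := &index{}
	m.RegisterObserver(idx.observeProgress)

	idx.setProgress(0.25) // the app updates the value whenever it wants
	fmt.Println(m.collect()) // [0.25]
	idx.setProgress(0.8)
	fmt.Println(m.collect()) // [0.8]
}
```

So nothing notifies the collector that the value changed; intermediate updates between collection cycles are simply never observed, which is fine for a gauge like refresh progress.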
This PR includes multiple improvements:

- The `go-datastore` implementation is wrapped with a customized `go-ds-measure` to publish Prometheus metrics for Get, Put, Query, Commit, and other usual `go-datastore` operations. We forked the original repo to add Txn-based metrics.
- A fix for the Miner Index, which was hitting the limit of the `go-datastore` not allowing to save documents bigger than 16MB.
- A new `DealWatcher` component was created to provide an instant, push-based way to detect deal state changes. The existing poll-based mechanism is kept because we aren't totally sure how reliable this new way is. In fact, this was done originally, and we switched to poll-based since long-running websocket connections of `go-jsonrpc` for the Lotus client were unreliable. Keeping both gets us the benefits of instant updates, but if that breaks for some reason, we still poll once in a while (apart from trying to recover the broken watcher).

A new set of application metrics is exposed:

- Datastore: `go-datastore` latencies.

I believe there's a lot of room to keep thinking about whether these are the right metrics, or which others would be useful to gather. The main point here was a kickoff; we can keep making incremental improvements now that things are wired.
Here's a quick screenshot of the Grafana dashboard. It has some rough edges, but we'll eventually also publish the Grafana dashboards:
While doing this work, some secondary repos were forked, edited, or created: