Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry metrics & Miner Index fix for document max-size limit & new deal-watcher #811

Merged
merged 22 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,34 @@ func defaultServerConfig(t *testing.T) server.Config {

grpcMaddr := util.MustParseAddr(grpcHostAddress)
conf := server.Config{
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
WalletInitialFunds: *big.NewInt(int64(4000000000)),
IpfsAPIAddr: ipfsAddr,
LotusAddress: devnetAddr,
LotusAuthToken: "",
LotusMasterAddr: "",
LotusConnectionRetries: 5,
Devnet: true,
GrpcHostNetwork: grpcHostNetwork,
GrpcHostAddress: grpcMaddr,
GrpcWebProxyAddress: grpcWebProxyAddress,
RepoPath: repoPath,
GatewayHostAddr: gatewayHostAddr,
IndexRawJSONHostAddr: indexRawJSONHostAddr,
MaxMindDBFolder: "../../iplocation/maxmind",
MinerSelector: "reputation",
FFSDealFinalityTimeout: time.Minute * 30,
FFSMaxParallelDealPreparing: 1,
FFSGCAutomaticGCInterval: 0,
FFSRetrievalNextEventTimeout: time.Hour,
DealWatchPollDuration: time.Second * 15,
SchedMaxParallel: 10,
AskIndexQueryAskTimeout: time.Second * 3,
AskIndexRefreshInterval: time.Second * 3,
AskIndexRefreshOnStart: true,
AskindexMaxParallel: 2,
IndexMinersRefreshOnStart: false,
IndexMinersOnChainMaxParallel: 1,
IndexMinersOnChainFrequency: time.Minute,
Comment on lines +64 to +66
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New knobs via env/flags.

}
return conf
}
Expand Down
2 changes: 1 addition & 1 deletion api/server/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/textileio/powergate/v2/ffs/manager"
"github.com/textileio/powergate/v2/ffs/scheduler"
askIndex "github.com/textileio/powergate/v2/index/ask/runner"
minerIndex "github.com/textileio/powergate/v2/index/miner/module"
minerIndex "github.com/textileio/powergate/v2/index/miner/lotusidx"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching names since module still feels too abstract. I mention lotus in the new name since this implementation of index/miner uses a Lotus as a Filecoin client.

"github.com/textileio/powergate/v2/wallet"
)

Expand Down
66 changes: 47 additions & 19 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
badger "github.com/ipfs/go-ds-badger2"
httpapi "github.com/ipfs/go-ipfs-http-client"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
measure "github.com/textileio/go-ds-measure"
mongods "github.com/textileio/go-ds-mongo"
adminPb "github.com/textileio/powergate/v2/api/gen/powergate/admin/v1"
userPb "github.com/textileio/powergate/v2/api/gen/powergate/user/v1"
Expand All @@ -44,7 +46,7 @@ import (
"github.com/textileio/powergate/v2/gateway"
ask "github.com/textileio/powergate/v2/index/ask/runner"
faultsModule "github.com/textileio/powergate/v2/index/faults/module"
minerModule "github.com/textileio/powergate/v2/index/miner/module"
minerIndex "github.com/textileio/powergate/v2/index/miner/lotusidx"
"github.com/textileio/powergate/v2/iplocation/maxmind"
"github.com/textileio/powergate/v2/lotus"
"github.com/textileio/powergate/v2/migration"
Expand Down Expand Up @@ -74,6 +76,7 @@ var (
2: migration.V2StorageInfoDealIDs,
3: migration.V3StorageJobsIndexMigration,
4: migration.V4RecordsMigration,
5: migration.V5DeleteOldMinerIndex,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple migration to delete old Miner Index store, since the new one uses another key namespace.

}
)

Expand All @@ -83,7 +86,7 @@ type Server struct {

mm *maxmind.MaxMind
ai *ask.Runner
mi *minerModule.Index
mi *minerIndex.Index
fi *faultsModule.Index
dm *dealsModule.Module
wm *lotusWallet.Module
Expand Down Expand Up @@ -146,7 +149,9 @@ type Config struct {
AskIndexRefreshInterval time.Duration
AskIndexRefreshOnStart bool

IndexMinersRefreshOnStart bool
IndexMinersRefreshOnStart bool
IndexMinersOnChainMaxParallel int
IndexMinersOnChainFrequency time.Duration

DisableIndices bool

Expand Down Expand Up @@ -214,32 +219,47 @@ func NewServer(conf Config) (*Server, error) {
if err != nil {
return nil, fmt.Errorf("opening maxmind database: %s", err)
}
askConf := ask.Config{
askIdxConf := ask.Config{
Disable: conf.DisableIndices,
QueryAskTimeout: conf.AskIndexQueryAskTimeout,
MaxParallel: conf.AskindexMaxParallel,
RefreshInterval: conf.AskIndexRefreshInterval,
RefreshOnStart: conf.Devnet || conf.AskIndexRefreshOnStart,
}
ai, err := ask.New(txndstr.Wrap(ds, "index/ask"), clientBuilder, askConf)
log.Info("Starting ask index...")
ai, err := ask.New(txndstr.Wrap(ds, "index/ask"), clientBuilder, askIdxConf)
if err != nil {
return nil, fmt.Errorf("creating ask index: %s", err)
}
mi, err := minerModule.New(txndstr.Wrap(ds, "index/miner"), clientBuilder, fchost, mm, conf.IndexMinersRefreshOnStart, conf.DisableIndices)

log.Info("Starting miner index...")
minerIdxConf := minerIndex.Config{
RefreshOnStart: conf.IndexMinersRefreshOnStart,
Disable: conf.DisableIndices,
OnChainMaxParallel: conf.IndexMinersOnChainMaxParallel,
OnChainFrequency: conf.IndexMinersOnChainFrequency,
}
Comment on lines -228 to +241
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid constructor parameter explosion: introducing some config struct.

mi, err := minerIndex.New(kt.Wrap(ds, kt.PrefixTransform{Prefix: datastore.NewKey("index/miner")}), clientBuilder, fchost, mm, minerIdxConf)
if err != nil {
return nil, fmt.Errorf("creating miner index: %s", err)
}

log.Info("Starting faults index...")
si, err := faultsModule.New(txndstr.Wrap(ds, "index/faults"), clientBuilder, conf.DisableIndices)
if err != nil {
return nil, fmt.Errorf("creating faults index: %s", err)
}
if conf.Devnet {
conf.DealWatchPollDuration = time.Second
}

log.Info("Starting deals module...")
dm, err := dealsModule.New(txndstr.Wrap(ds, "deals"), clientBuilder, conf.DealWatchPollDuration, conf.FFSDealFinalityTimeout, deals.WithImportPath(filepath.Join(conf.RepoPath, "imports")))
if err != nil {
return nil, fmt.Errorf("creating deal module: %s", err)
}

log.Info("Starting wallet module...")
wm, err := lotusWallet.New(clientBuilder, masterAddr, conf.WalletInitialFunds, conf.AutocreateMasterAddr, networkName)
if err != nil {
return nil, fmt.Errorf("creating wallet module: %s", err)
Expand Down Expand Up @@ -268,6 +288,7 @@ func NewServer(conf Config) (*Server, error) {
return nil, fmt.Errorf("creating coreipfs: %s", err)
}

log.Info("Starting FFS scheduler...")
var sr2rf func() (int, error)
if ms, ok := ms.(*sr2.MinerSelector); ok {
sr2rf = ms.GetReplicationFactor
Expand Down Expand Up @@ -521,6 +542,9 @@ func (s *Server) Close() {
if err := s.l.Close(); err != nil {
log.Errorf("closing joblogger: %s", err)
}
if err := s.dm.Close(); err != nil {
log.Errorf("closing deal module: %s", err)
}
if err := s.rm.Close(); err != nil {
log.Errorf("closing reputation module: %s", err)
}
Expand Down Expand Up @@ -549,6 +573,9 @@ func (s *Server) Close() {
}

func createDatastore(conf Config, longTimeout bool) (datastore.TxnDatastore, error) {
var ds datastore.TxnDatastore
var err error

if conf.MongoURI != "" {
log.Info("Opening Mongo database...")
mongoCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand All @@ -560,24 +587,25 @@ func createDatastore(conf Config, longTimeout bool) (datastore.TxnDatastore, err
if longTimeout {
opts = []mongods.Option{mongods.WithOpTimeout(time.Hour), mongods.WithTxnTimeout(time.Hour)}
}
ds, err := mongods.New(mongoCtx, conf.MongoURI, conf.MongoDB, opts...)
ds, err = mongods.New(mongoCtx, conf.MongoURI, conf.MongoDB, opts...)
if err != nil {
return nil, fmt.Errorf("opening mongo datastore: %s", err)
}
return ds, nil
}
} else {

log.Info("Opening badger database...")
path := filepath.Join(conf.RepoPath, datastoreFolderName)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating repo folder: %s", err)
}
opts := &badger.DefaultOptions
ds, err := badger.NewDatastore(path, opts)
if err != nil {
return nil, fmt.Errorf("opening badger datastore: %s", err)
log.Info("Opening badger database...")
path := filepath.Join(conf.RepoPath, datastoreFolderName)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("creating repo folder: %s", err)
}
opts := &badger.DefaultOptions
ds, err = badger.NewDatastore(path, opts)
if err != nil {
return nil, fmt.Errorf("opening badger datastore: %s", err)
}
}
return ds, nil

return measure.New("powergate.datastore", ds), nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we now return a wrapped datastore (being Badger or MongoDB backed) that will intercept go-datastore calls and produce counters and histograms for operations. These will be collected by OpenTelemetry and exposed as Prometehus endpoint.

}

func getMinerSelector(conf Config, rm *reputation.Module, ai *ask.Runner, cb lotus.ClientBuilder) (ffs.MinerSelector, error) {
Expand Down
70 changes: 40 additions & 30 deletions cmd/powd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ import (
"syscall"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
_ "net/http/pprof"

logging "github.com/ipfs/go-log/v2"
homedir "github.com/mitchellh/go-homedir"
ma "github.com/multiformats/go-multiaddr"
"github.com/spf13/pflag"
"github.com/spf13/viper"
metricsOpenTelemetry "github.com/textileio/go-metrics-opentelemetry"
"github.com/textileio/powergate/v2/api/server"
"github.com/textileio/powergate/v2/buildinfo"
"github.com/textileio/powergate/v2/util"
"go.opencensus.io/plugin/runmetrics"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var (
Expand Down Expand Up @@ -51,10 +57,10 @@ func main() {
log.Infof("starting powd:\n%s", buildinfo.Summary())

// Configuring Prometheus exporter.
closeInstr, err := setupInstrumentation()
if err != nil {
if err := setupInstrumentation(); err != nil {
log.Fatalf("starting instrumentation: %s", err)
}

confProtected := conf
if confProtected.MongoURI != "" {
confProtected.MongoURI = "<hidden>"
Expand All @@ -78,7 +84,6 @@ func main() {
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
<-ch
log.Info("Closing...")
closeInstr()
powd.Close()
if conf.Devnet {
if err := os.RemoveAll(conf.RepoPath); err != nil {
Expand Down Expand Up @@ -140,6 +145,8 @@ func configFromFlags() (server.Config, error) {
askIndexRefreshOnStart := config.GetBool("askindexrefreshonstart")
askIndexMaxParallel := config.GetInt("askindexmaxparallel")
indexMinersRefreshOnStart := config.GetBool("indexminersrefreshonstart")
indexMinersOnChainMaxParallel := config.GetInt("indexminersonchainmaxparallel")
indexMinersOnChainFrequency := config.GetDuration("indexminersonchainfrequency")
disableIndices := config.GetBool("disableindices")
disableNonCompliantAPIs := config.GetBool("disablenoncompliantapis")

Expand Down Expand Up @@ -186,45 +193,45 @@ func configFromFlags() (server.Config, error) {
AskIndexRefreshOnStart: askIndexRefreshOnStart,
AskindexMaxParallel: askIndexMaxParallel,

IndexMinersRefreshOnStart: indexMinersRefreshOnStart,
IndexMinersRefreshOnStart: indexMinersRefreshOnStart,
IndexMinersOnChainMaxParallel: indexMinersOnChainMaxParallel,
IndexMinersOnChainFrequency: indexMinersOnChainFrequency,

DisableIndices: disableIndices,

DisableNonCompliantAPIs: disableNonCompliantAPIs,
}, nil
}

func setupInstrumentation() (func(), error) {
err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true,
EnableMemory: true,
func setupInstrumentation() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of an ugly diff. Basically, OpenCensus wiring was removed and now OpenTelemetry is used.

exporter, err := prometheus.InstallNewPipeline(prometheus.Config{
DefaultHistogramBoundaries: []float64{1e-4, 1e-3, 1e-2, 1e-1, 1},
})
if err != nil {
return nil, fmt.Errorf("enabling runtime metrics: %s", err)
log.Panicf("failed to initialize prometheus exporter %v", err)
}
pe, err := prometheus.NewExporter(prometheus.Options{
Namespace: "textilefc",
})
if err != nil {
return nil, fmt.Errorf("creating the prometheus stats exporter: %v", err)
}
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
srv := &http.Server{Addr: ":8888", Handler: mux}
http.HandleFunc("/metrics", exporter.ServeHTTP)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("running prometheus scrape endpoint: %v", err)
}
_ = http.ListenAndServe(":8888", nil)
}()
closeFunc := func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Errorf("shutting down prometheus server: %s", err)
}

if err := metricsOpenTelemetry.Inject(); err != nil {
return fmt.Errorf("injecting datastore open-telemetry: %s", err)
}
Comment on lines +218 to +220
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where we use this new repo that we created: https://github.com/textileio/go-metrics-opentelemetry
What this does is to provide an implementation of https://github.com/ipfs/go-metrics-interface, which is used by https://github.com/textileio/go-ds-measure/ to collect whatever metrics are registered.
In summary, a kind of Singleton dependency injection.


if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second)); err != nil {
return fmt.Errorf("starting Go runtime metrics: %s", err)
}
Comment on lines +222 to 224
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We leverage an OpenTelemetry contrib plugin to collect metrics available by the Go runtime.


return closeFunc, nil
meter := global.Meter("powergate")
attrBuildDate := attribute.Key("builddate").String(buildinfo.BuildDate)
attrGitSummary := attribute.Key("gitsummary").String(buildinfo.GitSummary)
attrGitBranch := attribute.Key("gitbranch").String(buildinfo.GitBranch)
attrGitCommit := attribute.Key("gitcommit").String(buildinfo.GitCommit)
metricInfo := metric.Must(meter).NewInt64Counter("powergate.info")
metricInfo.Add(context.Background(), 1, attrBuildDate, attrGitSummary, attrGitBranch, attrGitCommit)
Comment on lines +226 to +232
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crafter a single metric to expose some build information as tags. This allows showing in the dashboard exactly what code is running. (cc @textileben )


return nil
}

func setupLogging(repoPath string) error {
Expand All @@ -250,6 +257,7 @@ func setupLogging(repoPath string) error {
// Deals Module
"deals",
"deals-records",
"deals-watcher",

// Wallet Module
"lotus-wallet",
Expand Down Expand Up @@ -403,6 +411,8 @@ func setupFlags() error {
pflag.String("askindexmaxparallel", "3", "Max parallel query ask to execute while updating index.")

pflag.Bool("indexminersrefreshonstart", false, "If true it will refresh the miner's on start.")
pflag.Int64("indexminersonchainmaxparallel", 20, "Max parallelization for building on-chain sub-index")
pflag.Duration("indexminersonchainfrequency", time.Hour*6, "Frequency of updating on-chain sub-index")

pflag.Bool("disableindices", false, "Disable all indices updates, useful to help Lotus syncing process.")
pflag.Bool("disablenoncompliantapis", false, "Disable APIs that may not easily comply with US law.")
Expand Down
Loading