Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: validation service #724

Merged
merged 47 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
9f761b8
flag
Aug 27, 2024
cbc904b
ok
Aug 28, 2024
6cd1cd0
utd
Aug 29, 2024
f411898
Merge branch 'main' into chore/debug-timeout
Aug 30, 2024
722027c
switch to mode
Aug 30, 2024
9c3e0be
compose
Aug 30, 2024
4890a65
flags proper
Aug 30, 2024
b3177d3
log better
Aug 30, 2024
b46f115
working validation timeout
Aug 30, 2024
0fb9422
nice
Aug 30, 2024
58c6e16
mode
Aug 30, 2024
ee41633
mod
Sep 4, 2024
bd0adc6
Merge branch 'main' into chore/debug-timeout
Sep 4, 2024
1b2eec6
ok
Sep 4, 2024
382d47b
format
Sep 4, 2024
111b60a
missing prices
Sep 4, 2024
a82408e
add
Sep 4, 2024
665b1f0
stub
Sep 4, 2024
0dec0b0
add validation service
Sep 4, 2024
c0d06c3
compose
Sep 4, 2024
d943967
lint and mocks
Sep 4, 2024
09ccec9
spell
Sep 4, 2024
3be91a3
fix
Sep 4, 2024
0aad85a
Merge branch 'main' into chore/debug-timeout
Sep 4, 2024
93e4ff3
update
Sep 4, 2024
92fb538
remove all traces
Sep 4, 2024
9b74e0c
finalize
Sep 4, 2024
15d6beb
Merge branch 'main' into chore/debug-timeout
Sep 5, 2024
dcc5c95
update probe
Sep 4, 2024
c71ab9b
ok
Sep 5, 2024
aad9b21
duration
Sep 5, 2024
c6aa4d3
early exit
Sep 5, 2024
c25635b
ok
Sep 5, 2024
54eb60a
fix
Sep 5, 2024
68519e9
Merge branch 'main' into chore/debug-timeout
Sep 5, 2024
9f03086
Update cmd/connect/main.go
Sep 6, 2024
cef98c0
Update cmd/connect/main.go
Sep 6, 2024
03f9037
Merge branch 'main' into chore/debug-timeout
Sep 9, 2024
5cb87bc
remove client
Sep 9, 2024
61836e5
tidy
Sep 9, 2024
44053ab
Merge branch 'main' into chore/debug-timeout
Sep 9, 2024
7bfb9e2
ok
Sep 9, 2024
3cc6030
add validate
Sep 9, 2024
8a3e5f9
test
Sep 9, 2024
d8d81fa
Merge branch 'main' into chore/debug-timeout
Sep 9, 2024
1614b00
Merge branch 'main' into chore/debug-timeout
Sep 9, 2024
bf62b8b
comment out
Sep 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions cmd/connect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import (
"context"
"errors"
"fmt"
"github.com/skip-mev/connect/v2/service/validation"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"syscall"

_ "net/http/pprof" //nolint: gosec
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"

//nolint: gosec

"github.com/skip-mev/connect/v2/cmd/build"
cmdconfig "github.com/skip-mev/connect/v2/cmd/connect/config"
"github.com/skip-mev/connect/v2/oracle"
Expand Down Expand Up @@ -58,6 +61,8 @@ var (
flagPort = "port"
flagUpdateInterval = "update-interval"
flagMaxPriceAge = "max-price-age"
flagMode = "mode"
flagValidationPeriod = "validation-period"

// flag-bound values.
oracleCfgPath string
Expand All @@ -75,12 +80,21 @@ var (
maxAge int
disableCompressLogs bool
disableRotatingLogs bool
mode string
validationPeriod int
)

const (
DefaultLegacyConfigPath = "./oracle.json"
)

type runMode string

var (
aljo242 marked this conversation as resolved.
Show resolved Hide resolved
modeExec runMode = "exec"
modeValidate runMode = "validate"
)

func init() {
rootCmd.Flags().StringVarP(
&marketMapProvider,
Expand Down Expand Up @@ -187,6 +201,20 @@ func init() {
"",
"Use a custom listen-to endpoint for market-map (overwrites what is provided in oracle-config).",
)
rootCmd.Flags().StringVarP(
&mode,
flagMode,
"m",
string(modeExec),
"Select the mode to run the oracle in. Default is \"exec\" which will fetch prices as configured. \"validate\" mode will run the oracle for a set period of time to validate the configuration.",
)
rootCmd.Flags().IntVarP(
&validationPeriod,
flagValidationPeriod,
"",
int(validation.DefaultValidationPeriod),
"Duration to run in validation mode. Note: this flag is only used if mode == \"validate\"",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me what the units are for this int--hours minutes days etc.

aljo242 marked this conversation as resolved.
Show resolved Hide resolved
)

// these flags are connected to the OracleConfig.
rootCmd.Flags().Bool(
Expand Down Expand Up @@ -316,6 +344,8 @@ func runOracle() error {
nodeClient, _ = oraclemetrics.NewNodeClient(nodeEndpoint)
}

isValidateMode := runMode(mode) == modeValidate

metrics := oraclemetrics.NewMetricsFromConfig(cfg.Metrics, nodeClient)

aggregator, err := oraclemath.NewIndexPriceAggregator(
Expand Down Expand Up @@ -395,6 +425,28 @@ func runOracle() error {
}()
}

// run validation service if enabled and tear down if completed successfully
if isValidateMode {
valCfg := validation.DefaultConfig()
valCfg.ValidationPeriod = time.Duration(validationPeriod)
validatorService := validation.NewValidator(logger, metrics, valCfg)

go func(c context.CancelFunc) {
defer c()

_, err := validatorService.Run()
if err != nil {
logger.Error("failed to validate metrics", zap.Error(err))

// kill the process
os.Exit(1)
technicallyty marked this conversation as resolved.
Show resolved Hide resolved
}

logger.Info("shutting down gracefully after validation")

}(cancel)
}

// start server (blocks).
if err := srv.StartServer(ctx, cfg.Host, cfg.Port); err != nil {
logger.Error("stopping server", zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions contrib/compose/docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ services:
"--market-config-path", "/data/markets.json",
"--pprof-port", "6060",
"--run-pprof",
# "-m", "validate" uncomment to run in validation mode with default config
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to run in validation mode

]
ports:
- "8080:8080" # main oracle port
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U=
github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down
69 changes: 56 additions & 13 deletions oracle/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"fmt"
"strings"
"sync"

"github.com/DataDog/datadog-go/statsd"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -33,6 +34,7 @@ const (
ProviderCountMetricName = "health_check_market_providers"
SlinkyBuildInfoMetricName = "slinky_build_info"
ConnectBuildInfoMetricName = "connect_build_info"
MissingPricesName = "missing_prices"
)

// Metrics is an interface that defines the API for oracle metrics.
Expand All @@ -45,7 +47,7 @@ type Metrics interface {

// AddTickerTick increments the number of ticks for a given ticker. Specifically, this
// is used to track the number of times a ticker was updated.
AddTickerTick(ticker string)
AddTickerTick(pairID string)

// UpdatePrice price updates the price for the given pairID for the provider.
UpdatePrice(name, pairID string, decimals uint64, price float64)
Expand All @@ -60,24 +62,33 @@ type Metrics interface {

// AddProviderCountForMarket increments the number of providers that were utilized
// to calculate the final price for a given market.
AddProviderCountForMarket(market string, count int)
AddProviderCountForMarket(pairID string, count int)

// SetConnectBuildInfo sets the build information for the Slinky binary.
SetConnectBuildInfo()

// MissingPrices sets a list of missing prices for the given aggregation tick.
MissingPrices(pairIDs []string)

// GetMissingPrices gets the current list of missing prices.
GetMissingPrices() []string
}

// OracleMetricsImpl is a Metrics implementation that does nothing.
type OracleMetricsImpl struct {
promTicks prometheus.Counter
promTickerTicks *prometheus.CounterVec
promPrices *prometheus.GaugeVec
promAggregatePrices *prometheus.GaugeVec
promProviderTick *prometheus.CounterVec
promProviderCount *prometheus.GaugeVec
promSlinkyBuildInfo *prometheus.GaugeVec
promConnectBuildInfo *prometheus.GaugeVec
statsdClient statsd.ClientInterface
nodeIdentifier string
promTicks prometheus.Counter
promTickerTicks *prometheus.CounterVec
promPrices *prometheus.GaugeVec
promAggregatePrices *prometheus.GaugeVec
promProviderTick *prometheus.CounterVec
promProviderCount *prometheus.GaugeVec
promSlinkyBuildInfo *prometheus.GaugeVec
promConnectBuildInfo *prometheus.GaugeVec
promMissingPrices *prometheus.GaugeVec
statsdClient statsd.ClientInterface
nodeIdentifier string
missingPricesInternal []string
missingPricesMtx sync.Mutex
}

// NewMetricsFromConfig returns an oracle Metrics implementation based on the provided
Expand Down Expand Up @@ -110,7 +121,9 @@ func NewMetricsFromConfig(config config.MetricsConfig, nodeClient NodeClient) Me

// NewMetrics returns a Metrics implementation that exposes metrics to Prometheus.
func NewMetrics(statsdClient statsd.ClientInterface, nodeIdentifier string) Metrics {
ret := OracleMetricsImpl{}
ret := OracleMetricsImpl{
missingPricesInternal: make([]string, 0),
}

ret.statsdClient = statsdClient
ret.nodeIdentifier = nodeIdentifier
Expand Down Expand Up @@ -156,6 +169,11 @@ func NewMetrics(statsdClient statsd.ClientInterface, nodeIdentifier string) Metr
Name: ConnectBuildInfoMetricName,
Help: "Information about the connect build",
}, []string{Version})
ret.promMissingPrices = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: OracleSubsystem,
Name: MissingPricesName,
Help: "Missing Prices",
}, []string{Version})

prometheus.MustRegister(ret.promTicks)
prometheus.MustRegister(ret.promTickerTicks)
Expand All @@ -165,6 +183,7 @@ func NewMetrics(statsdClient statsd.ClientInterface, nodeIdentifier string) Metr
prometheus.MustRegister(ret.promProviderCount)
prometheus.MustRegister(ret.promSlinkyBuildInfo)
prometheus.MustRegister(ret.promConnectBuildInfo)
prometheus.MustRegister(ret.promMissingPrices)

return &ret
}
Expand All @@ -176,6 +195,10 @@ func NewNopMetrics() Metrics {
return &noOpOracleMetrics{}
}

func (m *noOpOracleMetrics) MissingPrices(_ []string) {}

func (m *noOpOracleMetrics) GetMissingPrices() []string { return []string{} }

// AddTick increments the total number of ticks that have been processed by the oracle.
func (m *noOpOracleMetrics) AddTick() {}

Expand Down Expand Up @@ -279,6 +302,26 @@ func (m *OracleMetricsImpl) AddProviderCountForMarket(market string, count int)
m.statsdClient.Gauge(metricName, float64(count), []string{}, 1)
}

// MissingPrices updates the list of missing prices for the given tick.
func (m *OracleMetricsImpl) MissingPrices(pairIDs []string) {
if len(pairIDs) == 0 {
return
}

m.missingPricesMtx.Lock()
defer m.missingPricesMtx.Unlock()
m.missingPricesInternal = pairIDs

}

// GetMissingPrices gets the internal missing prices array.
func (m *OracleMetricsImpl) GetMissingPrices() []string {
m.missingPricesMtx.Lock()
defer m.missingPricesMtx.Unlock()

return m.missingPricesInternal
}

// SetConnectBuildInfo sets the build information for the Connect binary. The version exported
// is determined by the build time version in accordance with the build pkg.
func (m *OracleMetricsImpl) SetConnectBuildInfo() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/http/address.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package http

import "net"

func IsValidAddress(address string) bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to a common folder so it could be used by other code

host, port, err := net.SplitHostPort(address)
if err != nil {
return false
}

if host == "" || port == "" {
return false
}

return true
}
1 change: 1 addition & 0 deletions pkg/math/oracle/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (m *IndexPriceAggregator) AggregatePrices() {
m.logger.Debug("calculated median prices for price feeds", zap.Int("num_prices", len(indexPrices)))
if len(missingPrices) > 0 {
m.logger.Info("failed to calculate prices for price feeds", zap.Strings("missing_prices", missingPrices))
m.metrics.MissingPrices(missingPrices)
}
m.indexPrices = indexPrices
m.scaledPrices = scaledPrices
Expand Down
2 changes: 1 addition & 1 deletion pkg/math/oracle/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *IndexPriceAggregator) GetIndexPrice(
return price, nil
}

// SetIndexPrice sets the index price for the given currency pair.
// SetIndexPrices sets the index price for the given currency pair.
func (m *IndexPriceAggregator) SetIndexPrices(
prices types.Prices,
) {
Expand Down
50 changes: 50 additions & 0 deletions service/clients/prometheus/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package prometheus

import (
"fmt"
"strings"

"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"go.uber.org/zap"

libhttp "github.com/skip-mev/connect/v2/pkg/http"

Check warning on line 11 in service/clients/prometheus/client.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)

Check warning on line 11 in service/clients/prometheus/client.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)
)

type Client struct {
v1.API
logger *zap.Logger
}

func NewClient(address string, logger *zap.Logger) (Client, error) {
if logger == nil {
logger = zap.NewNop()
}

logger = logger.With(zap.String("service", "prometheus_client"))
logger.Info("creating prometheus client", zap.String("address", address))

// get the prometheus server address
if address == "" || !libhttp.IsValidAddress(address) {

Check warning on line 28 in service/clients/prometheus/client.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)

Check warning on line 28 in service/clients/prometheus/client.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)
return Client{}, fmt.Errorf("invalid prometheus server address: %s", address)
}

const httpPrefix = "http://"
if !strings.HasPrefix(address, httpPrefix) {
address = httpPrefix + address
}

// Create a Prometheus API client
client, err := api.NewClient(api.Config{
Address: address, // Address of your Prometheus server
})
if err != nil {
return Client{}, fmt.Errorf("failed to create prometheus client: %w", err)
}

// Create a new Prometheus API v1 client
return Client{
API: v1.NewAPI(client),
logger: logger,
}, nil
}
17 changes: 2 additions & 15 deletions service/servers/prometheus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import (
"errors"
"fmt"
"net"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"

libhttp "github.com/skip-mev/connect/v2/pkg/http"

Check warning on line 13 in service/servers/prometheus/server.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)

Check warning on line 13 in service/servers/prometheus/server.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)
"github.com/skip-mev/connect/v2/pkg/sync"
)

Expand All @@ -34,7 +34,7 @@
// address is set, and valid. Notice, this method does not start the server.
func NewPrometheusServer(prometheusAddress string, logger *zap.Logger) (*PrometheusServer, error) {
// get the prometheus server address
if prometheusAddress == "" || !isValidAddress(prometheusAddress) {
if prometheusAddress == "" || !libhttp.IsValidAddress(prometheusAddress) {

Check warning on line 37 in service/servers/prometheus/server.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)

Check warning on line 37 in service/servers/prometheus/server.go

View workflow job for this annotation

GitHub Actions / Spell checking

`libhttp` is not a recognized word. (unrecognized-spelling)
return nil, fmt.Errorf("invalid prometheus server address: %s", prometheusAddress)
}
srv := &http.Server{
Expand Down Expand Up @@ -79,16 +79,3 @@
// close the done channel
close(ps.done)
}

func isValidAddress(address string) bool {
host, port, err := net.SplitHostPort(address)
if err != nil {
return false
}

if host == "" || port == "" {
return false
}

return true
}
Loading
Loading