From d122d03e5c70e0d778af7f9a5c36a6a7b2ce223d Mon Sep 17 00:00:00 2001 From: zarazan Date: Mon, 6 Feb 2023 14:42:46 -0700 Subject: [PATCH 1/8] use custom endpoints in integration test --- price-feeder/cmd/price-feeder.go | 8 +------- price-feeder/config/config.go | 8 ++++++++ price-feeder/price-feeder.example.toml | 5 +++++ price-feeder/tests/integration/provider_test.go | 5 ++++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/price-feeder/cmd/price-feeder.go b/price-feeder/cmd/price-feeder.go index 1af049b7d6..af060ecdd3 100644 --- a/price-feeder/cmd/price-feeder.go +++ b/price-feeder/cmd/price-feeder.go @@ -24,7 +24,6 @@ import ( "github.com/umee-network/umee/price-feeder/v2/config" "github.com/umee-network/umee/price-feeder/v2/oracle" "github.com/umee-network/umee/price-feeder/v2/oracle/client" - "github.com/umee-network/umee/price-feeder/v2/oracle/provider" v1 "github.com/umee-network/umee/price-feeder/v2/router/v1" ) @@ -156,18 +155,13 @@ func priceFeederCmdHandler(cmd *cobra.Command, args []string) error { deviations[deviation.Base] = threshold } - endpoints := make(map[provider.Name]provider.Endpoint, len(cfg.ProviderEndpoints)) - for _, endpoint := range cfg.ProviderEndpoints { - endpoints[endpoint.Name] = endpoint - } - oracle := oracle.New( logger, oracleClient, cfg.ProviderPairs(), providerTimeout, deviations, - endpoints, + cfg.ProviderEndpointsMap(), ) telemetryCfg := telemetry.Config{} diff --git a/price-feeder/config/config.go b/price-feeder/config/config.go index 5863221c16..3dc4d66d94 100644 --- a/price-feeder/config/config.go +++ b/price-feeder/config/config.go @@ -171,6 +171,14 @@ func (c Config) ProviderPairs() map[provider.Name][]types.CurrencyPair { return providerPairs } +func (c Config) ProviderEndpointsMap() map[provider.Name]provider.Endpoint { + endpoints := make(map[provider.Name]provider.Endpoint, len(c.ProviderEndpoints)) + for _, endpoint := range c.ProviderEndpoints { + endpoints[endpoint.Name] = endpoint + } + return endpoints +} + // ParseConfig attempts to read and parse configuration from the given file path. // An error is returned if reading or parsing the config fails. func ParseConfig(configPath string) (Config, error) { diff --git a/price-feeder/price-feeder.example.toml b/price-feeder/price-feeder.example.toml index 80a0531e73..af070fa32a 100644 --- a/price-feeder/price-feeder.example.toml +++ b/price-feeder/price-feeder.example.toml @@ -274,3 +274,8 @@ prometheus-retention-time = 100 name = "binance" rest = "https://api1.binance.com" websocket = "stream.binance.com:9443" + +[[provider_endpoints]] +name = "osmosisv2" +rest = "https://api.osmo-api.prod.network.umee.cc" +websocket = "api.osmo-api.prod.network.umee.cc" diff --git a/price-feeder/tests/integration/provider_test.go b/price-feeder/tests/integration/provider_test.go index 8eb9dd8ff9..2a8712ed77 100644 --- a/price-feeder/tests/integration/provider_test.go +++ b/price-feeder/tests/integration/provider_test.go @@ -38,13 +38,16 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() { cfg, err := config.ParseConfig("../../price-feeder.example.toml") require.NoError(s.T(), err) + endpoints := cfg.ProviderEndpointsMap() + for key, pairs := range cfg.ProviderPairs() { providerName := key currencyPairs := pairs + endpoint := endpoints[providerName] s.T().Run(string(providerName), func(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), provider.Endpoint{}, currencyPairs...) + pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) time.Sleep(30 * time.Second) // wait for provider to connect and receive some prices checkForPrices(t, pvd, currencyPairs) cancel() From 431b6ae31e804acbd7fa4156596f56bd977604c0 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 08:31:28 -0700 Subject: [PATCH 2/8] remove send message logger --- price-feeder/oracle/provider/websocket_controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/price-feeder/oracle/provider/websocket_controller.go b/price-feeder/oracle/provider/websocket_controller.go index 17db2d7b61..7ba60ef830 100644 --- a/price-feeder/oracle/provider/websocket_controller.go +++ b/price-feeder/oracle/provider/websocket_controller.go @@ -194,7 +194,6 @@ func (conn *WebsocketConnection) SendJSON(msg interface{}) error { if conn.client == nil { return fmt.Errorf("unable to send JSON on a closed connection") } - conn.logger.Debug().Interface("msg", msg).Msg("sending websocket message") if err := conn.client.WriteJSON(msg); err != nil { return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err) } From 690363af7a0e02ae027ee742c0d58720206363a1 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 10:29:10 -0700 Subject: [PATCH 3/8] use gate currency pair when fetching candles --- price-feeder/oracle/provider/gate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/price-feeder/oracle/provider/gate.go b/price-feeder/oracle/provider/gate.go index 11ea91533d..f8827b469e 100644 --- a/price-feeder/oracle/provider/gate.go +++ b/price-feeder/oracle/provider/gate.go @@ -237,7 +237,7 @@ func (p *GateProvider) getCandlePrices(cp types.CurrencyPair) ([]types.CandlePri p.mtx.RLock() defer p.mtx.RUnlock() - candles, ok := p.candles[cp.String()] + candles, ok := p.candles[currencyPairToGatePair(cp)] if !ok { return []types.CandlePrice{}, fmt.Errorf( types.ErrCandleNotFound.Error(), From d7e4e0f88b32765b4a1c0fe451c4c83cc9118ea2 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 10:31:34 -0700 Subject: [PATCH 4/8] don't close a closed websocket connection --- price-feeder/oracle/provider/websocket_controller.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/price-feeder/oracle/provider/websocket_controller.go b/price-feeder/oracle/provider/websocket_controller.go index 7ba60ef830..1b2f801bbe 100644 --- a/price-feeder/oracle/provider/websocket_controller.go +++ b/price-feeder/oracle/provider/websocket_controller.go @@ -179,6 +179,7 @@ func (conn *WebsocketConnection) iterateRetryCounter() time.Duration { // subscribe sends the WebsocketControllers subscription messages to the websocket func (conn *WebsocketConnection) subscribe(msg interface{}) error { telemetryWebsocketSubscribeCurrencyPairs(conn.providerName, 1) + conn.logger.Debug().Interface("msg", msg).Msg("sending subscription message") if err := conn.SendJSON(msg); err != nil { return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err) } @@ -284,6 +285,9 @@ func (conn *WebsocketConnection) close() { conn.logger.Debug().Msg("closing websocket") conn.websocketCancelFunc() + if conn.client == nil { + return + } if err := conn.client.Close(); err != nil { conn.logger.Err(fmt.Errorf(types.ErrWebsocketClose.Error(), conn.providerName, err)).Send() } From 168edb59fb3f6f5c33440d07818f39708a8fd070 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 11:17:20 -0700 Subject: [PATCH 5/8] add two more price checks to avoid seg faults --- price-feeder/tests/integration/provider_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/price-feeder/tests/integration/provider_test.go b/price-feeder/tests/integration/provider_test.go index 2a8712ed77..40c46e60a2 100644 --- a/price-feeder/tests/integration/provider_test.go +++ b/price-feeder/tests/integration/provider_test.go @@ -48,7 +48,7 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) - time.Sleep(30 * time.Second) // wait for provider to connect and receive some prices + time.Sleep(45 * time.Second) // wait for provider to connect and receive some prices checkForPrices(t, pvd, currencyPairs) cancel() }) @@ -87,11 +87,13 @@ func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.C for _, cp := range currencyPairs { currencyPairKey := cp.String() - // verify ticker price for currency pair is above zero - require.True(t, tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0))) + require.False(t, tickerPrices[currencyPairKey].Price.IsNil(), "no ticker price for pair %s", currencyPairKey) - // verify candle price for currency pair is above zero - require.True(t, candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0))) + require.True(t, tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)), "ticker price is zero for pair %s", currencyPairKey) + + require.NotEmpty(t, candlePrices[currencyPairKey], "no candle prices for pair %s", currencyPairKey) + + require.True(t, candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)), "candle price iss zero for pair %s", currencyPairKey) } } From 85cf387fcedc6bec9ecc512813aac3737c83c889 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 11:29:15 -0700 Subject: [PATCH 6/8] cleanup error messages --- .../tests/integration/provider_test.go | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/price-feeder/tests/integration/provider_test.go b/price-feeder/tests/integration/provider_test.go index 40c46e60a2..41503ba2c8 100644 --- a/price-feeder/tests/integration/provider_test.go +++ b/price-feeder/tests/integration/provider_test.go @@ -48,8 +48,8 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) - time.Sleep(45 * time.Second) // wait for provider to connect and receive some prices - checkForPrices(t, pvd, currencyPairs) + time.Sleep(60 * time.Second) // wait for provider to connect and receive some prices + checkForPrices(t, pvd, currencyPairs, providerName.String()) cancel() }) } @@ -72,12 +72,12 @@ func (s *IntegrationTestSuite) TestSubscribeCurrencyPairs() { time.Sleep(25 * time.Second) // wait for provider to connect and receive some prices - checkForPrices(s.T(), pvd, currencyPairs) + checkForPrices(s.T(), pvd, currencyPairs, "Okx") cancel() } -func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair) { +func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair, providerName string) { tickerPrices, err := pvd.GetTickerPrices(currencyPairs...) require.NoError(t, err) @@ -87,13 +87,33 @@ func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.C for _, cp := range currencyPairs { currencyPairKey := cp.String() - require.False(t, tickerPrices[currencyPairKey].Price.IsNil(), "no ticker price for pair %s", currencyPairKey) - - require.True(t, tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)), "ticker price is zero for pair %s", currencyPairKey) - - require.NotEmpty(t, candlePrices[currencyPairKey], "no candle prices for pair %s", currencyPairKey) - - require.True(t, candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)), "candle price iss zero for pair %s", currencyPairKey) + require.False(t, + tickerPrices[currencyPairKey].Price.IsNil(), + "no ticker price for %s pair %s", + providerName, + currencyPairKey, + ) + + require.True(t, + tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)), + "ticker price is zero for %s pair %s", + providerName, + currencyPairKey, + ) + + require.NotEmpty(t, + candlePrices[currencyPairKey], + "no candle prices for %s pair %s", + providerName, + currencyPairKey, + ) + + require.True(t, + candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)), + "candle price iss zero for %s pair %s", + providerName, + currencyPairKey, + ) } } From 923039dcdc0e6803d2ddb7bd5b8097df8f541ce9 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 12:54:10 -0700 Subject: [PATCH 7/8] add gdoc --- price-feeder/config/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/price-feeder/config/config.go b/price-feeder/config/config.go index 3dc4d66d94..6b07d21820 100644 --- a/price-feeder/config/config.go +++ b/price-feeder/config/config.go @@ -171,6 +171,8 @@ func (c Config) ProviderPairs() map[provider.Name][]types.CurrencyPair { return providerPairs } +// ProviderEndpointsMap converts the provider_endpoints from the config +// file into a map of provider.Endpoint where the key is the provider name func (c Config) ProviderEndpointsMap() map[provider.Name]provider.Endpoint { endpoints := make(map[provider.Name]provider.Endpoint, len(c.ProviderEndpoints)) for _, endpoint := range c.ProviderEndpoints { From ceeb48c9decee07a966821efeb5285fc8f15afb4 Mon Sep 17 00:00:00 2001 From: zarazan Date: Tue, 7 Feb 2023 12:54:36 -0700 Subject: [PATCH 8/8] capitalize OKX --- price-feeder/tests/integration/provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/price-feeder/tests/integration/provider_test.go b/price-feeder/tests/integration/provider_test.go index 41503ba2c8..b28bce3877 100644 --- a/price-feeder/tests/integration/provider_test.go +++ b/price-feeder/tests/integration/provider_test.go @@ -72,7 +72,7 @@ func (s *IntegrationTestSuite) TestSubscribeCurrencyPairs() { time.Sleep(25 * time.Second) // wait for provider to connect and receive some prices - checkForPrices(s.T(), pvd, currencyPairs, "Okx") + checkForPrices(s.T(), pvd, currencyPairs, "OKX") cancel() }