diff --git a/price-feeder/cmd/price-feeder.go b/price-feeder/cmd/price-feeder.go index 1af049b7d6..af060ecdd3 100644 --- a/price-feeder/cmd/price-feeder.go +++ b/price-feeder/cmd/price-feeder.go @@ -24,7 +24,6 @@ import ( "github.com/umee-network/umee/price-feeder/v2/config" "github.com/umee-network/umee/price-feeder/v2/oracle" "github.com/umee-network/umee/price-feeder/v2/oracle/client" - "github.com/umee-network/umee/price-feeder/v2/oracle/provider" v1 "github.com/umee-network/umee/price-feeder/v2/router/v1" ) @@ -156,18 +155,13 @@ func priceFeederCmdHandler(cmd *cobra.Command, args []string) error { deviations[deviation.Base] = threshold } - endpoints := make(map[provider.Name]provider.Endpoint, len(cfg.ProviderEndpoints)) - for _, endpoint := range cfg.ProviderEndpoints { - endpoints[endpoint.Name] = endpoint - } - oracle := oracle.New( logger, oracleClient, cfg.ProviderPairs(), providerTimeout, deviations, - endpoints, + cfg.ProviderEndpointsMap(), ) telemetryCfg := telemetry.Config{} diff --git a/price-feeder/config/config.go b/price-feeder/config/config.go index 5863221c16..6b07d21820 100644 --- a/price-feeder/config/config.go +++ b/price-feeder/config/config.go @@ -171,6 +171,16 @@ func (c Config) ProviderPairs() map[provider.Name][]types.CurrencyPair { return providerPairs } +// ProviderEndpointsMap converts the provider_endpoints from the config +// file into a map of provider.Endpoint where the key is the provider name +func (c Config) ProviderEndpointsMap() map[provider.Name]provider.Endpoint { + endpoints := make(map[provider.Name]provider.Endpoint, len(c.ProviderEndpoints)) + for _, endpoint := range c.ProviderEndpoints { + endpoints[endpoint.Name] = endpoint + } + return endpoints +} + // ParseConfig attempts to read and parse configuration from the given file path. // An error is returned if reading or parsing the config fails. func ParseConfig(configPath string) (Config, error) { diff --git a/price-feeder/oracle/provider/gate.go b/price-feeder/oracle/provider/gate.go index 11ea91533d..f8827b469e 100644 --- a/price-feeder/oracle/provider/gate.go +++ b/price-feeder/oracle/provider/gate.go @@ -237,7 +237,7 @@ func (p *GateProvider) getCandlePrices(cp types.CurrencyPair) ([]types.CandlePri p.mtx.RLock() defer p.mtx.RUnlock() - candles, ok := p.candles[cp.String()] + candles, ok := p.candles[currencyPairToGatePair(cp)] if !ok { return []types.CandlePrice{}, fmt.Errorf( types.ErrCandleNotFound.Error(), diff --git a/price-feeder/oracle/provider/websocket_controller.go b/price-feeder/oracle/provider/websocket_controller.go index 17db2d7b61..1b2f801bbe 100644 --- a/price-feeder/oracle/provider/websocket_controller.go +++ b/price-feeder/oracle/provider/websocket_controller.go @@ -179,6 +179,7 @@ func (conn *WebsocketConnection) iterateRetryCounter() time.Duration { // subscribe sends the WebsocketControllers subscription messages to the websocket func (conn *WebsocketConnection) subscribe(msg interface{}) error { telemetryWebsocketSubscribeCurrencyPairs(conn.providerName, 1) + conn.logger.Debug().Interface("msg", msg).Msg("sending subscription message") if err := conn.SendJSON(msg); err != nil { return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err) } @@ -194,7 +195,6 @@ func (conn *WebsocketConnection) SendJSON(msg interface{}) error { if conn.client == nil { return fmt.Errorf("unable to send JSON on a closed connection") } - conn.logger.Debug().Interface("msg", msg).Msg("sending websocket message") if err := conn.client.WriteJSON(msg); err != nil { return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err) } @@ -285,6 +285,9 @@ func (conn *WebsocketConnection) close() { conn.logger.Debug().Msg("closing websocket") conn.websocketCancelFunc() + if conn.client == nil { + return + } if err := conn.client.Close(); err != nil { conn.logger.Err(fmt.Errorf(types.ErrWebsocketClose.Error(), conn.providerName, err)).Send() } diff --git a/price-feeder/price-feeder.example.toml b/price-feeder/price-feeder.example.toml index 68bb401959..49a5b0328c 100644 --- a/price-feeder/price-feeder.example.toml +++ b/price-feeder/price-feeder.example.toml @@ -255,3 +255,8 @@ prometheus-retention-time = 100 name = "binance" rest = "https://api1.binance.com" websocket = "stream.binance.com:9443" + +[[provider_endpoints]] +name = "osmosisv2" +rest = "https://api.osmo-api.prod.network.umee.cc" +websocket = "api.osmo-api.prod.network.umee.cc" diff --git a/price-feeder/tests/integration/provider_test.go b/price-feeder/tests/integration/provider_test.go index 8eb9dd8ff9..b28bce3877 100644 --- a/price-feeder/tests/integration/provider_test.go +++ b/price-feeder/tests/integration/provider_test.go @@ -38,15 +38,18 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() { cfg, err := config.ParseConfig("../../price-feeder.example.toml") require.NoError(s.T(), err) + endpoints := cfg.ProviderEndpointsMap() + for key, pairs := range cfg.ProviderPairs() { providerName := key currencyPairs := pairs + endpoint := endpoints[providerName] s.T().Run(string(providerName), func(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), provider.Endpoint{}, currencyPairs...) - time.Sleep(30 * time.Second) // wait for provider to connect and receive some prices - checkForPrices(t, pvd, currencyPairs) + pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...) + time.Sleep(60 * time.Second) // wait for provider to connect and receive some prices + checkForPrices(t, pvd, currencyPairs, providerName.String()) cancel() }) } @@ -69,12 +72,12 @@ func (s *IntegrationTestSuite) TestSubscribeCurrencyPairs() { time.Sleep(25 * time.Second) // wait for provider to connect and receive some prices - checkForPrices(s.T(), pvd, currencyPairs) + checkForPrices(s.T(), pvd, currencyPairs, "OKX") cancel() } -func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair) { +func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair, providerName string) { tickerPrices, err := pvd.GetTickerPrices(currencyPairs...) require.NoError(t, err) @@ -84,11 +87,33 @@ func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.C for _, cp := range currencyPairs { currencyPairKey := cp.String() - // verify ticker price for currency pair is above zero - require.True(t, tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0))) - - // verify candle price for currency pair is above zero - require.True(t, candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0))) + require.False(t, + tickerPrices[currencyPairKey].Price.IsNil(), + "no ticker price for %s pair %s", + providerName, + currencyPairKey, + ) + + require.True(t, + tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)), + "ticker price is zero for %s pair %s", + providerName, + currencyPairKey, + ) + + require.NotEmpty(t, + candlePrices[currencyPairKey], + "no candle prices for %s pair %s", + providerName, + currencyPairKey, + ) + + require.True(t, + candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)), + "candle price iss zero for %s pair %s", + providerName, + currencyPairKey, + ) } }