Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(price-feeder): Fix gate provider getCandlePrices #1791

Merged
merged 11 commits into from
Feb 8, 2023
8 changes: 1 addition & 7 deletions price-feeder/cmd/price-feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/umee-network/umee/price-feeder/v2/config"
"github.com/umee-network/umee/price-feeder/v2/oracle"
"github.com/umee-network/umee/price-feeder/v2/oracle/client"
"github.com/umee-network/umee/price-feeder/v2/oracle/provider"
v1 "github.com/umee-network/umee/price-feeder/v2/router/v1"
)

Expand Down Expand Up @@ -156,18 +155,13 @@ func priceFeederCmdHandler(cmd *cobra.Command, args []string) error {
deviations[deviation.Base] = threshold
}

endpoints := make(map[provider.Name]provider.Endpoint, len(cfg.ProviderEndpoints))
for _, endpoint := range cfg.ProviderEndpoints {
endpoints[endpoint.Name] = endpoint
}

oracle := oracle.New(
logger,
oracleClient,
cfg.ProviderPairs(),
providerTimeout,
deviations,
endpoints,
cfg.ProviderEndpointsMap(),
)

telemetryCfg := telemetry.Config{}
Expand Down
10 changes: 10 additions & 0 deletions price-feeder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ func (c Config) ProviderPairs() map[provider.Name][]types.CurrencyPair {
return providerPairs
}

// ProviderEndpointsMap converts the provider_endpoints from the config
// file into a map of provider.Endpoint where the key is the provider name
func (c Config) ProviderEndpointsMap() map[provider.Name]provider.Endpoint {
zarazan marked this conversation as resolved.
Show resolved Hide resolved
endpoints := make(map[provider.Name]provider.Endpoint, len(c.ProviderEndpoints))
for _, endpoint := range c.ProviderEndpoints {
endpoints[endpoint.Name] = endpoint
}
return endpoints
}

// ParseConfig attempts to read and parse configuration from the given file path.
// An error is returned if reading or parsing the config fails.
func ParseConfig(configPath string) (Config, error) {
Expand Down
2 changes: 1 addition & 1 deletion price-feeder/oracle/provider/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (p *GateProvider) getCandlePrices(cp types.CurrencyPair) ([]types.CandlePri
p.mtx.RLock()
defer p.mtx.RUnlock()

candles, ok := p.candles[cp.String()]
candles, ok := p.candles[currencyPairToGatePair(cp)]
if !ok {
return []types.CandlePrice{}, fmt.Errorf(
types.ErrCandleNotFound.Error(),
Expand Down
5 changes: 4 additions & 1 deletion price-feeder/oracle/provider/websocket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (conn *WebsocketConnection) iterateRetryCounter() time.Duration {
// subscribe sends the WebsocketControllers subscription messages to the websocket
func (conn *WebsocketConnection) subscribe(msg interface{}) error {
telemetryWebsocketSubscribeCurrencyPairs(conn.providerName, 1)
conn.logger.Debug().Interface("msg", msg).Msg("sending subscription message")
if err := conn.SendJSON(msg); err != nil {
return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err)
}
Expand All @@ -194,7 +195,6 @@ func (conn *WebsocketConnection) SendJSON(msg interface{}) error {
if conn.client == nil {
return fmt.Errorf("unable to send JSON on a closed connection")
}
conn.logger.Debug().Interface("msg", msg).Msg("sending websocket message")
if err := conn.client.WriteJSON(msg); err != nil {
return fmt.Errorf(types.ErrWebsocketSend.Error(), conn.providerName, err)
}
Expand Down Expand Up @@ -285,6 +285,9 @@ func (conn *WebsocketConnection) close() {

conn.logger.Debug().Msg("closing websocket")
zarazan marked this conversation as resolved.
Show resolved Hide resolved
conn.websocketCancelFunc()
if conn.client == nil {
return
}
if err := conn.client.Close(); err != nil {
conn.logger.Err(fmt.Errorf(types.ErrWebsocketClose.Error(), conn.providerName, err)).Send()
}
Expand Down
5 changes: 5 additions & 0 deletions price-feeder/price-feeder.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,8 @@ prometheus-retention-time = 100
name = "binance"
rest = "https://api1.binance.com"
websocket = "stream.binance.com:9443"

[[provider_endpoints]]
name = "osmosisv2"
rest = "https://api.osmo-api.prod.network.umee.cc"
websocket = "api.osmo-api.prod.network.umee.cc"
45 changes: 35 additions & 10 deletions price-feeder/tests/integration/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ func (s *IntegrationTestSuite) TestWebsocketProviders() {
cfg, err := config.ParseConfig("../../price-feeder.example.toml")
require.NoError(s.T(), err)

endpoints := cfg.ProviderEndpointsMap()

for key, pairs := range cfg.ProviderPairs() {
providerName := key
currencyPairs := pairs
endpoint := endpoints[providerName]
s.T().Run(string(providerName), func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), provider.Endpoint{}, currencyPairs...)
time.Sleep(30 * time.Second) // wait for provider to connect and receive some prices
checkForPrices(t, pvd, currencyPairs)
pvd, _ := oracle.NewProvider(ctx, providerName, getLogger(), endpoint, currencyPairs...)
time.Sleep(60 * time.Second) // wait for provider to connect and receive some prices
zarazan marked this conversation as resolved.
Show resolved Hide resolved
checkForPrices(t, pvd, currencyPairs, providerName.String())
cancel()
})
}
Expand All @@ -69,12 +72,12 @@ func (s *IntegrationTestSuite) TestSubscribeCurrencyPairs() {

time.Sleep(25 * time.Second) // wait for provider to connect and receive some prices

checkForPrices(s.T(), pvd, currencyPairs)
checkForPrices(s.T(), pvd, currencyPairs, "OKX")

cancel()
}

func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair) {
func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.CurrencyPair, providerName string) {
tickerPrices, err := pvd.GetTickerPrices(currencyPairs...)
require.NoError(t, err)

Expand All @@ -84,11 +87,33 @@ func checkForPrices(t *testing.T, pvd provider.Provider, currencyPairs []types.C
for _, cp := range currencyPairs {
currencyPairKey := cp.String()

// verify ticker price for currency pair is above zero
require.True(t, tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)))

// verify candle price for currency pair is above zero
require.True(t, candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)))
require.False(t,
tickerPrices[currencyPairKey].Price.IsNil(),
"no ticker price for %s pair %s",
providerName,
currencyPairKey,
)

require.True(t,
tickerPrices[currencyPairKey].Price.GT(sdk.NewDec(0)),
"ticker price is zero for %s pair %s",
providerName,
currencyPairKey,
)

require.NotEmpty(t,
candlePrices[currencyPairKey],
"no candle prices for %s pair %s",
providerName,
currencyPairKey,
)

require.True(t,
candlePrices[currencyPairKey][0].Price.GT(sdk.NewDec(0)),
"candle price iss zero for %s pair %s",
providerName,
currencyPairKey,
)
}
}

Expand Down