diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index d0b7e35f6b..e46ef1ca51 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -55,6 +55,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket. - [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket. - [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx. +- [#601](https://github.com/umee-network/umee/pull/601) Use TVWAP formula for determining prices when available. ### Bug Fixes diff --git a/price-feeder/oracle/oracle.go b/price-feeder/oracle/oracle.go index 620c9100d9..bcb9560479 100644 --- a/price-feeder/oracle/oracle.go +++ b/price-feeder/oracle/oracle.go @@ -148,13 +148,16 @@ func (o *Oracle) GetPrices() map[string]sdk.Dec { return prices } -// SetPrices retrieve all the prices from our set of providers as determined -// in the config, average them out, and update the oracle's current exchange -// rates. +// SetPrices retrieves all the prices and candles from our set of providers as +// determined in the config. If candles are available, uses TVWAP in order +// to determine prices. If candles are not available, uses the most recent prices +// with VWAP. Warns the the user of any missing prices, and filters out any faulty +// providers which do not report prices within 2𝜎 of the others. func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList) error { g := new(errgroup.Group) mtx := new(sync.Mutex) providerPrices := make(map[string]map[string]provider.TickerPrice) + providerCandles := make(map[string]map[string][]provider.CandlePrice) requiredRates := make(map[string]struct{}) for providerName, currencyPairs := range o.providerPairs { @@ -185,19 +188,36 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList return err } + candles, err := priceProvider.GetCandlePrices(acceptedPairs...) + if err != nil { + telemetry.IncrCounter(1, "failure", "provider") + return err + } + // flatten and collect prices based on the base currency per provider // // e.g.: {ProviderKraken: {"ATOM": , ...}} mtx.Lock() - for _, cp := range acceptedPairs { + for _, pair := range acceptedPairs { if _, ok := providerPrices[providerName]; !ok { providerPrices[providerName] = make(map[string]provider.TickerPrice) } - if tp, ok := prices[cp.String()]; ok { - providerPrices[providerName][cp.Base] = tp - } else { + if _, ok := providerCandles[providerName]; !ok { + providerCandles[providerName] = make(map[string][]provider.CandlePrice) + } + + tp, pricesOk := prices[pair.String()] + cp, candlesOk := candles[pair.String()] + if pricesOk { + providerPrices[providerName][pair.Base] = tp + } + if candlesOk { + providerCandles[providerName][pair.Base] = cp + } + + if !pricesOk && !candlesOk { mtx.Unlock() - return fmt.Errorf("failed to find exchange rate in provider response") + return fmt.Errorf("failed to find any exchange rates in provider response") } } mtx.Unlock() @@ -219,6 +239,7 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList } } + // warn the user of any missing prices if len(reportedRates) != len(requiredRates) { return fmt.Errorf("unable to get prices for all exchange rates") } @@ -228,17 +249,29 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList } } - filteredProviderPrices, err := o.filterDeviations(providerPrices) + // attempt to use candles for tvwap calculations + tvwapPrices, err := ComputeTVWAP(providerCandles) if err != nil { return err } - vwapPrices, err := ComputeVWAP(filteredProviderPrices) - if err != nil { - return err - } + // If TVWAP candles are not available or were filtered out due to staleness, + // use most recent prices & VWAP instead. + if len(tvwapPrices) == 0 { + filteredProviderPrices, err := o.filterDeviations(providerPrices) + if err != nil { + return err + } - o.prices = vwapPrices + vwapPrices, err := ComputeVWAP(filteredProviderPrices) + if err != nil { + return err + } + + o.prices = vwapPrices + } else { + o.prices = tvwapPrices + } return nil } @@ -323,7 +356,8 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro // all assets, and filter out any providers that are not within 2𝜎 of the mean. func (o *Oracle) filterDeviations( prices map[string]map[string]provider.TickerPrice) ( - map[string]map[string]provider.TickerPrice, error) { + map[string]map[string]provider.TickerPrice, error, +) { var ( filteredPrices = make(map[string]map[string]provider.TickerPrice) threshold = sdk.MustNewDecFromStr("2") diff --git a/price-feeder/oracle/oracle_test.go b/price-feeder/oracle/oracle_test.go index 82d964c2ab..99c039350d 100644 --- a/price-feeder/oracle/oracle_test.go +++ b/price-feeder/oracle/oracle_test.go @@ -27,6 +27,20 @@ func (m mockProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]provi return m.prices, nil } +func (m mockProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) { + candles := make(map[string][]provider.CandlePrice) + for pair, price := range m.prices { + candles[pair] = []provider.CandlePrice{ + { + Price: price.Price, + TimeStamp: provider.PastUnixTime(1 * time.Minute), + Volume: price.Volume, + }, + } + } + return candles, nil +} + type failingProvider struct { prices map[string]provider.TickerPrice } @@ -35,6 +49,10 @@ func (m failingProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]pr return nil, fmt.Errorf("unable to get ticker prices") } +func (m failingProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) { + return nil, fmt.Errorf("unable to get candle prices") +} + type OracleTestSuite struct { suite.Suite diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index ba2fdd5a1f..7055784e9a 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -31,9 +31,9 @@ type ( wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mtx sync.Mutex + mtx sync.RWMutex tickers map[string]BinanceTicker // Symbol => BinanceTicker - candles map[string]BinanceCandle // Symbol => BinanceCandle + candles map[string][]BinanceCandle // Symbol => BinanceCandle subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } @@ -87,7 +87,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ wsClient: wsConn, logger: logger.With().Str("provider", "binance").Logger(), tickers: map[string]BinanceTicker{}, - candles: map[string]BinanceCandle{}, + candles: map[string][]BinanceCandle{}, subscribedPairs: map[string]types.CurrencyPair{}, } @@ -136,7 +136,26 @@ func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[stri return tickerPrices, nil } +// GetCandlePrices returns the candlePrices based on the provided pairs. +func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + candlePrices := make(map[string][]CandlePrice, len(pairs)) + + for _, cp := range pairs { + key := cp.String() + prices, err := p.getCandlePrices(key) + if err != nil { + return nil, err + } + candlePrices[key] = prices + } + + return candlePrices, nil +} + func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + ticker, ok := p.tickers[key] if !ok { return TickerPrice{}, fmt.Errorf("binance provider failed to get ticker price for %s", key) @@ -145,13 +164,24 @@ func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { return ticker.toTickerPrice() } -func (p *BinanceProvider) getTickerTrades(key string) (BinanceCandle, error) { - candle, ok := p.candles[key] +func (p *BinanceProvider) getCandlePrices(key string) ([]CandlePrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + + candles, ok := p.candles[key] if !ok { - return BinanceCandle{}, fmt.Errorf("failed to get ticker trades for %s", key) + return []CandlePrice{}, fmt.Errorf("failed to get candle prices for %s", key) } - return candle, nil + candleList := []CandlePrice{} + for _, candle := range candles { + cp, err := candle.toCandlePrice() + if err != nil { + return []CandlePrice{}, err + } + candleList = append(candleList, cp) + } + return candleList, nil } func (p *BinanceProvider) messageReceived(messageType int, bz []byte) { @@ -191,13 +221,27 @@ func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) { func (p *BinanceProvider) setCandlePair(candle BinanceCandle) { p.mtx.Lock() defer p.mtx.Unlock() - p.candles[candle.Symbol] = candle + staleTime := PastUnixTime(providerCandlePeriod) + candleList := []BinanceCandle{} + candleList = append(candleList, candle) + + for _, c := range p.candles[candle.Symbol] { + if staleTime < c.Metadata.TimeStamp { + candleList = append(candleList, c) + } + } + p.candles[candle.Symbol] = candleList } func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) { return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume) } +func (candle BinanceCandle) toCandlePrice() (CandlePrice, error) { + return newCandlePrice("Binance", candle.Symbol, candle.Metadata.Close, candle.Metadata.Volume, + candle.Metadata.TimeStamp) +} + func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { reconnectTicker := time.NewTicker(defaultMaxConnectionTime) defer reconnectTicker.Stop() diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index f2528a59f8..8d7c88c72b 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -36,9 +36,9 @@ type ( wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mtx sync.Mutex + mtx sync.RWMutex tickers map[string]HuobiTicker // market.$symbol.ticker => HuobiTicker - candles map[string]HuobiCandle // market.$symbol.kline.$period => HuobiCandle + candles map[string][]HuobiCandle // market.$symbol.kline.$period => HuobiCandle subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } @@ -65,9 +65,9 @@ type ( // HuobiCandleTick defines the response type for the candle. HuobiCandleTick struct { - Close float64 `json:"close"` // Closing price during this period - TimeStamp int64 `json:"id"` // TimeStamp for this as an ID - Volume float64 `json:"volume"` // Volume during this period + Close float64 `json:"close"` // Closing price during this period + TimeStamp int64 `json:"id"` // TimeStamp for this as an ID + Volume float64 `json:"vol"` // Volume during this period } // HuobiSubscriptionMsg Msg to subscribe to one ticker channel at time. @@ -94,7 +94,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types wsClient: wsConn, logger: logger.With().Str("provider", "huobi").Logger(), tickers: map[string]HuobiTicker{}, - candles: map[string]HuobiCandle{}, + candles: map[string][]HuobiCandle{}, subscribedPairs: map[string]types.CurrencyPair{}, } @@ -122,6 +122,21 @@ func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string return tickerPrices, nil } +// GetTickerPrices returns the tickerPrices based on the saved map. +func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + candlePrices := make(map[string][]CandlePrice, len(pairs)) + + for _, cp := range pairs { + price, err := p.getCandlePrices(cp) + if err != nil { + return nil, err + } + candlePrices[cp.String()] = price + } + + return candlePrices, nil +} + // SubscribeTickers subscribe all currency pairs into ticker and candle channels. func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error { for _, cp := range cps { @@ -252,7 +267,18 @@ func (p *HuobiProvider) setTickerPair(ticker HuobiTicker) { func (p *HuobiProvider) setCandlePair(candle HuobiCandle) { p.mtx.Lock() defer p.mtx.Unlock() - p.candles[candle.CH] = candle + // huobi time period comes in seconds + candle.Tick.TimeStamp = candle.Tick.TimeStamp * 1000 + staleTime := PastUnixTime(providerCandlePeriod) + candleList := []HuobiCandle{} + candleList = append(candleList, candle) + + for _, c := range p.candles[candle.CH] { + if staleTime < c.Tick.TimeStamp { + candleList = append(candleList, c) + } + } + p.candles[candle.CH] = candleList } // reconnect closes the last WS connection and create a new one. @@ -291,14 +317,37 @@ func (p *HuobiProvider) subscribeCandlePair(cp types.CurrencyPair) error { } func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + ticker, ok := p.tickers[currencyPairToHuobiTickerPair(cp)] if !ok { - return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String()) + return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", cp.String()) } return ticker.toTickerPrice() } +func (p *HuobiProvider) getCandlePrices(cp types.CurrencyPair) ([]CandlePrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + + candles, ok := p.candles[currencyPairToHuobiCandlePair(cp)] + if !ok { + return []CandlePrice{}, fmt.Errorf("failed to get candles price for %s", cp.String()) + } + + candleList := []CandlePrice{} + for _, candle := range candles { + cp, err := candle.toCandlePrice() + if err != nil { + return []CandlePrice{}, err + } + candleList = append(candleList, cp) + } + return candleList, nil +} + // setSubscribedPairs sets N currency pairs to the map of subscribed pairs. func (p *HuobiProvider) setSubscribedPairs(cps ...types.CurrencyPair) { p.mtx.Lock() @@ -330,6 +379,16 @@ func (ticker HuobiTicker) toTickerPrice() (TickerPrice, error) { ) } +func (candle HuobiCandle) toCandlePrice() (CandlePrice, error) { + return newCandlePrice( + "Huobi", + candle.CH, + strconv.FormatFloat(candle.Tick.Close, 'f', -1, 64), + strconv.FormatFloat(candle.Tick.Volume, 'f', -1, 64), + candle.Tick.TimeStamp, + ) +} + // newHuobiTickerSubscriptionMsg returns a new ticker subscription Msg. func newHuobiTickerSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg { return HuobiSubscriptionMsg{ diff --git a/price-feeder/oracle/provider/huobi_test.go b/price-feeder/oracle/provider/huobi_test.go index 48073ade5a..9b87c5848b 100644 --- a/price-feeder/oracle/provider/huobi_test.go +++ b/price-feeder/oracle/provider/huobi_test.go @@ -75,7 +75,7 @@ func TestHuobiProvider_GetTickerPrices(t *testing.T) { t.Run("invalid_request_invalid_ticker", func(t *testing.T) { prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) require.Error(t, err) - require.Equal(t, "huobi provider failed to get ticker price for FOOBAR", err.Error()) + require.Equal(t, "failed to get ticker price for FOOBAR", err.Error()) require.Nil(t, prices) }) } diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index b77e4ea460..cacc42f8da 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/url" + "strconv" "strings" "sync" "time" @@ -31,9 +32,9 @@ type ( wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mtx sync.Mutex + mtx sync.RWMutex tickers map[string]TickerPrice // Symbol => TickerPrice - candles map[string]KrakenCandle // Symbol => KrakenCandle + candles map[string][]KrakenCandle // Symbol => KrakenCandle subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } @@ -50,6 +51,7 @@ type ( Close string // Close price during this period TimeStamp int64 // Linux epoch timestamp Volume string // Volume during this period + Symbol string // Symbol for this candle } // KrakenSubscriptionMsg Msg to subscribe to all the pairs at once. @@ -99,7 +101,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type wsClient: wsConn, logger: logger.With().Str("provider", "kraken").Logger(), tickers: map[string]TickerPrice{}, - candles: map[string]KrakenCandle{}, + candles: map[string][]KrakenCandle{}, subscribedPairs: map[string]types.CurrencyPair{}, } @@ -114,6 +116,9 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type // GetTickerPrices returns the tickerPrices based on the saved map. func (p *KrakenProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + tickerPrices := make(map[string]TickerPrice, len(pairs)) for _, cp := range pairs { @@ -128,6 +133,52 @@ func (p *KrakenProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[strin return tickerPrices, nil } +// GetCandlePrices returns the candlePrices based on the saved map. +func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + candlePrices := make(map[string][]CandlePrice, len(pairs)) + + for _, cp := range pairs { + key := cp.String() + candlePrice, err := p.getCandlePrices(key) + if err != nil { + return nil, err + } + candlePrices[key] = candlePrice + } + + return candlePrices, nil +} + +func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) { + return newCandlePrice( + "Kraken", + candle.Symbol, + candle.Close, + candle.Volume, + candle.TimeStamp, + ) +} + +func (p *KrakenProvider) getCandlePrices(key string) ([]CandlePrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + + candles, ok := p.candles[key] + if !ok { + return []CandlePrice{}, fmt.Errorf("failed to get candle prices for %s", key) + } + + candleList := []CandlePrice{} + for _, candle := range candles { + cp, err := candle.toCandlePrice() + if err != nil { + return []CandlePrice{}, err + } + candleList = append(candleList, cp) + } + return candleList, nil +} + // SubscribeTickers subscribe all currency pairs into ticker and candle channels. func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { pairs := make([]string, len(cps)) @@ -229,7 +280,7 @@ func (p *KrakenProvider) messageReceivedTickerPrice(bz []byte) error { return fmt.Errorf("received an unexpected structure") } - channelName, ok := tickerMessage[1].(string) + channelName, ok := tickerMessage[2].(string) if !ok || channelName != "ticker" { return fmt.Errorf("received an unexpected channel name") } @@ -274,11 +325,16 @@ func (kc *KrakenCandle) UnmarshalJSON(buf []byte) error { return fmt.Errorf("wrong number of fields in candle") } - time, ok := tmp[2].(int64) + // timestamps come as a float string + time, ok := tmp[1].(string) if !ok { - return fmt.Errorf("time field must be an int64") + return fmt.Errorf("time field must be a string") + } + timeFloat, err := strconv.ParseFloat(time, 64) + if err != nil { + return fmt.Errorf("unable to convert time to float") } - kc.TimeStamp = time + kc.TimeStamp = int64(timeFloat) close, ok := tmp[5].(string) if !ok { @@ -319,7 +375,7 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error { } var krakenCandle KrakenCandle - if err = krakenCandle.UnmarshalJSON(tickerBz); err != nil { + if err := krakenCandle.UnmarshalJSON(tickerBz); err != nil { return err } @@ -330,8 +386,9 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error { krakenPair = normalizeKrakenBTCPair(krakenPair) currencyPairSymbol := krakenPairToCurrencyPairSymbol(krakenPair) + krakenCandle.Symbol = currencyPairSymbol - p.setCandlePair(currencyPairSymbol, krakenCandle) + p.setCandlePair(krakenCandle) return nil } @@ -422,10 +479,19 @@ func (p *KrakenProvider) setTickerPair(symbol string, ticker TickerPrice) { p.tickers[symbol] = ticker } -func (p *KrakenProvider) setCandlePair(symbol string, candle KrakenCandle) { +func (p *KrakenProvider) setCandlePair(candle KrakenCandle) { p.mtx.Lock() defer p.mtx.Unlock() - p.candles[symbol] = candle + staleTime := PastUnixTime(providerCandlePeriod) + candleList := []KrakenCandle{} + + candleList = append(candleList, candle) + for _, c := range p.candles[candle.Symbol] { + if staleTime < c.TimeStamp { + candleList = append(candleList, c) + } + } + p.candles[candle.Symbol] = candleList } // ping to check websocket connection. diff --git a/price-feeder/oracle/provider/mock.go b/price-feeder/oracle/provider/mock.go index 3172164664..a64f226d7b 100644 --- a/price-feeder/oracle/provider/mock.go +++ b/price-feeder/oracle/provider/mock.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "time" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/umee-network/umee/price-feeder/oracle/types" @@ -94,3 +95,21 @@ func (p MockProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]T return tickerPrices, nil } + +func (p MockProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + price, err := p.GetTickerPrices(pairs...) + if err != nil { + return nil, err + } + candles := make(map[string][]CandlePrice) + for pair, price := range price { + candles[pair] = []CandlePrice{ + { + Price: price.Price, + Volume: price.Volume, + TimeStamp: PastUnixTime(1 * time.Minute), + }, + } + } + return candles, nil +} diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index 1dfd62ef55..f726bb467b 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -33,9 +33,9 @@ type ( wsClient *websocket.Conn logger zerolog.Logger reconnectTimer *time.Ticker - mtx sync.Mutex + mtx sync.RWMutex tickers map[string]OkxTickerPair // InstId => OkxTickerPair - candles map[string]OkxCandlePair // InstId => 0kxCandlePair + candles map[string][]OkxCandlePair // InstId => 0kxCandlePair subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } @@ -60,9 +60,10 @@ type ( // OkxCandlePair defines a candle for Okx. OkxCandlePair struct { - Close string `json:"c"` // Close price for this time period - TimeStamp int64 `json:"ts"` // Linux epoch timestamp - Volume string `json:"vol"` // Volume for this time period + Close string `json:"c"` // Close price for this time period + TimeStamp int64 `json:"ts"` // Linux epoch timestamp + Volume string `json:"vol"` // Volume for this time period + InstId string `json:"instId"` // Instrument ID ex.: BTC-USDT } // OkxCandleResponse defines the response structure of a Okx candle request. @@ -103,7 +104,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C logger: logger.With().Str("provider", "okx").Logger(), reconnectTimer: time.NewTicker(okxPingCheck), tickers: map[string]OkxTickerPair{}, - candles: map[string]OkxCandlePair{}, + candles: map[string][]OkxCandlePair{}, subscribedPairs: map[string]types.CurrencyPair{}, } provider.wsClient.SetPongHandler(provider.pongHandler) @@ -133,6 +134,22 @@ func (p *OkxProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]T return tickerPrices, nil } +// GetCandlePrices returns the candlePrices based on the saved map +func (p *OkxProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + candlePrices := make(map[string][]CandlePrice, len(pairs)) + + for _, currencyPair := range pairs { + candles, err := p.getCandlePrices(currencyPair) + if err != nil { + return nil, err + } + + candlePrices[currencyPair.String()] = candles + } + + return candlePrices, nil +} + // SubscribeTickers subscribe all currency pairs into ticker and candle channels. func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { topics := make([]OkxSubscriptionTopic, len(cps)*2) @@ -155,6 +172,9 @@ func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { } func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + instrumentId := currencyPairToOkxPair(cp) tickerPair, ok := p.tickers[instrumentId] if !ok { @@ -164,6 +184,27 @@ func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) return tickerPair.toTickerPrice() } +func (p *OkxProvider) getCandlePrices(cp types.CurrencyPair) ([]CandlePrice, error) { + p.mtx.RLock() + defer p.mtx.RUnlock() + + instrumentId := currencyPairToOkxPair(cp) + candles, ok := p.candles[instrumentId] + if !ok { + return []CandlePrice{}, fmt.Errorf("failed to get candle prices for %s", instrumentId) + } + candleList := []CandlePrice{} + for _, candle := range candles { + cp, err := candle.toCandlePrice() + if err != nil { + return []CandlePrice{}, err + } + candleList = append(candleList, cp) + } + + return candleList, nil +} + func (p *OkxProvider) handleReceivedTickers(ctx context.Context) { for { select { @@ -248,11 +289,22 @@ func (p *OkxProvider) setCandlePair(pairData []string, instID string) { return } // the candlesticks channel uses an array of strings. - p.candles[instID] = OkxCandlePair{ + candle := OkxCandlePair{ Close: pairData[4], + InstId: instID, Volume: pairData[5], TimeStamp: ts, } + staleTime := PastUnixTime(providerCandlePeriod) + candleList := []OkxCandlePair{} + + candleList = append(candleList, candle) + for _, c := range p.candles[instID] { + if staleTime < c.TimeStamp { + candleList = append(candleList, c) + } + } + p.candles[instID] = candleList } // setSubscribedPairs sets N currency pairs to the map of subscribed pairs. @@ -317,6 +369,10 @@ func (ticker OkxTickerPair) toTickerPrice() (TickerPrice, error) { return newTickerPrice("Okx", ticker.InstId, ticker.Last, ticker.Vol24h) } +func (candle OkxCandlePair) toCandlePrice() (CandlePrice, error) { + return newCandlePrice("Okx", candle.InstId, candle.Close, candle.Volume, candle.TimeStamp) +} + // currencyPairToOkxPair returns the expected pair instrument ID for Okx // ex.: "BTC-USDT". func currencyPairToOkxPair(pair types.CurrencyPair) string { diff --git a/price-feeder/oracle/provider/osmosis.go b/price-feeder/oracle/provider/osmosis.go index b6a31fe640..7b58caf0db 100644 --- a/price-feeder/oracle/provider/osmosis.go +++ b/price-feeder/oracle/provider/osmosis.go @@ -14,8 +14,9 @@ import ( ) const ( - osmosisBaseURL = "https://api-osmosis.imperator.co" - osmosisTokenEndpoint = "/tokens/v1" + osmosisBaseURL = "https://api-osmosis.imperator.co" + osmosisTokenEndpoint = "/tokens/v1" + osmosisCandleEndpoint = "/tokens/v2/historical" ) var _ Provider = (*OsmosisProvider)(nil) @@ -37,6 +38,14 @@ type ( Symbol string `json:"symbol"` Volume float64 `json:"volume_24h"` } + + // OsmosisCandleResponse defines the response structure for an Osmosis candle + // request. + OsmosisCandleResponse struct { + Time int64 `json:"time"` + Close float64 `json:"close"` + Volume float64 `json:"volume"` + } ) func NewOsmosisProvider() *OsmosisProvider { @@ -115,3 +124,45 @@ func (p OsmosisProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[strin return tickerPrices, nil } + +func (p OsmosisProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) { + candles := make(map[string][]CandlePrice) + for _, pair := range pairs { + if _, ok := candles[pair.Base]; !ok { + candles[pair.String()] = []CandlePrice{} + } + + path := fmt.Sprintf("%s%s/%s/chart?tf=5", p.baseURL, osmosisCandleEndpoint, pair.Base) + + resp, err := p.client.Get(path) + if err != nil { + return nil, fmt.Errorf("failed to make Osmosis request: %w", err) + } + + defer resp.Body.Close() + + bz, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read Osmosis response body: %w", err) + } + + var candlesResp []OsmosisCandleResponse + if err := json.Unmarshal(bz, &candlesResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal Osmosis response body: %w", err) + } + + candlePrices := []CandlePrice{} + for _, responseCandle := range candlesResp { + closeStr := fmt.Sprintf("%f", responseCandle.Close) + volumeStr := fmt.Sprintf("%f", responseCandle.Volume) + candlePrices = append(candlePrices, CandlePrice{ + Price: sdk.MustNewDecFromStr(closeStr), + Volume: sdk.MustNewDecFromStr(volumeStr), + TimeStamp: responseCandle.Time, + }) + } + candles[pair.String()] = candlePrices + } + + return candles, nil +} diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 0e8fa0ac02..c3b1dddf7e 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -15,6 +15,7 @@ const ( defaultMaxConnectionTime = time.Hour * 23 // should be < 24h defaultReconnectTime = time.Minute * 20 maxReconnectionTries = 3 + providerCandlePeriod = 10 * time.Minute ) var ping = []byte("ping") @@ -22,6 +23,7 @@ var ping = []byte("ping") // Provider defines an interface an exchange price provider must implement. type Provider interface { GetTickerPrices(...types.CurrencyPair) (map[string]TickerPrice, error) + GetCandlePrices(...types.CurrencyPair) (map[string][]CandlePrice, error) } // TickerPrice defines price and volume information for a symbol or ticker @@ -31,6 +33,14 @@ type TickerPrice struct { Volume sdk.Dec // 24h volume } +// CandlePrice defines price, volume, and time information for an +// exchange rate. +type CandlePrice struct { + Price sdk.Dec // last trade price + Volume sdk.Dec // volume + TimeStamp int64 // timestamp +} + // preventRedirect avoid any redirect in the http.Client the request call // will not return an error, but a valid response with redirect response code. func preventRedirect(_ *http.Request, _ []*http.Request) error { @@ -61,3 +71,23 @@ func newTickerPrice(provider, symbol, lastPrice, volume string) (TickerPrice, er return TickerPrice{Price: price, Volume: volumeDec}, nil } + +func newCandlePrice(provider, symbol, lastPrice, volume string, timeStamp int64) (CandlePrice, error) { + price, err := sdk.NewDecFromStr(lastPrice) + if err != nil { + return CandlePrice{}, fmt.Errorf("failed to parse %s price (%s) for %s", provider, lastPrice, symbol) + } + + volumeDec, err := sdk.NewDecFromStr(volume) + if err != nil { + return CandlePrice{}, fmt.Errorf("failed to parse %s volume (%s) for %s", provider, volume, symbol) + } + + return CandlePrice{Price: price, Volume: volumeDec, TimeStamp: timeStamp}, nil +} + +// PastUnixTime returns a millisecond timestamp that represents the unix time +// minus t. +func PastUnixTime(t time.Duration) int64 { + return time.Now().Add(t*-1).Unix() * 1000 +} diff --git a/price-feeder/oracle/util.go b/price-feeder/oracle/util.go index 669ddbb0ee..e1bdb9082e 100644 --- a/price-feeder/oracle/util.go +++ b/price-feeder/oracle/util.go @@ -2,11 +2,35 @@ package oracle import ( "fmt" + "sort" + "time" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/umee-network/umee/price-feeder/oracle/provider" ) +var minimumTimeWeight = sdk.MustNewDecFromStr("0.2") + +// tvwapCandlePeriod represents the time period we use for tvwap in minutes. +const tvwapCandlePeriod = 3 * time.Minute + +// compute VWAP for each base by dividing the Σ {P * V} by Σ {V} +func vwap(weightedPrices map[string]sdk.Dec, volumeSum map[string]sdk.Dec) (map[string]sdk.Dec, error) { + vwap := make(map[string]sdk.Dec) + + for base, p := range weightedPrices { + if !volumeSum[base].Equal(sdk.ZeroDec()) { + if _, ok := vwap[base]; !ok { + vwap[base] = sdk.ZeroDec() + } + + vwap[base] = p.Quo(volumeSum[base]) + } + } + + return vwap, nil +} + // ComputeVWAP computes the volume weighted average price for all price points // for each ticker/exchange pair. The provided prices argument reflects a mapping // of provider => { => , ...}. @@ -14,43 +38,92 @@ import ( // Ref: https://en.wikipedia.org/wiki/Volume-weighted_average_price func ComputeVWAP(prices map[string]map[string]provider.TickerPrice) (map[string]sdk.Dec, error) { var ( - vwap = make(map[string]sdk.Dec) - volumeSum = make(map[string]sdk.Dec) + weightedPrices = make(map[string]sdk.Dec) + volumeSum = make(map[string]sdk.Dec) ) for _, providerPrices := range prices { for base, tp := range providerPrices { - if _, ok := vwap[base]; !ok { - vwap[base] = sdk.ZeroDec() + if _, ok := weightedPrices[base]; !ok { + weightedPrices[base] = sdk.ZeroDec() } if _, ok := volumeSum[base]; !ok { volumeSum[base] = sdk.ZeroDec() } - // vwap[base] = Σ {P * V} for all TickerPrice - vwap[base] = vwap[base].Add(tp.Price.Mul(tp.Volume)) + // weightedPrices[base] = Σ {P * V} for all TickerPrice + weightedPrices[base] = weightedPrices[base].Add(tp.Price.Mul(tp.Volume)) // track total volume for each base volumeSum[base] = volumeSum[base].Add(tp.Volume) } } - // compute VWAP for each base by dividing the Σ {P * V} by Σ {V} - for base, p := range vwap { - if volumeSum[base] == sdk.ZeroDec() { - return nil, fmt.Errorf("unable to divide by zero") + return vwap(weightedPrices, volumeSum) +} + +// ComputeTVWAP computes the time volume weighted average price for all points +// for each exchange pair. Filters out any candles that did not occur within +// timePeriod. The provided prices argument reflects a mapping of +// provider => { => , ...}. +// +// Ref : https://en.wikipedia.org/wiki/Time-weighted_average_price +func ComputeTVWAP(prices map[string]map[string][]provider.CandlePrice) (map[string]sdk.Dec, error) { + var ( + weightedPrices = make(map[string]sdk.Dec) + volumeSum = make(map[string]sdk.Dec) + now = provider.PastUnixTime(0) + timePeriod = provider.PastUnixTime(tvwapCandlePeriod) + ) + + for _, providerPrices := range prices { + for base, cp := range providerPrices { + if _, ok := weightedPrices[base]; !ok { + weightedPrices[base] = sdk.ZeroDec() + } + if _, ok := volumeSum[base]; !ok { + volumeSum[base] = sdk.ZeroDec() + } + + // Sort by timestamp old -> new + sort.SliceStable(cp, func(i, j int) bool { + return cp[i].TimeStamp < cp[j].TimeStamp + }) + + period := sdk.NewDec(now - cp[0].TimeStamp) + if period.Equal(sdk.ZeroDec()) { + return nil, fmt.Errorf("unable to divide by zero") + } + // weightUnit = (1 - minimumTimeWeight) / period + weightUnit := sdk.OneDec().Sub(minimumTimeWeight).Quo(period) + + // get weighted prices, and sum of volumes + for _, candle := range cp { + // we only want candles within the last timePeriod + if timePeriod < candle.TimeStamp { + // timeDiff = now - candle.TimeStamp + timeDiff := sdk.NewDec(now - candle.TimeStamp) + // volume = candle.Volume * (weightUnit * (period - timeDiff) + minimumTimeWeight) + volume := candle.Volume.Mul( + weightUnit.Mul(period.Sub(timeDiff).Add(minimumTimeWeight)), + ) + volumeSum[base] = volumeSum[base].Add(volume) + weightedPrices[base] = weightedPrices[base].Add(candle.Price.Mul(volume)) + } + } + } - vwap[base] = p.Quo(volumeSum[base]) } - return vwap, nil + return vwap(weightedPrices, volumeSum) } // StandardDeviation returns maps of the standard deviations and means of assets. // Will skip calculating for an asset if there are less than 3 prices. func StandardDeviation( prices map[string]map[string]provider.TickerPrice) ( - map[string]sdk.Dec, map[string]sdk.Dec, error) { + map[string]sdk.Dec, map[string]sdk.Dec, error, +) { var ( deviations = make(map[string]sdk.Dec) means = make(map[string]sdk.Dec)