Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tvwap calculations #601

Merged
merged 11 commits into from
Mar 4, 2022
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.
- [#601](https://github.com/umee-network/umee/pull/601) Use TVWAP formula for determining prices when available.

### Bug Fixes

Expand Down
64 changes: 49 additions & 15 deletions price-feeder/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,16 @@ func (o *Oracle) GetPrices() map[string]sdk.Dec {
return prices
}

// SetPrices retrieve all the prices from our set of providers as determined
// in the config, average them out, and update the oracle's current exchange
// rates.
// SetPrices retrieves all the prices and candles from our set of providers as
// determined in the config. If candles are available, uses TVWAP in order
// to determine prices. If candles are not available, uses the most recent prices
// with VWAP. Warns the the user of any missing prices, and filters out any faulty
// providers which do not report prices within 2𝜎 of the others.
func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList) error {
g := new(errgroup.Group)
mtx := new(sync.Mutex)
providerPrices := make(map[string]map[string]provider.TickerPrice)
providerCandles := make(map[string]map[string][]provider.CandlePrice)
requiredRates := make(map[string]struct{})

for providerName, currencyPairs := range o.providerPairs {
Expand Down Expand Up @@ -185,19 +188,36 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
return err
}

candles, err := priceProvider.GetCandlePrices(acceptedPairs...)
adamewozniak marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
telemetry.IncrCounter(1, "failure", "provider")
return err
}

// flatten and collect prices based on the base currency per provider
//
// e.g.: {ProviderKraken: {"ATOM": <price, volume>, ...}}
mtx.Lock()
for _, cp := range acceptedPairs {
for _, pair := range acceptedPairs {
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := providerPrices[providerName]; !ok {
providerPrices[providerName] = make(map[string]provider.TickerPrice)
}
if tp, ok := prices[cp.String()]; ok {
providerPrices[providerName][cp.Base] = tp
} else {
if _, ok := providerCandles[providerName]; !ok {
providerCandles[providerName] = make(map[string][]provider.CandlePrice)
}

tp, pricesOk := prices[pair.String()]
cp, candlesOk := candles[pair.String()]
if pricesOk {
providerPrices[providerName][pair.Base] = tp
}
if candlesOk {
providerCandles[providerName][pair.Base] = cp
}

if !pricesOk && !candlesOk {
mtx.Unlock()
return fmt.Errorf("failed to find exchange rate in provider response")
return fmt.Errorf("failed to find any exchange rates in provider response")
}
}
mtx.Unlock()
Expand All @@ -219,6 +239,7 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
}
}

// warn the user of any missing prices
if len(reportedRates) != len(requiredRates) {
return fmt.Errorf("unable to get prices for all exchange rates")
}
Expand All @@ -228,17 +249,29 @@ func (o *Oracle) SetPrices(ctx context.Context, acceptList oracletypes.DenomList
}
}

filteredProviderPrices, err := o.filterDeviations(providerPrices)
// attempt to use candles for tvwap calculations
tvwapPrices, err := ComputeTVWAP(providerCandles)
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

vwapPrices, err := ComputeVWAP(filteredProviderPrices)
if err != nil {
return err
}
// If TVWAP candles are not available or were filtered out due to staleness,
// use most recent prices & VWAP instead.
if len(tvwapPrices) == 0 {
filteredProviderPrices, err := o.filterDeviations(providerPrices)
if err != nil {
return err
}

o.prices = vwapPrices
vwapPrices, err := ComputeVWAP(filteredProviderPrices)
if err != nil {
return err
}

o.prices = vwapPrices
} else {
o.prices = tvwapPrices
}
adamewozniak marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand Down Expand Up @@ -323,7 +356,8 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro
// all assets, and filter out any providers that are not within 2𝜎 of the mean.
func (o *Oracle) filterDeviations(
prices map[string]map[string]provider.TickerPrice) (
map[string]map[string]provider.TickerPrice, error) {
map[string]map[string]provider.TickerPrice, error,
) {
var (
filteredPrices = make(map[string]map[string]provider.TickerPrice)
threshold = sdk.MustNewDecFromStr("2")
Expand Down
18 changes: 18 additions & 0 deletions price-feeder/oracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ func (m mockProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]provi
return m.prices, nil
}

func (m mockProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) {
candles := make(map[string][]provider.CandlePrice)
for pair, price := range m.prices {
candles[pair] = []provider.CandlePrice{
{
Price: price.Price,
TimeStamp: provider.PastUnixTime(1 * time.Minute),
Volume: price.Volume,
},
}
}
return candles, nil
}

type failingProvider struct {
prices map[string]provider.TickerPrice
}
Expand All @@ -35,6 +49,10 @@ func (m failingProvider) GetTickerPrices(_ ...types.CurrencyPair) (map[string]pr
return nil, fmt.Errorf("unable to get ticker prices")
}

func (m failingProvider) GetCandlePrices(_ ...types.CurrencyPair) (map[string][]provider.CandlePrice, error) {
return nil, fmt.Errorf("unable to get candle prices")
}

type OracleTestSuite struct {
suite.Suite

Expand Down
60 changes: 52 additions & 8 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mtx sync.Mutex
mtx sync.RWMutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
candles map[string][]BinanceCandle // Symbol => BinanceCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
wsClient: wsConn,
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
candles: map[string]BinanceCandle{},
candles: map[string][]BinanceCandle{},
subscribedPairs: map[string]types.CurrencyPair{},
}

Expand Down Expand Up @@ -136,7 +136,26 @@ func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[stri
return tickerPrices, nil
}

// GetCandlePrices returns the candlePrices based on the provided pairs.
func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) {
candlePrices := make(map[string][]CandlePrice, len(pairs))

for _, cp := range pairs {
key := cp.String()
prices, err := p.getCandlePrices(key)
if err != nil {
return nil, err
}
candlePrices[key] = prices
}

return candlePrices, nil
}

func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

ticker, ok := p.tickers[key]
if !ok {
return TickerPrice{}, fmt.Errorf("binance provider failed to get ticker price for %s", key)
Expand All @@ -145,13 +164,24 @@ func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
return ticker.toTickerPrice()
}

func (p *BinanceProvider) getTickerTrades(key string) (BinanceCandle, error) {
candle, ok := p.candles[key]
func (p *BinanceProvider) getCandlePrices(key string) ([]CandlePrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

candles, ok := p.candles[key]
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return BinanceCandle{}, fmt.Errorf("failed to get ticker trades for %s", key)
return []CandlePrice{}, fmt.Errorf("failed to get candle prices for %s", key)
}

return candle, nil
candleList := []CandlePrice{}
for _, candle := range candles {
cp, err := candle.toCandlePrice()
if err != nil {
return []CandlePrice{}, err
}
candleList = append(candleList, cp)
}
return candleList, nil
}

func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
Expand Down Expand Up @@ -191,13 +221,27 @@ func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) {
func (p *BinanceProvider) setCandlePair(candle BinanceCandle) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.Symbol] = candle
staleTime := PastUnixTime(providerCandlePeriod)
candleList := []BinanceCandle{}
candleList = append(candleList, candle)

for _, c := range p.candles[candle.Symbol] {
if staleTime < c.Metadata.TimeStamp {
candleList = append(candleList, c)
}
}
p.candles[candle.Symbol] = candleList
}

func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume)
}

func (candle BinanceCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice("Binance", candle.Symbol, candle.Metadata.Close, candle.Metadata.Volume,
candle.Metadata.TimeStamp)
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()
Expand Down
75 changes: 67 additions & 8 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mtx sync.Mutex
mtx sync.RWMutex
tickers map[string]HuobiTicker // market.$symbol.ticker => HuobiTicker
candles map[string]HuobiCandle // market.$symbol.kline.$period => HuobiCandle
candles map[string][]HuobiCandle // market.$symbol.kline.$period => HuobiCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

Expand All @@ -65,9 +65,9 @@ type (

// HuobiCandleTick defines the response type for the candle.
HuobiCandleTick struct {
Close float64 `json:"close"` // Closing price during this period
TimeStamp int64 `json:"id"` // TimeStamp for this as an ID
Volume float64 `json:"volume"` // Volume during this period
Close float64 `json:"close"` // Closing price during this period
TimeStamp int64 `json:"id"` // TimeStamp for this as an ID
Volume float64 `json:"vol"` // Volume during this period
}

// HuobiSubscriptionMsg Msg to subscribe to one ticker channel at time.
Expand All @@ -94,7 +94,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types
wsClient: wsConn,
logger: logger.With().Str("provider", "huobi").Logger(),
tickers: map[string]HuobiTicker{},
candles: map[string]HuobiCandle{},
candles: map[string][]HuobiCandle{},
subscribedPairs: map[string]types.CurrencyPair{},
}

Expand Down Expand Up @@ -122,6 +122,21 @@ func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string
return tickerPrices, nil
}

// GetTickerPrices returns the tickerPrices based on the saved map.
func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]CandlePrice, error) {
candlePrices := make(map[string][]CandlePrice, len(pairs))

for _, cp := range pairs {
price, err := p.getCandlePrices(cp)
if err != nil {
return nil, err
}
candlePrices[cp.String()] = price
}

return candlePrices, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
for _, cp := range cps {
Expand Down Expand Up @@ -252,7 +267,18 @@ func (p *HuobiProvider) setTickerPair(ticker HuobiTicker) {
func (p *HuobiProvider) setCandlePair(candle HuobiCandle) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.CH] = candle
// huobi time period comes in seconds
candle.Tick.TimeStamp = candle.Tick.TimeStamp * 1000
staleTime := PastUnixTime(providerCandlePeriod)
candleList := []HuobiCandle{}
candleList = append(candleList, candle)

for _, c := range p.candles[candle.CH] {
if staleTime < c.Tick.TimeStamp {
candleList = append(candleList, c)
}
}
p.candles[candle.CH] = candleList
}

// reconnect closes the last WS connection and create a new one.
Expand Down Expand Up @@ -291,14 +317,37 @@ func (p *HuobiProvider) subscribeCandlePair(cp types.CurrencyPair) error {
}

func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

ticker, ok := p.tickers[currencyPairToHuobiTickerPair(cp)]
if !ok {
return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String())
return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", cp.String())
}

return ticker.toTickerPrice()
}

func (p *HuobiProvider) getCandlePrices(cp types.CurrencyPair) ([]CandlePrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()

candles, ok := p.candles[currencyPairToHuobiCandlePair(cp)]
if !ok {
return []CandlePrice{}, fmt.Errorf("failed to get candles price for %s", cp.String())
}

candleList := []CandlePrice{}
for _, candle := range candles {
cp, err := candle.toCandlePrice()
if err != nil {
return []CandlePrice{}, err
}
candleList = append(candleList, cp)
}
return candleList, nil
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *HuobiProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
Expand Down Expand Up @@ -330,6 +379,16 @@ func (ticker HuobiTicker) toTickerPrice() (TickerPrice, error) {
)
}

func (candle HuobiCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice(
"Huobi",
candle.CH,
strconv.FormatFloat(candle.Tick.Close, 'f', -1, 64),
strconv.FormatFloat(candle.Tick.Volume, 'f', -1, 64),
candle.Tick.TimeStamp,
)
}

// newHuobiTickerSubscriptionMsg returns a new ticker subscription Msg.
func newHuobiTickerSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg {
return HuobiSubscriptionMsg{
Expand Down
Loading