diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index 8f33b45094..b480613e86 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -67,6 +67,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Refactor - [#587](https://github.com/umee-network/umee/pull/587) Clean up logs from price feeder providers. +- [#610](https://github.com/umee-network/umee/pull/610) Split subscribtion of ticker and candle channels. ## [v0.1.0](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.1.0) - 2022-02-07 diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index 7055784e9a..2166029bae 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -91,7 +91,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -100,26 +100,6 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ return provider, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - pairs := make([]string, len(cps)*2) - - iterator := 0 - for _, cp := range cps { - pairs[iterator] = currencyPairToBinanceTickerPair(cp) - iterator++ - pairs[iterator] = currencyPairToBinanceCandlePair(cp) - iterator++ - } - - if err := p.subscribePairs(pairs...); err != nil { - return err - } - - p.setSubscribedPairs(cps...) - return nil -} - // GetTickerPrices returns the tickerPrices based on the provided pairs. func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { tickerPrices := make(map[string]TickerPrice, len(pairs)) @@ -152,6 +132,55 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri return candlePrices, nil } +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *BinanceProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe to the ticker and candle channels for all currency pairs. +func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe to the ticker channel for all currency pairs. +func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToBinanceTickerPair(cp) + } + + return p.subscribePairs(pairs...) +} + +// subscribeCandles subscribe to the candle channel for all currency pairs. +func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToBinanceCandlePair(cp) + } + + return p.subscribePairs(pairs...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as a slice. +func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return mapPairsToSlice(p.subscribedPairs) +} + func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { p.mtx.RLock() defer p.mtx.RUnlock() @@ -289,16 +318,8 @@ func (p *BinanceProvider) reconnect() error { } p.wsClient = wsConn - pairs := make([]string, len(p.subscribedPairs)*2) - iterator := 0 - for _, cp := range p.subscribedPairs { - pairs[iterator] = currencyPairToBinanceTickerPair(cp) - iterator++ - pairs[iterator] = currencyPairToBinanceCandlePair(cp) - iterator++ - } - - return p.subscribePairs(pairs...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in reconnect. diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index aefe6291cb..83191f9004 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -98,7 +98,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -137,21 +137,55 @@ func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string return candlePrices, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error { +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe all currency pairs into ticker channel. +func (p *HuobiProvider) subscribeTickers(cps ...types.CurrencyPair) error { for _, cp := range cps { if err := p.subscribeTickerPair(cp); err != nil { return err } + } + + return nil +} + +// subscribeCandles subscribe all currency pairs into candle channel. +func (p *HuobiProvider) subscribeCandles(cps ...types.CurrencyPair) error { + for _, cp := range cps { if err := p.subscribeCandlePair(cp); err != nil { return err } } - p.setSubscribedPairs(cps...) return nil } +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return mapPairsToSlice(p.subscribedPairs) +} + func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) { reconnectTicker := time.NewTicker(huobiReconnectTime) for { @@ -292,16 +326,8 @@ func (p *HuobiProvider) reconnect() error { } p.wsClient = wsConn - for _, cp := range p.subscribedPairs { - if err := p.subscribeTickerPair(cp); err != nil { - return err - } - if err := p.subscribeCandlePair(cp); err != nil { - return err - } - } - - return nil + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // subscribeTickerPair write the subscription ticker msg to the provider. diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index efa501cd35..1afe69601e 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -105,7 +105,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -149,6 +149,39 @@ func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[strin return candlePrices, nil } +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *KrakenProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *KrakenProvider) subscribeChannels(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToKrakenPair(cp) + } + + if err := p.subscribeTickers(pairs...); err != nil { + return err + } + + return p.subscribeCandles(pairs...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *KrakenProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return mapPairsToSlice(p.subscribedPairs) +} + func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) { return newCandlePrice( "Kraken", @@ -179,25 +212,6 @@ func (p *KrakenProvider) getCandlePrices(key string) ([]CandlePrice, error) { return candleList, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - pairs := make([]string, len(cps)) - - for i, cp := range cps { - pairs[i] = currencyPairToKrakenPair(cp) - } - - if err := p.subscribeTickerPairs(pairs...); err != nil { - return err - } - if err := p.subscribeCandlePairs(pairs...); err != nil { - return err - } - - p.setSubscribedPairs(cps...) - return nil -} - // handleWebSocketMsgs receive all the messages from the provider and controls the // reconnect function to the web socket. func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { @@ -211,6 +225,12 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { case <-time.After(defaultReadNewWSMessage): messageType, bz, err := p.wsClient.ReadMessage() if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) { + p.logger.Err(err).Msg("WebSocket closed unexpectedly") + p.keepReconnecting() + continue + } + // if some error occurs continue to try to read the next message. p.logger.Err(err).Msg("could not read message") if err := p.ping(); err != nil { @@ -395,6 +415,7 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error { // reconnect closes the last WS connection and create a new one. func (p *KrakenProvider) reconnect() error { p.wsClient.Close() + p.logger.Debug().Msg("trying to reconnect") wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil) if err != nil { @@ -402,17 +423,8 @@ func (p *KrakenProvider) reconnect() error { } p.wsClient = wsConn - pairs := make([]string, len(p.subscribedPairs)) - iterator := 0 - for _, cp := range p.subscribedPairs { - pairs[iterator] = currencyPairToKrakenPair(cp) - iterator++ - } - - if err := p.subscribeTickerPairs(pairs...); err != nil { - return err - } - return p.subscribeCandlePairs(pairs...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in recconnect. @@ -501,14 +513,14 @@ func (p *KrakenProvider) ping() error { return p.wsClient.WriteMessage(websocket.PingMessage, ping) } -// subscribeTickerPairs write the subscription msg to the provider. -func (p *KrakenProvider) subscribeTickerPairs(pairs ...string) error { +// subscribeTickers write the subscription msg to the provider. +func (p *KrakenProvider) subscribeTickers(pairs ...string) error { subsMsg := newKrakenTickerSubscriptionMsg(pairs...) return p.wsClient.WriteJSON(subsMsg) } -// subscribeCandlePairs write the subscription msg to the provider. -func (p *KrakenProvider) subscribeCandlePairs(pairs ...string) error { +// subscribeCandles write the subscription msg to the provider. +func (p *KrakenProvider) subscribeCandles(pairs ...string) error { subsMsg := newKrakenCandleSubscriptionMsg(pairs...) return p.wsClient.WriteJSON(subsMsg) } diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index f726bb467b..38ac84e6ca 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -109,7 +109,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C } provider.wsClient.SetPongHandler(provider.pongHandler) - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -150,20 +150,9 @@ func (p *OkxProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][ return candlePrices, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - topics := make([]OkxSubscriptionTopic, len(cps)*2) - - iterator := 0 - for _, cp := range cps { - instId := currencyPairToOkxPair(cp) - topics[iterator] = newOkxSubscriptionTopic(instId) - iterator++ - topics[iterator] = newOkxCandleSubscriptionTopic(instId) - iterator++ - } - - if err := p.subscribePairs(topics...); err != nil { +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { return err } @@ -171,6 +160,45 @@ func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { return nil } +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe all currency pairs into ticker channel. +func (p *OkxProvider) subscribeTickers(cps ...types.CurrencyPair) error { + topics := make([]OkxSubscriptionTopic, len(cps)) + + for i, cp := range cps { + topics[i] = newOkxTickerSubscriptionTopic(currencyPairToOkxPair(cp)) + } + + return p.subscribePairs(topics...) +} + +// subscribeCandles subscribe all currency pairs into candle channel. +func (p *OkxProvider) subscribeCandles(cps ...types.CurrencyPair) error { + topics := make([]OkxSubscriptionTopic, len(cps)) + + for i, cp := range cps { + topics[i] = newOkxCandleSubscriptionTopic(currencyPairToOkxPair(cp)) + } + + return p.subscribePairs(topics...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *OkxProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return mapPairsToSlice(p.subscribedPairs) +} + func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { p.mtx.RLock() defer p.mtx.RUnlock() @@ -342,17 +370,8 @@ func (p *OkxProvider) reconnect() error { wsConn.SetPongHandler(p.pongHandler) p.wsClient = wsConn - topics := make([]OkxSubscriptionTopic, len(p.subscribedPairs)*2) - iterator := 0 - for _, cp := range p.subscribedPairs { - instId := currencyPairToOkxPair(cp) - topics[iterator] = newOkxSubscriptionTopic(instId) - iterator++ - topics[iterator] = newOkxCandleSubscriptionTopic(instId) - iterator++ - } - - return p.subscribePairs(topics...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // ping to check websocket connection. @@ -379,8 +398,8 @@ func currencyPairToOkxPair(pair types.CurrencyPair) string { return pair.Base + "-" + pair.Quote } -// newOkxSubscriptionTopic returns a new subscription topic. -func newOkxSubscriptionTopic(instId string) OkxSubscriptionTopic { +// newOkxTickerSubscriptionTopic returns a new subscription topic. +func newOkxTickerSubscriptionTopic(instId string) OkxSubscriptionTopic { return OkxSubscriptionTopic{ Channel: "tickers", InstId: instId, diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 61673bb618..6892d54124 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -91,3 +91,16 @@ func newCandlePrice(provider, symbol, lastPrice, volume string, timeStamp int64) func PastUnixTime(t time.Duration) int64 { return time.Now().Add(t*-1).Unix() * int64(time.Second/time.Millisecond) } + +// mapPairsToSlice returns the map of currency pairs as slice. +func mapPairsToSlice(mapPairs map[string]types.CurrencyPair) []types.CurrencyPair { + currencyPairs := make([]types.CurrencyPair, len(mapPairs)) + + iterator := 0 + for _, cp := range mapPairs { + currencyPairs[iterator] = cp + iterator++ + } + + return currencyPairs +}