From 5af32c9233fb30829eea1549d46a22a800a0abbd Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 20:41:37 -0300 Subject: [PATCH 01/11] refactor: split subscribed channels of tickers and candles --- price-feeder/oracle/provider/binance.go | 90 ++++++++++++++++--------- 1 file changed, 59 insertions(+), 31 deletions(-) diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index 7055784e9a..19123b9ff4 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -91,7 +91,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPais(pairs...); err != nil { return nil, err } @@ -100,26 +100,6 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ return provider, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - pairs := make([]string, len(cps)*2) - - iterator := 0 - for _, cp := range cps { - pairs[iterator] = currencyPairToBinanceTickerPair(cp) - iterator++ - pairs[iterator] = currencyPairToBinanceCandlePair(cp) - iterator++ - } - - if err := p.subscribePairs(pairs...); err != nil { - return err - } - - p.setSubscribedPairs(cps...) - return nil -} - // GetTickerPrices returns the tickerPrices based on the provided pairs. func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { tickerPrices := make(map[string]TickerPrice, len(pairs)) @@ -152,6 +132,62 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri return candlePrices, nil } +// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. +func (p *BinanceProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe all currency pairs into ticker channel. +func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToBinanceTickerPair(cp) + } + + return p.subscribePairs(pairs...) +} + +// subscribeCandles subscribe all currency pairs into candle channel. +func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToBinanceCandlePair(cp) + } + + return p.subscribePairs(pairs...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + currencyPairs := make([]types.CurrencyPair, len(p.subscribedPairs)) + iterator := 0 + for _, cp := range p.subscribedPairs { + currencyPairs[iterator] = cp + iterator++ + } + + return currencyPairs +} + func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { p.mtx.RLock() defer p.mtx.RUnlock() @@ -289,16 +325,8 @@ func (p *BinanceProvider) reconnect() error { } p.wsClient = wsConn - pairs := make([]string, len(p.subscribedPairs)*2) - iterator := 0 - for _, cp := range p.subscribedPairs { - pairs[iterator] = currencyPairToBinanceTickerPair(cp) - iterator++ - pairs[iterator] = currencyPairToBinanceCandlePair(cp) - iterator++ - } - - return p.subscribePairs(pairs...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in reconnect. From bcc69a40c95d6fc411fc3d4d583b4c306092b835 Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 20:52:22 -0300 Subject: [PATCH 02/11] refactor: split subscribed channels of tickers and candles --- price-feeder/oracle/provider/huobi.go | 61 +++++++++++++++++++++------ 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index aefe6291cb..51e4c3601c 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -98,7 +98,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPais(pairs...); err != nil { return nil, err } @@ -137,21 +137,62 @@ func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string return candlePrices, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error { +// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe all currency pairs into ticker channel. +func (p *HuobiProvider) subscribeTickers(cps ...types.CurrencyPair) error { for _, cp := range cps { if err := p.subscribeTickerPair(cp); err != nil { return err } + } + + return nil +} + +// subscribeCandles subscribe all currency pairs into candle channel. +func (p *HuobiProvider) subscribeCandles(cps ...types.CurrencyPair) error { + for _, cp := range cps { if err := p.subscribeCandlePair(cp); err != nil { return err } } - p.setSubscribedPairs(cps...) return nil } +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + currencyPairs := make([]types.CurrencyPair, len(p.subscribedPairs)) + iterator := 0 + for _, cp := range p.subscribedPairs { + currencyPairs[iterator] = cp + iterator++ + } + + return currencyPairs +} + func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) { reconnectTicker := time.NewTicker(huobiReconnectTime) for { @@ -292,16 +333,8 @@ func (p *HuobiProvider) reconnect() error { } p.wsClient = wsConn - for _, cp := range p.subscribedPairs { - if err := p.subscribeTickerPair(cp); err != nil { - return err - } - if err := p.subscribeCandlePair(cp); err != nil { - return err - } - } - - return nil + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // subscribeTickerPair write the subscription ticker msg to the provider. From a4ab0588a5cd929f18a7f43b869d79d8606f4c0a Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 21:03:55 -0300 Subject: [PATCH 03/11] refactor: created func subscribedPairsToSlice in provider --- price-feeder/oracle/provider/binance.go | 9 +-------- price-feeder/oracle/provider/huobi.go | 9 +-------- price-feeder/oracle/provider/provider.go | 13 +++++++++++++ 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index 19123b9ff4..2d114e58f8 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -178,14 +178,7 @@ func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - currencyPairs := make([]types.CurrencyPair, len(p.subscribedPairs)) - iterator := 0 - for _, cp := range p.subscribedPairs { - currencyPairs[iterator] = cp - iterator++ - } - - return currencyPairs + return subscribedPairsToSlice(p.subscribedPairs) } func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index 51e4c3601c..6d217a190f 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -183,14 +183,7 @@ func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - currencyPairs := make([]types.CurrencyPair, len(p.subscribedPairs)) - iterator := 0 - for _, cp := range p.subscribedPairs { - currencyPairs[iterator] = cp - iterator++ - } - - return currencyPairs + return subscribedPairsToSlice(p.subscribedPairs) } func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) { diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 61673bb618..0c03e620bc 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -91,3 +91,16 @@ func newCandlePrice(provider, symbol, lastPrice, volume string, timeStamp int64) func PastUnixTime(t time.Duration) int64 { return time.Now().Add(t*-1).Unix() * int64(time.Second/time.Millisecond) } + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func subscribedPairsToSlice(subscribedPairs map[string]types.CurrencyPair) []types.CurrencyPair { + currencyPairs := make([]types.CurrencyPair, len(subscribedPairs)) + + iterator := 0 + for _, cp := range subscribedPairs { + currencyPairs[iterator] = cp + iterator++ + } + + return currencyPairs +} From 9d91ae75458ece338baeb1a11e826cbceffac714 Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 21:04:11 -0300 Subject: [PATCH 04/11] refactor: split subscribed channels of tickers and candles --- price-feeder/oracle/provider/kraken.go | 75 ++++++++++++++------------ 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index efa501cd35..81b477c3b7 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -105,7 +105,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPais(pairs...); err != nil { return nil, err } @@ -149,6 +149,39 @@ func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[strin return candlePrices, nil } +// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. +func (p *KrakenProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *KrakenProvider) subscribeChannels(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToKrakenPair(cp) + } + + if err := p.subscribeTickers(pairs...); err != nil { + return err + } + + return p.subscribeCandles(pairs...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *KrakenProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return subscribedPairsToSlice(p.subscribedPairs) +} + func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) { return newCandlePrice( "Kraken", @@ -179,25 +212,6 @@ func (p *KrakenProvider) getCandlePrices(key string) ([]CandlePrice, error) { return candleList, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - pairs := make([]string, len(cps)) - - for i, cp := range cps { - pairs[i] = currencyPairToKrakenPair(cp) - } - - if err := p.subscribeTickerPairs(pairs...); err != nil { - return err - } - if err := p.subscribeCandlePairs(pairs...); err != nil { - return err - } - - p.setSubscribedPairs(cps...) - return nil -} - // handleWebSocketMsgs receive all the messages from the provider and controls the // reconnect function to the web socket. func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { @@ -402,17 +416,8 @@ func (p *KrakenProvider) reconnect() error { } p.wsClient = wsConn - pairs := make([]string, len(p.subscribedPairs)) - iterator := 0 - for _, cp := range p.subscribedPairs { - pairs[iterator] = currencyPairToKrakenPair(cp) - iterator++ - } - - if err := p.subscribeTickerPairs(pairs...); err != nil { - return err - } - return p.subscribeCandlePairs(pairs...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in recconnect. @@ -501,14 +506,14 @@ func (p *KrakenProvider) ping() error { return p.wsClient.WriteMessage(websocket.PingMessage, ping) } -// subscribeTickerPairs write the subscription msg to the provider. -func (p *KrakenProvider) subscribeTickerPairs(pairs ...string) error { +// subscribeTickers write the subscription msg to the provider. +func (p *KrakenProvider) subscribeTickers(pairs ...string) error { subsMsg := newKrakenTickerSubscriptionMsg(pairs...) return p.wsClient.WriteJSON(subsMsg) } -// subscribeCandlePairs write the subscription msg to the provider. -func (p *KrakenProvider) subscribeCandlePairs(pairs ...string) error { +// subscribeCandles write the subscription msg to the provider. +func (p *KrakenProvider) subscribeCandles(pairs ...string) error { subsMsg := newKrakenCandleSubscriptionMsg(pairs...) return p.wsClient.WriteJSON(subsMsg) } From df18b76490cfa656c2abcd9c6b41b899b7de2455 Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 21:15:02 -0300 Subject: [PATCH 05/11] refactor: split subscribed channels of tickers and candles --- price-feeder/oracle/provider/okx.go | 75 ++++++++++++++++++----------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index f726bb467b..75a3f3f943 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -109,7 +109,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C } provider.wsClient.SetPongHandler(provider.pongHandler) - if err := provider.SubscribeTickers(pairs...); err != nil { + if err := provider.SubscribeCurrencyPais(pairs...); err != nil { return nil, err } @@ -150,20 +150,9 @@ func (p *OkxProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][ return candlePrices, nil } -// SubscribeTickers subscribe all currency pairs into ticker and candle channels. -func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { - topics := make([]OkxSubscriptionTopic, len(cps)*2) - - iterator := 0 - for _, cp := range cps { - instId := currencyPairToOkxPair(cp) - topics[iterator] = newOkxSubscriptionTopic(instId) - iterator++ - topics[iterator] = newOkxCandleSubscriptionTopic(instId) - iterator++ - } - - if err := p.subscribePairs(topics...); err != nil { +// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { + if err := p.subscribeChannels(cps...); err != nil { return err } @@ -171,6 +160,45 @@ func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { return nil } +// subscribeChannels subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) subscribeChannels(cps ...types.CurrencyPair) error { + if err := p.subscribeTickers(cps...); err != nil { + return err + } + + return p.subscribeCandles(cps...) +} + +// subscribeTickers subscribe all currency pairs into ticker channel. +func (p *OkxProvider) subscribeTickers(cps ...types.CurrencyPair) error { + topics := make([]OkxSubscriptionTopic, len(cps)) + + for i, cp := range cps { + topics[i] = newOkxTickerSubscriptionTopic(currencyPairToOkxPair(cp)) + } + + return p.subscribePairs(topics...) +} + +// subscribeCandles subscribe all currency pairs into candle channel. +func (p *OkxProvider) subscribeCandles(cps ...types.CurrencyPair) error { + topics := make([]OkxSubscriptionTopic, len(cps)) + + for i, cp := range cps { + topics[i] = newOkxCandleSubscriptionTopic(currencyPairToOkxPair(cp)) + } + + return p.subscribePairs(topics...) +} + +// subscribedPairsToSlice returns the map of subscribed pairs as slice +func (p *OkxProvider) subscribedPairsToSlice() []types.CurrencyPair { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return subscribedPairsToSlice(p.subscribedPairs) +} + func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { p.mtx.RLock() defer p.mtx.RUnlock() @@ -342,17 +370,8 @@ func (p *OkxProvider) reconnect() error { wsConn.SetPongHandler(p.pongHandler) p.wsClient = wsConn - topics := make([]OkxSubscriptionTopic, len(p.subscribedPairs)*2) - iterator := 0 - for _, cp := range p.subscribedPairs { - instId := currencyPairToOkxPair(cp) - topics[iterator] = newOkxSubscriptionTopic(instId) - iterator++ - topics[iterator] = newOkxCandleSubscriptionTopic(instId) - iterator++ - } - - return p.subscribePairs(topics...) + currencyPairs := p.subscribedPairsToSlice() + return p.subscribeChannels(currencyPairs...) } // ping to check websocket connection. @@ -379,8 +398,8 @@ func currencyPairToOkxPair(pair types.CurrencyPair) string { return pair.Base + "-" + pair.Quote } -// newOkxSubscriptionTopic returns a new subscription topic. -func newOkxSubscriptionTopic(instId string) OkxSubscriptionTopic { +// newOkxTickerSubscriptionTopic returns a new subscription topic. +func newOkxTickerSubscriptionTopic(instId string) OkxSubscriptionTopic { return OkxSubscriptionTopic{ Channel: "tickers", InstId: instId, From baad27a4edb43f365ef5c32713b428a2a22db688 Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 21:22:32 -0300 Subject: [PATCH 06/11] add split subscription of ticker and candle --- price-feeder/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index 8f33b45094..b480613e86 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -67,6 +67,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Refactor - [#587](https://github.com/umee-network/umee/pull/587) Clean up logs from price feeder providers. +- [#610](https://github.com/umee-network/umee/pull/610) Split subscribtion of ticker and candle channels. ## [v0.1.0](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.1.0) - 2022-02-07 From e1d1cb4dc4a73dd68a0676d356b4d0d85f12912a Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Sun, 6 Mar 2022 21:58:05 -0300 Subject: [PATCH 07/11] feat: add error check for abnormalClosure --- price-feeder/oracle/provider/kraken.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index 81b477c3b7..4bca2b8030 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -225,6 +225,12 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { case <-time.After(defaultReadNewWSMessage): messageType, bz, err := p.wsClient.ReadMessage() if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) { + p.logger.Err(err).Msg("WebSocket closed unexpectedly") + p.keepReconnecting() + continue + } + // if some error occurs continue to try to read the next message. p.logger.Err(err).Msg("could not read message") if err := p.ping(); err != nil { @@ -409,6 +415,7 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error { // reconnect closes the last WS connection and create a new one. func (p *KrakenProvider) reconnect() error { p.wsClient.Close() + p.logger.Debug().Msg("trying to reconnect") wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil) if err != nil { From aa0b20e6934c14973bcf293347df4652fd25b87d Mon Sep 17 00:00:00 2001 From: Rafael Tenfen Date: Mon, 7 Mar 2022 09:44:59 -0300 Subject: [PATCH 08/11] Update function names Co-authored-by: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> --- price-feeder/oracle/provider/binance.go | 12 ++++++------ price-feeder/oracle/provider/provider.go | 8 +++----- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index 2d114e58f8..dfa604c97a 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -91,7 +91,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeCurrencyPais(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -133,7 +133,7 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri } // SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. -func (p *BinanceProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { +func (p *BinanceProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { if err := p.subscribeChannels(cps...); err != nil { return err } @@ -142,7 +142,7 @@ func (p *BinanceProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error return nil } -// subscribeChannels subscribe all currency pairs into ticker and candle channels. +// subscribeChannels subscribe to the ticker and candle channels for all currency pairs. func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error { if err := p.subscribeTickers(cps...); err != nil { return err @@ -151,7 +151,7 @@ func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error { return p.subscribeCandles(cps...) } -// subscribeTickers subscribe all currency pairs into ticker channel. +// subscribeTickers subscribe to the ticker channel for all currency pairs. func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { pairs := make([]string, len(cps)) @@ -162,7 +162,7 @@ func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { return p.subscribePairs(pairs...) } -// subscribeCandles subscribe all currency pairs into candle channel. +// subscribeCandles subscribe to the candle channel for all currency pairs. func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error { pairs := make([]string, len(cps)) @@ -173,7 +173,7 @@ func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error { return p.subscribePairs(pairs...) } -// subscribedPairsToSlice returns the map of subscribed pairs as slice +// subscribedPairsToSlice returns the map of subscribed pairs as a slice. func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 0c03e620bc..2a07d89b1e 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -94,12 +94,10 @@ func PastUnixTime(t time.Duration) int64 { // subscribedPairsToSlice returns the map of subscribed pairs as slice func subscribedPairsToSlice(subscribedPairs map[string]types.CurrencyPair) []types.CurrencyPair { - currencyPairs := make([]types.CurrencyPair, len(subscribedPairs)) - - iterator := 0 + var currencyPairs []types.CurrencyPair + for _, cp := range subscribedPairs { - currencyPairs[iterator] = cp - iterator++ + currencyPairs = append(currencyPairs, cp) } return currencyPairs From 55a1c4310bcc55b4fc6284be10ca21a2e9c8402e Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Mon, 7 Mar 2022 09:48:57 -0300 Subject: [PATCH 09/11] refactor: update function names --- price-feeder/oracle/provider/binance.go | 4 ++-- price-feeder/oracle/provider/huobi.go | 8 ++++---- price-feeder/oracle/provider/kraken.go | 8 ++++---- price-feeder/oracle/provider/okx.go | 8 ++++---- price-feeder/oracle/provider/provider.go | 8 ++++---- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index dfa604c97a..2166029bae 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -132,7 +132,7 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri return candlePrices, nil } -// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. func (p *BinanceProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { if err := p.subscribeChannels(cps...); err != nil { return err @@ -178,7 +178,7 @@ func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - return subscribedPairsToSlice(p.subscribedPairs) + return mapPairsToSlice(p.subscribedPairs) } func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index 6d217a190f..83191f9004 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -98,7 +98,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeCurrencyPais(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -137,8 +137,8 @@ func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string return candlePrices, nil } -// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. -func (p *HuobiProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { if err := p.subscribeChannels(cps...); err != nil { return err } @@ -183,7 +183,7 @@ func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - return subscribedPairsToSlice(p.subscribedPairs) + return mapPairsToSlice(p.subscribedPairs) } func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) { diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index 4bca2b8030..1afe69601e 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -105,7 +105,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.SubscribeCurrencyPais(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -149,8 +149,8 @@ func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[strin return candlePrices, nil } -// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. -func (p *KrakenProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *KrakenProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { if err := p.subscribeChannels(cps...); err != nil { return err } @@ -179,7 +179,7 @@ func (p *KrakenProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - return subscribedPairsToSlice(p.subscribedPairs) + return mapPairsToSlice(p.subscribedPairs) } func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) { diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index 75a3f3f943..38ac84e6ca 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -109,7 +109,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C } provider.wsClient.SetPongHandler(provider.pongHandler) - if err := provider.SubscribeCurrencyPais(pairs...); err != nil { + if err := provider.SubscribeCurrencyPairs(pairs...); err != nil { return nil, err } @@ -150,8 +150,8 @@ func (p *OkxProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][ return candlePrices, nil } -// SubscribeCurrencyPais subscribe all currency pairs into ticker and candle channels. -func (p *OkxProvider) SubscribeCurrencyPais(cps ...types.CurrencyPair) error { +// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error { if err := p.subscribeChannels(cps...); err != nil { return err } @@ -196,7 +196,7 @@ func (p *OkxProvider) subscribedPairsToSlice() []types.CurrencyPair { p.mtx.RLock() defer p.mtx.RUnlock() - return subscribedPairsToSlice(p.subscribedPairs) + return mapPairsToSlice(p.subscribedPairs) } func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 2a07d89b1e..29ee51a5f2 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -92,11 +92,11 @@ func PastUnixTime(t time.Duration) int64 { return time.Now().Add(t*-1).Unix() * int64(time.Second/time.Millisecond) } -// subscribedPairsToSlice returns the map of subscribed pairs as slice -func subscribedPairsToSlice(subscribedPairs map[string]types.CurrencyPair) []types.CurrencyPair { +// mapPairsToSlice returns the map of currency pairs as slice. +func mapPairsToSlice(mapPairs map[string]types.CurrencyPair) []types.CurrencyPair { var currencyPairs []types.CurrencyPair - - for _, cp := range subscribedPairs { + + for _, cp := range mapPairs { currencyPairs = append(currencyPairs, cp) } From d654b0a3af54c596fd25eda907c1e00fd0b61adb Mon Sep 17 00:00:00 2001 From: Rafael Tenfen Date: Mon, 7 Mar 2022 11:59:22 -0300 Subject: [PATCH 10/11] Update mapPairsToSlice to use index Co-authored-by: Aleksandr Bezobchuk --- price-feeder/oracle/provider/provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 29ee51a5f2..b17957bed4 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -94,7 +94,7 @@ func PastUnixTime(t time.Duration) int64 { // mapPairsToSlice returns the map of currency pairs as slice. func mapPairsToSlice(mapPairs map[string]types.CurrencyPair) []types.CurrencyPair { - var currencyPairs []types.CurrencyPair + currencyPairs := make([]types.CurrencyPair, len(mapPairs)) for _, cp := range mapPairs { currencyPairs = append(currencyPairs, cp) From 66108e045a6dd39fd644bef0495f020233d329cd Mon Sep 17 00:00:00 2001 From: RafilxTenfen Date: Mon, 7 Mar 2022 12:01:06 -0300 Subject: [PATCH 11/11] refactor: change from append to use iterator --- price-feeder/oracle/provider/provider.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index b17957bed4..6892d54124 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -96,8 +96,10 @@ func PastUnixTime(t time.Duration) int64 { func mapPairsToSlice(mapPairs map[string]types.CurrencyPair) []types.CurrencyPair { currencyPairs := make([]types.CurrencyPair, len(mapPairs)) + iterator := 0 for _, cp := range mapPairs { - currencyPairs = append(currencyPairs, cp) + currencyPairs[iterator] = cp + iterator++ } return currencyPairs