From 58d01fb2d4440a67febd278a9e8115f2a4e2bbfd Mon Sep 17 00:00:00 2001 From: Rafael Tenfen Date: Wed, 2 Mar 2022 20:04:04 -0300 Subject: [PATCH] feat: add SubscribeTickers func to providers (#592) * feat: add SubscribeTickers to binance provider * feat: add SubscribeTickers to huobi provider * feat: add SubscribeTickers to huobi provider * feat: add SubscribeTickers to Okx provider * add subscribe tickers func to changelog * removed unused method * fix: set right amount of topics on reconnect * chore: update godocs * chore: update godocs * chore: update godocs * chore: update godocs Co-authored-by: Aleksandr Bezobchuk (cherry picked from commit 380ec4925f90c3707ed916f1911d629d40d88cb0) --- price-feeder/CHANGELOG.md | 1 + price-feeder/oracle/provider/binance.go | 149 +++++++++------- price-feeder/oracle/provider/binance_test.go | 6 + price-feeder/oracle/provider/huobi.go | 169 ++++++++++--------- price-feeder/oracle/provider/huobi_test.go | 6 + price-feeder/oracle/provider/kraken.go | 60 +++---- price-feeder/oracle/provider/okx.go | 163 ++++++++++-------- price-feeder/oracle/provider/okx_test.go | 6 + 8 files changed, 315 insertions(+), 245 deletions(-) diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index c3239f6c93..d0b7e35f6b 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -54,6 +54,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket. - [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket. - [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket. +- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx. ### Bug Fixes diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index a42d619a33..ba2fdd5a1f 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -31,18 +31,17 @@ type ( wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mu sync.Mutex - tickers map[string]BinanceTicker // Symbol => BinanceTicker - candles map[string]BinanceCandle // Symbol => BinanceCandle - subscribedPairs []types.CurrencyPair + mtx sync.Mutex + tickers map[string]BinanceTicker // Symbol => BinanceTicker + candles map[string]BinanceCandle // Symbol => BinanceCandle + subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } - // BinanceTicker ticker price response - // https://pkg.go.dev/encoding/json#Unmarshal - // Unmarshal matches incoming object keys to the keys - // used by Marshal (either the struct field name or its tag), - // preferring an exact match but also accepting a case-insensitive match - // C is not used, but it avoids to implement specific UnmarshalJSON + // BinanceTicker ticker price response. https://pkg.go.dev/encoding/json#Unmarshal + // Unmarshal matches incoming object keys to the keys used by Marshal (either the + // struct field name or its tag), preferring an exact match but also accepting a + // case-insensitive match. C field which is Statistics close time is not used, but + // it avoids to implement specific UnmarshalJSON. BinanceTicker struct { Symbol string `json:"s"` // Symbol ex.: BTCUSDT LastPrice string `json:"c"` // Last price ex.: 0.0025 @@ -50,18 +49,20 @@ type ( C uint64 `json:"C"` // Statistics close time } + // BinanceCandleMetadata candle metadata used to compute tvwap price. BinanceCandleMetadata struct { Close string `json:"c"` // Price at close TimeStamp int64 `json:"T"` // Close time in unix epoch ex.: 1645756200000 Volume string `json:"v"` // Volume during period } + // BinanceCandle candle binance websocket channel "kline_1m" response. BinanceCandle struct { Symbol string `json:"s"` // Symbol ex.: BTCUSDT Metadata BinanceCandleMetadata `json:"k"` // Metadata for candle } - // BinanceSubscribeMsg Msg to subscribe all the tickers channels + // BinanceSubscribeMsg Msg to subscribe all the tickers channels. BinanceSubscriptionMsg struct { Method string `json:"method"` // SUBSCRIBE/UNSUBSCRIBE Params []string `json:"params"` // streams to subscribe ex.: usdtatom@ticker @@ -87,13 +88,10 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ logger: logger.With().Str("provider", "binance").Logger(), tickers: map[string]BinanceTicker{}, candles: map[string]BinanceCandle{}, - subscribedPairs: pairs, + subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.subscribeTickers(pairs...); err != nil { - return nil, err - } - if err := provider.subscribeCandles(pairs...); err != nil { + if err := provider.SubscribeTickers(pairs...); err != nil { return nil, err } @@ -102,6 +100,26 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ return provider, nil } +// SubscribeTickers subscribe all currency pairs into ticker and candle channels. +func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)*2) + + iterator := 0 + for _, cp := range cps { + pairs[iterator] = currencyPairToBinanceTickerPair(cp) + iterator++ + pairs[iterator] = currencyPairToBinanceCandlePair(cp) + iterator++ + } + + if err := p.subscribePairs(pairs...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + // GetTickerPrices returns the tickerPrices based on the provided pairs. func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { tickerPrices := make(map[string]TickerPrice, len(pairs)) @@ -150,12 +168,14 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) { if err := json.Unmarshal(bz, &tickerResp); err != nil { p.logger.Debug().Err(err).Msg("could not unmarshal ticker response") } - if err := json.Unmarshal(bz, &candleResp); err != nil { - p.logger.Debug().Err(err).Msg("could not unmarshal candle response") - } - if len(tickerResp.LastPrice) != 0 { p.setTickerPair(tickerResp) + return + } + + if err := json.Unmarshal(bz, &candleResp); err != nil { + p.logger.Debug().Err(err).Msg("could not unmarshal candle response") + return } if len(candleResp.Metadata.Close) != 0 { p.setCandlePair(candleResp) @@ -163,14 +183,14 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) { } func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() p.tickers[ticker.Symbol] = ticker } func (p *BinanceProvider) setCandlePair(candle BinanceCandle) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() p.candles[candle.Symbol] = candle } @@ -178,30 +198,6 @@ func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) { return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume) } -// subscribeTickers subscribe to all currency pairs -func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { - params := make([]string, len(cps)) - - for i, cp := range cps { - params[i] = strings.ToLower(cp.String() + "@ticker") - } - - subsMsg := newBinanceSubscriptionMsg(params...) - return p.wsClient.WriteJSON(subsMsg) -} - -// subscribeCandles subscribe to all candles -func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error { - params := make([]string, len(cps)) - - for i, cp := range cps { - params[i] = strings.ToLower(cp.String() + "@kline_1m") - } - - subsMsg := newBinanceSubscriptionMsg(params...) - return p.wsClient.WriteJSON(subsMsg) -} - func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { reconnectTicker := time.NewTicker(defaultMaxConnectionTime) defer reconnectTicker.Stop() @@ -213,7 +209,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { case <-time.After(defaultReadNewWSMessage): messageType, bz, err := p.wsClient.ReadMessage() if err != nil { - // if some error occurs continue to try to read the next message + // if some error occurs continue to try to read the next message. p.logger.Err(err).Msg("could not read message") continue } @@ -233,13 +229,12 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { } } -// reconnect closes the last WS connection and create a new one -// A single connection to stream.binance.com is only valid for 24 hours; -// expect to be disconnected at the 24 hour mark -// The websocket server will send a ping frame every 3 minutes. -// If the websocket server does not receive a pong frame back from -// the connection within a 10 minute period, the connection will be disconnected. -// Unsolicited pong frames are allowed. +// reconnect closes the last WS connection then create a new one and subscribe to +// all subscribed pairs in the ticker and candle pais. A single connection to +// stream.binance.com is only valid for 24 hours; expect to be disconnected at the +// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If +// the websocket server does not receive a pong frame back from the connection +// within a 10 minute period, the connection will be disconnected. func (p *BinanceProvider) reconnect() error { p.wsClient.Close() @@ -250,10 +245,16 @@ func (p *BinanceProvider) reconnect() error { } p.wsClient = wsConn - if err := p.subscribeCandles(p.subscribedPairs...); err != nil { - return err + pairs := make([]string, len(p.subscribedPairs)*2) + iterator := 0 + for _, cp := range p.subscribedPairs { + pairs[iterator] = currencyPairToBinanceTickerPair(cp) + iterator++ + pairs[iterator] = currencyPairToBinanceCandlePair(cp) + iterator++ } - return p.subscribeTickers(p.subscribedPairs...) + + return p.subscribePairs(pairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in reconnect. @@ -276,7 +277,35 @@ func (p *BinanceProvider) keepReconnecting() { } } -// newBinanceSubscriptionMsg returns a new subscription Msg +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *BinanceProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// subscribePairs write the subscription msg to the provider. +func (p *BinanceProvider) subscribePairs(pairs ...string) error { + subsMsg := newBinanceSubscriptionMsg(pairs...) + return p.wsClient.WriteJSON(subsMsg) +} + +// currencyPairToBinanceTickerPair receives a currency pair and return binance +// ticker symbol atomusdt@ticker. +func currencyPairToBinanceTickerPair(cp types.CurrencyPair) string { + return strings.ToLower(cp.String() + "@ticker") +} + +// currencyPairToBinanceCandlePair receives a currency pair and return binance +// candle symbol atomusdt@kline_1m. +func currencyPairToBinanceCandlePair(cp types.CurrencyPair) string { + return strings.ToLower(cp.String() + "@kline_1m") +} + +// newBinanceSubscriptionMsg returns a new subscription Msg. func newBinanceSubscriptionMsg(params ...string) BinanceSubscriptionMsg { return BinanceSubscriptionMsg{ Method: "SUBSCRIBE", diff --git a/price-feeder/oracle/provider/binance_test.go b/price-feeder/oracle/provider/binance_test.go index cf08637cbc..f7acbaa1b1 100644 --- a/price-feeder/oracle/provider/binance_test.go +++ b/price-feeder/oracle/provider/binance_test.go @@ -72,3 +72,9 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) { require.Nil(t, prices) }) } + +func TestBinanceCurrencyPairToBinancePair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + binanceSymbol := currencyPairToBinanceTickerPair(cp) + require.Equal(t, binanceSymbol, "atomusdt@ticker") +} diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index 229e1aa2f1..f2528a59f8 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -31,33 +31,33 @@ type ( // API. // // REF: https://huobiapi.github.io/docs/spot/v1/en/#market-ticker - // REF : https://huobiapi.github.io/docs/spot/v1/en/#get-klines-candles + // REF: https://huobiapi.github.io/docs/spot/v1/en/#get-klines-candles HuobiProvider struct { wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mu sync.Mutex - tickers map[string]HuobiTicker // market.$symbol.ticker => HuobiTicker - candles map[string]HuobiCandle // market.$symbol.kline.$period => HuobiCandle - subscribedPairs []types.CurrencyPair + mtx sync.Mutex + tickers map[string]HuobiTicker // market.$symbol.ticker => HuobiTicker + candles map[string]HuobiCandle // market.$symbol.kline.$period => HuobiCandle + subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } - // HuobiTicker defines the response type for the channel and - // the tick object for a given ticker/symbol. + // HuobiTicker defines the response type for the channel and the tick object for a + // given ticker/symbol. HuobiTicker struct { CH string `json:"ch"` // Channel name. Format:market.$symbol.ticker Tick HuobiTick `json:"tick"` } - // HuobiTick defines the response type for the last 24h market summary - // and the last traded price for a given ticker/symbol. + // HuobiTick defines the response type for the last 24h market summary and the last + // traded price for a given ticker/symbol. HuobiTick struct { Vol float64 `json:"vol"` // Accumulated trading value of last 24 hours LastPrice float64 `json:"lastPrice"` // Last traded price } - // HuobiCandle defines the response type for the channel and - // the tick object for a given ticker/symbol. + // HuobiCandle defines the response type for the channel and the tick object for a + // given ticker/symbol. HuobiCandle struct { CH string `json:"ch"` // Channel name. Format:market.$symbol.kline.$period Tick HuobiCandleTick `json:"tick"` @@ -70,7 +70,7 @@ type ( Volume float64 `json:"volume"` // Volume during this period } - // HuobiSubscriptionMsg Msg to subscribe to one ticker channel at time + // HuobiSubscriptionMsg Msg to subscribe to one ticker channel at time. HuobiSubscriptionMsg struct { Sub string `json:"sub"` // channel to subscribe market.$symbol.ticker } @@ -95,13 +95,10 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types logger: logger.With().Str("provider", "huobi").Logger(), tickers: map[string]HuobiTicker{}, candles: map[string]HuobiCandle{}, - subscribedPairs: pairs, + subscribedPairs: map[string]types.CurrencyPair{}, } - if err := provider.subscribeTickers(pairs...); err != nil { - return nil, err - } - if err := provider.subscribeCandles(pairs...); err != nil { + if err := provider.SubscribeTickers(pairs...); err != nil { return nil, err } @@ -110,27 +107,33 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types return provider, nil } -// subscribeTickers subscribe to all currency pairs. -func (p *HuobiProvider) subscribeTickers(cps ...types.CurrencyPair) error { - for _, cp := range cps { - huobiSubscriptionMsg := newHuobiSubscriptionMsg(cp) - if err := p.wsClient.WriteJSON(huobiSubscriptionMsg); err != nil { - return err +// GetTickerPrices returns the tickerPrices based on the saved map. +func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { + tickerPrices := make(map[string]TickerPrice, len(pairs)) + + for _, cp := range pairs { + price, err := p.getTickerPrice(cp) + if err != nil { + return nil, err } + tickerPrices[cp.String()] = price } - return nil + return tickerPrices, nil } -// subscribeCandles subscribe to candles for currency pairs. -func (p *HuobiProvider) subscribeCandles(cps ...types.CurrencyPair) error { +// SubscribeTickers subscribe all currency pairs into ticker and candle channels. +func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error { for _, cp := range cps { - huobiSubscriptionMsg := newHuobiCandleSubscriptionMsg(cp) - if err := p.wsClient.WriteJSON(huobiSubscriptionMsg); err != nil { + if err := p.subscribeTickerPair(cp); err != nil { + return err + } + if err := p.subscribeCandlePair(cp); err != nil { return err } } + p.setSubscribedPairs(cps...) return nil } @@ -169,9 +172,9 @@ func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) { } } -// messageReceived handles the received data from the Huobi websocket. -// All return data of websocket Market APIs are compressed with -// GZIP so they need to be decompressed. +// messageReceived handles the received data from the Huobi websocket. All return +// data of websocket Market APIs are compressed with GZIP so they need to be +// decompressed. func (p *HuobiProvider) messageReceived(messageType int, bz []byte, reconnectTicker *time.Ticker) { if messageType != websocket.BinaryMessage { return @@ -197,27 +200,26 @@ func (p *HuobiProvider) messageReceived(messageType int, bz []byte, reconnectTic if err := json.Unmarshal(bz, &tickerResp); err != nil { p.logger.Debug().Err(err).Msg("failed to unmarshal message") } - if err := json.Unmarshal(bz, &candleResp); err != nil { - p.logger.Debug().Err(err).Msg("failed to unmarshal message") - } - if tickerResp.Tick.LastPrice != 0 { p.setTickerPair(tickerResp) return } + + if err := json.Unmarshal(bz, &candleResp); err != nil { + p.logger.Debug().Err(err).Msg("failed to unmarshal message") + return + } if candleResp.Tick.Close != 0 { p.setCandlePair(candleResp) } } -// pong return a heartbeat message when a "ping" is received -// and reset the recconnect ticker because the connection is alive -// After connected to Huobi's Websocket server, -// the server will send heartbeat periodically (5s interval). -// When client receives an heartbeat message, it should respond -// with a matching "pong" message which has the same integer in it, e.g. -// {"ping": 1492420473027} and the return should be -// {"pong": 1492420473027} +// pong return a heartbeat message when a "ping" is received and reset the +// recconnect ticker because the connection is alive. After connected to Huobi's +// Websocket server, the server will send heartbeat periodically (5s interval). +// When client receives an heartbeat message, it should respond with a matching +// "pong" message which has the same integer in it, e.g. {"ping": 1492420473027} +// and then the return pong message should be {"pong": 1492420473027}. func (p *HuobiProvider) pong(bz []byte, reconnectTicker *time.Ticker) { reconnectTicker.Reset(huobiReconnectTime) var heartbeat struct { @@ -242,18 +244,18 @@ func (p *HuobiProvider) ping() error { } func (p *HuobiProvider) setTickerPair(ticker HuobiTicker) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() p.tickers[ticker.CH] = ticker } func (p *HuobiProvider) setCandlePair(candle HuobiCandle) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() p.candles[candle.CH] = candle } -// reconnect closes the last WS connection and create a new one +// reconnect closes the last WS connection and create a new one. func (p *HuobiProvider) reconnect() error { p.wsClient.Close() @@ -264,31 +266,32 @@ func (p *HuobiProvider) reconnect() error { } p.wsClient = wsConn - err = p.subscribeTickers(p.subscribedPairs...) - if err != nil { - return err + for _, cp := range p.subscribedPairs { + if err := p.subscribeTickerPair(cp); err != nil { + return err + } + if err := p.subscribeCandlePair(cp); err != nil { + return err + } } - return p.subscribeCandles(p.subscribedPairs...) + return nil } -// GetTickerPrices returns the tickerPrices based on the saved map. -func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { - tickerPrices := make(map[string]TickerPrice, len(pairs)) - - for _, cp := range pairs { - price, err := p.getTickerPrice(cp) - if err != nil { - return nil, err - } - tickerPrices[cp.String()] = price - } +// subscribeTickerPair write the subscription ticker msg to the provider. +func (p *HuobiProvider) subscribeTickerPair(cp types.CurrencyPair) error { + huobiSubscriptionMsg := newHuobiTickerSubscriptionMsg(cp) + return p.wsClient.WriteJSON(huobiSubscriptionMsg) +} - return tickerPrices, nil +// subscribeCandlePair write the subscription candle msg to the provider. +func (p *HuobiProvider) subscribeCandlePair(cp types.CurrencyPair) error { + huobiSubscriptionCandleMsg := newHuobiCandleSubscriptionMsg(cp) + return p.wsClient.WriteJSON(huobiSubscriptionCandleMsg) } func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { - ticker, ok := p.tickers[getChannelTicker(cp)] + ticker, ok := p.tickers[currencyPairToHuobiTickerPair(cp)] if !ok { return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String()) } @@ -296,9 +299,18 @@ func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, erro return ticker.toTickerPrice() } -// decompressGzip uncompress gzip compressed messages -// All data returned from the websocket Market APIs is compressed -// with GZIP, so it needs to be unzipped. +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *HuobiProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// decompressGzip uncompress gzip compressed messages. All data returned from the +// websocket Market APIs is compressed with GZIP, so it needs to be unzipped. func decompressGzip(bz []byte) ([]byte, error) { r, err := gzip.NewReader(bytes.NewReader(bz)) if err != nil { @@ -308,6 +320,7 @@ func decompressGzip(bz []byte) ([]byte, error) { return ioutil.ReadAll(r) } +// toTickerPrice converts current HuobiTicker to TickerPrice. func (ticker HuobiTicker) toTickerPrice() (TickerPrice, error) { return newTickerPrice( "Huobi", @@ -317,26 +330,28 @@ func (ticker HuobiTicker) toTickerPrice() (TickerPrice, error) { ) } -// newHuobiSubscriptionMsg returns a new subscription Msg -func newHuobiSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg { +// newHuobiTickerSubscriptionMsg returns a new ticker subscription Msg. +func newHuobiTickerSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg { return HuobiSubscriptionMsg{ - Sub: getChannelTicker(cp), + Sub: currencyPairToHuobiTickerPair(cp), } } -// getChannelTicker returns the channel name in the Format:market.$symbol.ticker -func getChannelTicker(cp types.CurrencyPair) string { +// currencyPairToHuobiTickerPair returns the channel name in the following format: +// "market.$symbol.ticker". +func currencyPairToHuobiTickerPair(cp types.CurrencyPair) string { return strings.ToLower("market." + cp.String() + ".ticker") } -// newHuobiSubscriptionMsg returns a new subscription Msg +// newHuobiSubscriptionMsg returns a new candle subscription Msg. func newHuobiCandleSubscriptionMsg(cp types.CurrencyPair) HuobiSubscriptionMsg { return HuobiSubscriptionMsg{ - Sub: getCandleTicker(cp), + Sub: currencyPairToHuobiCandlePair(cp), } } -// getCandleTicker returns the channel name in the Format:market.$symbol.line.$period -func getCandleTicker(cp types.CurrencyPair) string { +// currencyPairToHuobiCandlePair returns the channel name in the following format: +// "market.$symbol.line.$period". +func currencyPairToHuobiCandlePair(cp types.CurrencyPair) string { return strings.ToLower("market." + cp.String() + ".kline.1min") } diff --git a/price-feeder/oracle/provider/huobi_test.go b/price-feeder/oracle/provider/huobi_test.go index c3f0a87e07..48073ade5a 100644 --- a/price-feeder/oracle/provider/huobi_test.go +++ b/price-feeder/oracle/provider/huobi_test.go @@ -79,3 +79,9 @@ func TestHuobiProvider_GetTickerPrices(t *testing.T) { require.Nil(t, prices) }) } + +func TestHuobiCurrencyPairToHuobiPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + binanceSymbol := currencyPairToHuobiTickerPair(cp) + require.Equal(t, binanceSymbol, "market.atomusdt.ticker") +} diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index c098dd8e15..b77e4ea460 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -38,14 +38,14 @@ type ( } // KrakenTicker ticker price response from Kraken ticker channel. - // https://docs.kraken.com/websockets/#message-ticker + // REF: https://docs.kraken.com/websockets/#message-ticker KrakenTicker struct { C []string `json:"c"` // Close with Price in the first position V []string `json:"v"` // Volume with the value over last 24 hours in the second position } // KrakenCandle candle response from Kraken candle channel. - // REF : https://docs.kraken.com/websockets/#message-ohlc + // REF: https://docs.kraken.com/websockets/#message-ohlc KrakenCandle struct { Close string // Close price during this period TimeStamp int64 // Linux epoch timestamp @@ -106,9 +106,6 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type if err := provider.SubscribeTickers(pairs...); err != nil { return nil, err } - if err := provider.SubscribeCandles(pairs...); err != nil { - return nil, err - } go provider.handleWebSocketMsgs(ctx) @@ -131,8 +128,7 @@ func (p *KrakenProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[strin return tickerPrices, nil } -// SubscribeTickers subscribe to all currency pairs and -// add the new ones into the provider subscribed pairs. +// SubscribeTickers subscribe all currency pairs into ticker and candle channels. func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { pairs := make([]string, len(cps)) @@ -140,22 +136,10 @@ func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { pairs[i] = currencyPairToKrakenPair(cp) } - if err := p.subscribePairs(pairs...); err != nil { + if err := p.subscribeTickerPairs(pairs...); err != nil { return err } - - p.setSubscribedPairs(cps...) - return nil -} - -func (p *KrakenProvider) SubscribeCandles(cps ...types.CurrencyPair) error { - candles := make([]string, len(cps)) - - for i, cp := range cps { - candles[i] = currencyPairToKrakenPair(cp) - } - - if err := p.subscribeCandlePairs(candles...); err != nil { + if err := p.subscribeCandlePairs(pairs...); err != nil { return err } @@ -163,8 +147,8 @@ func (p *KrakenProvider) SubscribeCandles(cps ...types.CurrencyPair) error { return nil } -// handleWebSocketMsgs receive all the messages from the provider -// and controls to reconnect the web socket. +// handleWebSocketMsgs receive all the messages from the provider and controls the +// reconnect function to the web socket. func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { reconnectTicker := time.NewTicker(defaultMaxConnectionTime) defer reconnectTicker.Stop() @@ -176,7 +160,7 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { case <-time.After(defaultReadNewWSMessage): messageType, bz, err := p.wsClient.ReadMessage() if err != nil { - // if some error occurs continue to try to read the next message + // if some error occurs continue to try to read the next message. p.logger.Err(err).Msg("could not read message") if err := p.ping(); err != nil { p.logger.Err(err).Msg("failed to send ping") @@ -208,7 +192,7 @@ func (p *KrakenProvider) messageReceived(messageType int, bz []byte) { var krakenEvent KrakenEvent if err := json.Unmarshal(bz, &krakenEvent); err != nil { - // msg is not an event, it will try to marshal to ticker message + // msg is not an event, it will try to marshal to ticker message. p.logger.Debug().Msg("received a message that is not an event") } else { switch krakenEvent.Event { @@ -222,7 +206,7 @@ func (p *KrakenProvider) messageReceived(messageType int, bz []byte) { } if err := p.messageReceivedTickerPrice(bz); err != nil { - // msg is not a ticker, it will try to marshal to candle message + // msg is not a ticker, it will try to marshal to candle message. p.logger.Debug().Err(err).Msg("unable to unmarshal ticker") } else { return @@ -368,7 +352,10 @@ func (p *KrakenProvider) reconnect() error { iterator++ } - return p.subscribePairs(pairs...) + if err := p.subscribeTickerPairs(pairs...); err != nil { + return err + } + return p.subscribeCandlePairs(pairs...) } // keepReconnecting keeps trying to reconnect if an error occurs in recconnect. @@ -412,7 +399,8 @@ func (p *KrakenProvider) messageReceivedSubscriptionStatus(bz []byte) { } } -// messageReceivedSystemStatus handle the system status and try to reconnect if it is not online. +// messageReceivedSystemStatus handle the system status and try to reconnect if it +// is not online. func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) { var systemStatus KrakenEventSystemStatus if err := json.Unmarshal(bz, &systemStatus); err != nil { @@ -445,9 +433,9 @@ func (p *KrakenProvider) ping() error { return p.wsClient.WriteMessage(websocket.PingMessage, ping) } -// subscribePairs write the subscription msg to the provider. -func (p *KrakenProvider) subscribePairs(pairs ...string) error { - subsMsg := newKrakenSubscriptionMsg(pairs...) +// subscribeTickerPairs write the subscription msg to the provider. +func (p *KrakenProvider) subscribeTickerPairs(pairs ...string) error { + subsMsg := newKrakenTickerSubscriptionMsg(pairs...) return p.wsClient.WriteJSON(subsMsg) } @@ -482,13 +470,13 @@ func (ticker KrakenTicker) toTickerPrice(symbol string) (TickerPrice, error) { if len(ticker.C) != 2 || len(ticker.V) != 2 { return TickerPrice{}, fmt.Errorf("error converting KrakenTicker to TickerPrice") } - // ticker.C has the Price in the first position - // ticker.V has the totla Value over last 24 hours in the second position + // ticker.C has the Price in the first position. + // ticker.V has the totla Value over last 24 hours in the second position. return newTickerPrice("Kraken", symbol, ticker.C[0], ticker.V[1]) } -// newKrakenSubscriptionMsg returns a new subscription Msg. -func newKrakenSubscriptionMsg(pairs ...string) KrakenSubscriptionMsg { +// newKrakenTickerSubscriptionMsg returns a new subscription Msg. +func newKrakenTickerSubscriptionMsg(pairs ...string) KrakenSubscriptionMsg { return KrakenSubscriptionMsg{ Event: "subscribe", Pair: pairs, @@ -522,7 +510,7 @@ func currencyPairToKrakenPair(cp types.CurrencyPair) string { } // normalizeKrakenBTCPair changes XBT pairs to BTC, -// since other providers list bitcoin as BTC +// since other providers list bitcoin as BTC. func normalizeKrakenBTCPair(ticker string) string { return strings.Replace(ticker, "XBT", "BTC", 1) } diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index a4180155e3..1dfd62ef55 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -32,62 +32,59 @@ type ( wsURL url.URL wsClient *websocket.Conn logger zerolog.Logger - mu sync.Mutex - tickers map[string]OkxTickerPair // InstId => OkxTickerPair - candles map[string]OkxCandlePair // InstId => 0kxCandlePair reconnectTimer *time.Ticker - subscribedPairs []types.CurrencyPair + mtx sync.Mutex + tickers map[string]OkxTickerPair // InstId => OkxTickerPair + candles map[string]OkxCandlePair // InstId => 0kxCandlePair + subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } - // OkxTickerPair defines a ticker pair of Okx + // OkxTickerPair defines a ticker pair of Okx. OkxTickerPair struct { InstId string `json:"instId"` // Instrument ID ex.: BTC-USDT Last string `json:"last"` // Last traded price ex.: 43508.9 Vol24h string `json:"vol24h"` // 24h trading volume ex.: 11159.87127845 } - // OkxInst defines the structure containing ID information for the - // OkxResponses + // OkxInst defines the structure containing ID information for the OkxResponses. OkxID struct { Channel string `json:"channel"` InstID string `json:"instId"` } - // OkxTickerResponse defines the response structure of a Okx ticker - // request. + // OkxTickerResponse defines the response structure of a Okx ticker request. OkxTickerResponse struct { Data []OkxTickerPair `json:"data"` ID OkxID `json:"arg"` } - // OkxCandlePair defines a candle for Okx + // OkxCandlePair defines a candle for Okx. OkxCandlePair struct { Close string `json:"c"` // Close price for this time period TimeStamp int64 `json:"ts"` // Linux epoch timestamp Volume string `json:"vol"` // Volume for this time period } - // OkxCandleResponse defines the response structure of a Okx candle - // request. + // OkxCandleResponse defines the response structure of a Okx candle request. OkxCandleResponse struct { Data [][]string `json:"data"` ID OkxID `json:"arg"` } - // OkxSubscriptionTopic Topic with the ticker to be subscribed/unsubscribed + // OkxSubscriptionTopic Topic with the ticker to be subscribed/unsubscribed. OkxSubscriptionTopic struct { Channel string `json:"channel"` // Channel name ex.: tickers InstId string `json:"instId"` // Instrument ID ex.: BTC-USDT } - // OkxSubscriptionMsg Message to subscribe/unsubscribe with N Topics + // OkxSubscriptionMsg Message to subscribe/unsubscribe with N Topics. OkxSubscriptionMsg struct { Op string `json:"op"` // Operation ex.: subscribe Args []OkxSubscriptionTopic `json:"args"` } ) -// NewOkxProvider creates a new OkxProvider +// NewOkxProvider creates a new OkxProvider. func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.CurrencyPair) (*OkxProvider, error) { wsURL := url.URL{ Scheme: "wss", @@ -104,18 +101,14 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C wsURL: wsURL, wsClient: wsConn, logger: logger.With().Str("provider", "okx").Logger(), + reconnectTimer: time.NewTicker(okxPingCheck), tickers: map[string]OkxTickerPair{}, candles: map[string]OkxCandlePair{}, - reconnectTimer: time.NewTicker(okxPingCheck), - subscribedPairs: pairs, + subscribedPairs: map[string]types.CurrencyPair{}, } provider.wsClient.SetPongHandler(provider.pongHandler) - if err := provider.subscribeTickers(pairs...); err != nil { - return nil, err - } - - if err := provider.subscribeCandles(pairs...); err != nil { + if err := provider.SubscribeTickers(pairs...); err != nil { return nil, err } @@ -124,7 +117,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C return provider, nil } -// GetTickerPrices returns the tickerPrices based on the saved map +// GetTickerPrices returns the tickerPrices based on the saved map. func (p *OkxProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { tickerPrices := make(map[string]TickerPrice, len(pairs)) @@ -140,8 +133,29 @@ func (p *OkxProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]T return tickerPrices, nil } +// SubscribeTickers subscribe all currency pairs into ticker and candle channels. +func (p *OkxProvider) SubscribeTickers(cps ...types.CurrencyPair) error { + topics := make([]OkxSubscriptionTopic, len(cps)*2) + + iterator := 0 + for _, cp := range cps { + instId := currencyPairToOkxPair(cp) + topics[iterator] = newOkxSubscriptionTopic(instId) + iterator++ + topics[iterator] = newOkxCandleSubscriptionTopic(instId) + iterator++ + } + + if err := p.subscribePairs(topics...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil +} + func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { - instrumentId := getInstrumentId(cp) + instrumentId := currencyPairToOkxPair(cp) tickerPair, ok := p.tickers[instrumentId] if !ok { return TickerPrice{}, fmt.Errorf("okx provider failed to get ticker price for %s", instrumentId) @@ -158,7 +172,7 @@ func (p *OkxProvider) handleReceivedTickers(ctx context.Context) { case <-time.After(defaultReadNewWSMessage): messageType, bz, err := p.wsClient.ReadMessage() if err != nil { - // if some error occurs continue to try to read the next message + // if some error occurs continue to try to read the next message. p.logger.Err(err).Msg("could not read message") if err := p.ping(); err != nil { p.logger.Err(err).Msg("could not send ping") @@ -173,7 +187,7 @@ func (p *OkxProvider) handleReceivedTickers(ctx context.Context) { p.resetReconnectTimer() p.messageReceived(messageType, bz) - case <-p.reconnectTimer.C: // reset by the pongHandler + case <-p.reconnectTimer.C: // reset by the pongHandler. if err := p.reconnect(); err != nil { p.logger.Err(err).Msg("error reconnecting") } @@ -195,14 +209,16 @@ func (p *OkxProvider) messageReceived(messageType int, bz []byte) { if err := json.Unmarshal(bz, &tickerResp); err != nil { p.logger.Debug().Err(err).Msg("could not unmarshal ticker resp") } - if err := json.Unmarshal(bz, &candleResp); err != nil { - p.logger.Debug().Err(err).Msg("could not unmarshal candle resp") - } - if tickerResp.ID.Channel == "tickers" { for _, tickerPair := range tickerResp.Data { p.setTickerPair(tickerPair) } + return + } + + if err := json.Unmarshal(bz, &candleResp); err != nil { + p.logger.Debug().Err(err).Msg("could not unmarshal candle resp") + return } if candleResp.ID.Channel == "candle1m" { for _, candlePair := range candleResp.Data { @@ -212,20 +228,26 @@ func (p *OkxProvider) messageReceived(messageType int, bz []byte) { } func (p *OkxProvider) setTickerPair(tickerPair OkxTickerPair) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() p.tickers[tickerPair.InstId] = tickerPair } +// subscribePairs write the subscription msg to the provider. +func (p *OkxProvider) subscribePairs(pairs ...OkxSubscriptionTopic) error { + subsMsg := newOkxSubscriptionMsg(pairs...) + return p.wsClient.WriteJSON(subsMsg) +} + func (p *OkxProvider) setCandlePair(pairData []string, instID string) { - p.mu.Lock() - defer p.mu.Unlock() + p.mtx.Lock() + defer p.mtx.Unlock() ts, err := strconv.ParseInt(pairData[0], 10, 64) if err != nil { return } - // the candlesticks channel uses an array of strings + // the candlesticks channel uses an array of strings. p.candles[instID] = OkxCandlePair{ Close: pairData[4], Volume: pairData[5], @@ -233,44 +255,30 @@ func (p *OkxProvider) setCandlePair(pairData []string, instID string) { } } -// subscribeTickers subscribe to all currency pairs -func (p *OkxProvider) subscribeTickers(cps ...types.CurrencyPair) error { - topics := make([]OkxSubscriptionTopic, len(cps)) +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *OkxProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() - for i, cp := range cps { - instId := getInstrumentId(cp) - topics[i] = newOkxSubscriptionTopic(instId) + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp } - - subsMsg := newOkxSubscriptionMsg(topics...) - return p.wsClient.WriteJSON(subsMsg) -} - -// subscribeCandles subscribe to all candles pairs -func (p *OkxProvider) subscribeCandles(cps ...types.CurrencyPair) error { - topics := make([]OkxSubscriptionTopic, len(cps)) - - for i, cp := range cps { - instId := getInstrumentId(cp) - topics[i] = newOkxCandleSubscriptionTopic(instId) - } - - subsMsg := newOkxSubscriptionMsg(topics...) - return p.wsClient.WriteJSON(subsMsg) } func (p *OkxProvider) resetReconnectTimer() { p.reconnectTimer.Reset(okxPingCheck) } -// reconnect closes the last WS connection and creates a new one. -// If there’s a network problem, the system will automatically disable the connection. -// The connection will break automatically if the subscription is not established or -// data has not been pushed for more than 30 seconds. -// To keep the connection stable: -// 1. Set a timer of N seconds whenever a response message is received, where N is less than 30. -// 2. If the timer is triggered, which means that no new message is received within N seconds, send the String 'ping'. -// 3. Expect a 'pong' as a response. If the response message is not received within N seconds, please raise an error or reconnect. +// reconnect closes the last WS connection and creates a new one. If there’s a +// network problem, the system will automatically disable the connection. The +// connection will break automatically if the subscription is not established or +// data has not been pushed for more than 30 seconds. To keep the connection stable: +// 1. Set a timer of N seconds whenever a response message is received, where N is +// less than 30. +// 2. If the timer is triggered, which means that no new message is received within +// N seconds, send the String 'ping'. +// 3. Expect a 'pong' as a response. If the response message is not received within +// N seconds, please raise an error or reconnect. func (p *OkxProvider) reconnect() error { p.wsClient.Close() @@ -282,10 +290,20 @@ func (p *OkxProvider) reconnect() error { wsConn.SetPongHandler(p.pongHandler) p.wsClient = wsConn - return p.subscribeTickers(p.subscribedPairs...) + topics := make([]OkxSubscriptionTopic, len(p.subscribedPairs)*2) + iterator := 0 + for _, cp := range p.subscribedPairs { + instId := currencyPairToOkxPair(cp) + topics[iterator] = newOkxSubscriptionTopic(instId) + iterator++ + topics[iterator] = newOkxCandleSubscriptionTopic(instId) + iterator++ + } + + return p.subscribePairs(topics...) } -// ping to check websocket connection +// ping to check websocket connection. func (p *OkxProvider) ping() error { return p.wsClient.WriteMessage(websocket.PingMessage, ping) } @@ -299,12 +317,13 @@ func (ticker OkxTickerPair) toTickerPrice() (TickerPrice, error) { return newTickerPrice("Okx", ticker.InstId, ticker.Last, ticker.Vol24h) } -// getInstrumentId returns the expected pair instrument ID for Okx ex.: BTC-USDT -func getInstrumentId(pair types.CurrencyPair) string { +// currencyPairToOkxPair returns the expected pair instrument ID for Okx +// ex.: "BTC-USDT". +func currencyPairToOkxPair(pair types.CurrencyPair) string { return pair.Base + "-" + pair.Quote } -// newOkxSubscriptionTopic returns a new subscription topic +// newOkxSubscriptionTopic returns a new subscription topic. func newOkxSubscriptionTopic(instId string) OkxSubscriptionTopic { return OkxSubscriptionTopic{ Channel: "tickers", @@ -312,7 +331,7 @@ func newOkxSubscriptionTopic(instId string) OkxSubscriptionTopic { } } -// newOkxSubscriptionTopic returns a new subscription topic +// newOkxSubscriptionTopic returns a new subscription topic. func newOkxCandleSubscriptionTopic(instId string) OkxSubscriptionTopic { return OkxSubscriptionTopic{ Channel: "candle1m", @@ -320,7 +339,7 @@ func newOkxCandleSubscriptionTopic(instId string) OkxSubscriptionTopic { } } -// newOkxSubscriptionMsg returns a new subscription Msg for Okx +// newOkxSubscriptionMsg returns a new subscription Msg for Okx. func newOkxSubscriptionMsg(args ...OkxSubscriptionTopic) OkxSubscriptionMsg { return OkxSubscriptionMsg{ Op: "subscribe", diff --git a/price-feeder/oracle/provider/okx_test.go b/price-feeder/oracle/provider/okx_test.go index 904838bffc..c535f7d7aa 100644 --- a/price-feeder/oracle/provider/okx_test.go +++ b/price-feeder/oracle/provider/okx_test.go @@ -72,3 +72,9 @@ func TestOkxProvider_GetTickerPrices(t *testing.T) { require.Nil(t, prices) }) } + +func TestOkxCurrencyPairToOkxPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + okxSymbol := currencyPairToOkxPair(cp) + require.Equal(t, okxSymbol, "ATOM-USDT") +}