Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SubscribeTickers func to providers (backport #592) #600

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket.
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.

### Bug Fixes

Expand Down
149 changes: 89 additions & 60 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,38 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mu sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs []types.CurrencyPair
mtx sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

// BinanceTicker ticker price response
// https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys
// used by Marshal (either the struct field name or its tag),
// preferring an exact match but also accepting a case-insensitive match
// C is not used, but it avoids to implement specific UnmarshalJSON
// BinanceTicker ticker price response. https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys used by Marshal (either the
// struct field name or its tag), preferring an exact match but also accepting a
// case-insensitive match. C field which is Statistics close time is not used, but
// it avoids to implement specific UnmarshalJSON.
BinanceTicker struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
LastPrice string `json:"c"` // Last price ex.: 0.0025
Volume string `json:"v"` // Total traded base asset volume ex.: 1000
C uint64 `json:"C"` // Statistics close time
}

// BinanceCandleMetadata candle metadata used to compute tvwap price.
BinanceCandleMetadata struct {
Close string `json:"c"` // Price at close
TimeStamp int64 `json:"T"` // Close time in unix epoch ex.: 1645756200000
Volume string `json:"v"` // Volume during period
}

// BinanceCandle candle binance websocket channel "kline_1m" response.
BinanceCandle struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
Metadata BinanceCandleMetadata `json:"k"` // Metadata for candle
}

// BinanceSubscribeMsg Msg to subscribe all the tickers channels
// BinanceSubscribeMsg Msg to subscribe all the tickers channels.
BinanceSubscriptionMsg struct {
Method string `json:"method"` // SUBSCRIBE/UNSUBSCRIBE
Params []string `json:"params"` // streams to subscribe ex.: usdtatom@ticker
Expand All @@ -87,13 +88,10 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
candles: map[string]BinanceCandle{},
subscribedPairs: pairs,
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.subscribeTickers(pairs...); err != nil {
return nil, err
}
if err := provider.subscribeCandles(pairs...); err != nil {
if err := provider.SubscribeTickers(pairs...); err != nil {
return nil, err
}

Expand All @@ -102,6 +100,26 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
return provider, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps)*2)

iterator := 0
for _, cp := range cps {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

if err := p.subscribePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))
Expand Down Expand Up @@ -150,58 +168,36 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
if err := json.Unmarshal(bz, &tickerResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal ticker response")
}
if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
}

if len(tickerResp.LastPrice) != 0 {
p.setTickerPair(tickerResp)
return
}

if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
return
}
if len(candleResp.Metadata.Close) != 0 {
p.setCandlePair(candleResp)
}
}

func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.tickers[ticker.Symbol] = ticker
}

func (p *BinanceProvider) setCandlePair(candle BinanceCandle) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.Symbol] = candle
}

func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume)
}

// subscribeTickers subscribe to all currency pairs
func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@ticker")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

// subscribeCandles subscribe to all candles
func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@kline_1m")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()
Expand All @@ -213,7 +209,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
continue
}
Expand All @@ -233,13 +229,12 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
}
}

// reconnect closes the last WS connection and create a new one
// A single connection to stream.binance.com is only valid for 24 hours;
// expect to be disconnected at the 24 hour mark
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from
// the connection within a 10 minute period, the connection will be disconnected.
// Unsolicited pong frames are allowed.
// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
p.wsClient.Close()

Expand All @@ -250,10 +245,16 @@ func (p *BinanceProvider) reconnect() error {
}
p.wsClient = wsConn

if err := p.subscribeCandles(p.subscribedPairs...); err != nil {
return err
pairs := make([]string, len(p.subscribedPairs)*2)
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}
return p.subscribeTickers(p.subscribedPairs...)

return p.subscribePairs(pairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
Expand All @@ -276,7 +277,35 @@ func (p *BinanceProvider) keepReconnecting() {
}
}

// newBinanceSubscriptionMsg returns a new subscription Msg
// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *BinanceProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

for _, cp := range cps {
p.subscribedPairs[cp.String()] = cp
}
}

// subscribePairs write the subscription msg to the provider.
func (p *BinanceProvider) subscribePairs(pairs ...string) error {
subsMsg := newBinanceSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}

// currencyPairToBinanceTickerPair receives a currency pair and return binance
// ticker symbol atomusdt@ticker.
func currencyPairToBinanceTickerPair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@ticker")
}

// currencyPairToBinanceCandlePair receives a currency pair and return binance
// candle symbol atomusdt@kline_1m.
func currencyPairToBinanceCandlePair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@kline_1m")
}

// newBinanceSubscriptionMsg returns a new subscription Msg.
func newBinanceSubscriptionMsg(params ...string) BinanceSubscriptionMsg {
return BinanceSubscriptionMsg{
Method: "SUBSCRIBE",
Expand Down
6 changes: 6 additions & 0 deletions price-feeder/oracle/provider/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) {
require.Nil(t, prices)
})
}

func TestBinanceCurrencyPairToBinancePair(t *testing.T) {
cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"}
binanceSymbol := currencyPairToBinanceTickerPair(cp)
require.Equal(t, binanceSymbol, "atomusdt@ticker")
}
Loading