Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split subscribe ticker and candle channels #610

Merged
merged 14 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Refactor

- [#587](https://github.com/umee-network/umee/pull/587) Clean up logs from price feeder providers.
- [#610](https://github.com/umee-network/umee/pull/610) Split subscribtion of ticker and candle channels.

## [v0.1.0](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.1.0) - 2022-02-07

Expand Down
83 changes: 52 additions & 31 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand All @@ -100,26 +100,6 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
return provider, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps)*2)

iterator := 0
for _, cp := range cps {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

if err := p.subscribePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))
Expand Down Expand Up @@ -152,6 +132,55 @@ func (p *BinanceProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[stri
return candlePrices, nil
}

// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe to the ticker and candle channels for all currency pairs.
func (p *BinanceProvider) subscribeChannels(cps ...types.CurrencyPair) error {
if err := p.subscribeTickers(cps...); err != nil {
return err
}

return p.subscribeCandles(cps...)
}

// subscribeTickers subscribe to the ticker channel for all currency pairs.
func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToBinanceTickerPair(cp)
}

return p.subscribePairs(pairs...)
}

// subscribeCandles subscribe to the candle channel for all currency pairs.
func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToBinanceCandlePair(cp)
}

return p.subscribePairs(pairs...)
}

// subscribedPairsToSlice returns the map of subscribed pairs as a slice.
func (p *BinanceProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
p.mtx.RLock()
defer p.mtx.RUnlock()
Expand Down Expand Up @@ -289,16 +318,8 @@ func (p *BinanceProvider) reconnect() error {
}
p.wsClient = wsConn

pairs := make([]string, len(p.subscribedPairs)*2)
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

return p.subscribePairs(pairs...)
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
Expand Down
54 changes: 40 additions & 14 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand Down Expand Up @@ -137,21 +137,55 @@ func (p *HuobiProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string
return candlePrices, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe all currency pairs into ticker and candle channels.
func (p *HuobiProvider) subscribeChannels(cps ...types.CurrencyPair) error {
if err := p.subscribeTickers(cps...); err != nil {
return err
}

return p.subscribeCandles(cps...)
}

// subscribeTickers subscribe all currency pairs into ticker channel.
func (p *HuobiProvider) subscribeTickers(cps ...types.CurrencyPair) error {
for _, cp := range cps {
if err := p.subscribeTickerPair(cp); err != nil {
return err
}
}

return nil
}

// subscribeCandles subscribe all currency pairs into candle channel.
func (p *HuobiProvider) subscribeCandles(cps ...types.CurrencyPair) error {
for _, cp := range cps {
if err := p.subscribeCandlePair(cp); err != nil {
return err
}
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribedPairsToSlice returns the map of subscribed pairs as slice
func (p *HuobiProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(huobiReconnectTime)
for {
Expand Down Expand Up @@ -292,16 +326,8 @@ func (p *HuobiProvider) reconnect() error {
}
p.wsClient = wsConn

for _, cp := range p.subscribedPairs {
if err := p.subscribeTickerPair(cp); err != nil {
return err
}
if err := p.subscribeCandlePair(cp); err != nil {
return err
}
}

return nil
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// subscribeTickerPair write the subscription ticker msg to the provider.
Expand Down
82 changes: 47 additions & 35 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.SubscribeTickers(pairs...); err != nil {
if err := provider.SubscribeCurrencyPairs(pairs...); err != nil {
return nil, err
}

Expand Down Expand Up @@ -149,6 +149,39 @@ func (p *KrakenProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[strin
return candlePrices, nil
}

// SubscribeCurrencyPairs subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) error {
if err := p.subscribeChannels(cps...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// subscribeChannels subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) subscribeChannels(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToKrakenPair(cp)
}

if err := p.subscribeTickers(pairs...); err != nil {
return err
}

return p.subscribeCandles(pairs...)
}

// subscribedPairsToSlice returns the map of subscribed pairs as slice
func (p *KrakenProvider) subscribedPairsToSlice() []types.CurrencyPair {
p.mtx.RLock()
defer p.mtx.RUnlock()

return mapPairsToSlice(p.subscribedPairs)
}

func (candle KrakenCandle) toCandlePrice() (CandlePrice, error) {
return newCandlePrice(
"Kraken",
Expand Down Expand Up @@ -179,25 +212,6 @@ func (p *KrakenProvider) getCandlePrices(key string) ([]CandlePrice, error) {
return candleList, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps))

for i, cp := range cps {
pairs[i] = currencyPairToKrakenPair(cp)
}

if err := p.subscribeTickerPairs(pairs...); err != nil {
return err
}
if err := p.subscribeCandlePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// handleWebSocketMsgs receive all the messages from the provider and controls the
// reconnect function to the web socket.
func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
Expand All @@ -211,6 +225,12 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
p.logger.Err(err).Msg("WebSocket closed unexpectedly")
p.keepReconnecting()
continue
}

// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
Expand Down Expand Up @@ -395,24 +415,16 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error {
// reconnect closes the last WS connection and create a new one.
func (p *KrakenProvider) reconnect() error {
p.wsClient.Close()
p.logger.Debug().Msg("trying to reconnect")

wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
if err != nil {
return fmt.Errorf("error connecting to Kraken websocket: %w", err)
}
p.wsClient = wsConn

pairs := make([]string, len(p.subscribedPairs))
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToKrakenPair(cp)
iterator++
}

if err := p.subscribeTickerPairs(pairs...); err != nil {
return err
}
return p.subscribeCandlePairs(pairs...)
currencyPairs := p.subscribedPairsToSlice()
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
Expand Down Expand Up @@ -501,14 +513,14 @@ func (p *KrakenProvider) ping() error {
return p.wsClient.WriteMessage(websocket.PingMessage, ping)
}

// subscribeTickerPairs write the subscription msg to the provider.
func (p *KrakenProvider) subscribeTickerPairs(pairs ...string) error {
// subscribeTickers write the subscription msg to the provider.
func (p *KrakenProvider) subscribeTickers(pairs ...string) error {
subsMsg := newKrakenTickerSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}

// subscribeCandlePairs write the subscription msg to the provider.
func (p *KrakenProvider) subscribeCandlePairs(pairs ...string) error {
// subscribeCandles write the subscription msg to the provider.
func (p *KrakenProvider) subscribeCandles(pairs ...string) error {
subsMsg := newKrakenCandleSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}
Expand Down
Loading