diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index 725ffc2015..0aa017df3a 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -81,6 +81,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#1255](https://github.com/umee-network/umee/pull/1255) Move TickerPrice and CandlePrice to types package - [#1374](https://github.com/umee-network/umee/pull/1374) Add standard for telemetry metrics. - [#1431](https://github.com/umee-network/umee/pull/1431) Convert floats to sdk decimal using helper functions in all providers. +- [#1442](https://github.com/umee-network/umee/pull/1442) Remove unnecessary method in recconection logic. ### Features diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index c39ae18ef3..fee61573c7 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -329,26 +329,13 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { p.messageReceived(messageType, bz) case <-reconnectTicker.C: - if err := p.disconnect(); err != nil { - p.logger.Err(err).Msg("error disconnecting") - } if err := p.reconnect(); err != nil { p.logger.Err(err).Msg("error reconnecting") - p.keepReconnecting() } } } } -// disconnect disconnects the existing websocket connection. -func (p *BinanceProvider) disconnect() error { - err := p.wsClient.Close() - if err != nil { - return types.ErrProviderConnection.Wrapf("error closing Binance websocket %v", err) - } - return nil -} - // reconnect closes the last WS connection then create a new one and subscribe to // all subscribed pairs in the ticker and candle pais. A single connection to // stream.binance.com is only valid for 24 hours; expect to be disconnected at the @@ -356,6 +343,11 @@ func (p *BinanceProvider) disconnect() error { // the websocket server does not receive a pong frame back from the connection // within a 10 minute period, the connection will be disconnected. func (p *BinanceProvider) reconnect() error { + err := p.wsClient.Close() + if err != nil { + p.logger.Err(err).Msg("error closing binance websocket") + } + p.logger.Debug().Msg("reconnecting websocket") wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil) defer resp.Body.Close() @@ -363,31 +355,9 @@ func (p *BinanceProvider) reconnect() error { return fmt.Errorf("error reconnect to binance websocket: %w", err) } p.wsClient = wsConn - - currencyPairs := p.subscribedPairsToSlice() - telemetryWebsocketReconnect(ProviderBinance) - return p.subscribeChannels(currencyPairs...) -} - -// keepReconnecting keeps trying to reconnect if an error occurs in reconnect. -func (p *BinanceProvider) keepReconnecting() { - reconnectTicker := time.NewTicker(defaultReconnectTime) - defer reconnectTicker.Stop() - connectionTries := 1 - for time := range reconnectTicker.C { - if err := p.reconnect(); err != nil { - p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String()) - connectionTries++ - continue - } - - if connectionTries > maxReconnectionTries { - p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries) - } - return - } + return p.subscribeChannels(p.subscribedPairsToSlice()...) } // setSubscribedPairs sets N currency pairs to the map of subscribed pairs. diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index f8acc4274d..0103999e0b 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -261,7 +261,6 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { if err != nil { if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) { p.logger.Err(err).Msg("WebSocket closed unexpectedly") - p.keepReconnecting() continue } @@ -269,7 +268,6 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { p.logger.Err(err).Msg("could not read message") if err := p.ping(); err != nil { p.logger.Err(err).Msg("failed to send ping") - p.keepReconnecting() } continue } @@ -281,12 +279,8 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { p.messageReceived(messageType, bz) case <-reconnectTicker.C: - if err := p.disconnect(); err != nil { - p.logger.Err(err).Msg("error disconnecting") - } if err := p.reconnect(); err != nil { - p.logger.Err(err).Msg("attempted to reconnect") - p.keepReconnecting() + p.logger.Err(err).Msg("error reconnecting") } } } @@ -463,50 +457,23 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error { return nil } -// disconnect disconnects the existing websocket connection. -func (p *KrakenProvider) disconnect() error { +// reconnect closes the last WS connection then create a new one. +func (p *KrakenProvider) reconnect() error { err := p.wsClient.Close() if err != nil { - return types.ErrProviderConnection.Wrapf("error closing Kraken websocket %v", err) + p.logger.Err(err).Msg("error closing Kraken websocket") } - return nil -} -// reconnect creates a new websocket connection. -func (p *KrakenProvider) reconnect() error { p.logger.Debug().Msg("trying to reconnect") - wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil) defer resp.Body.Close() if err != nil { return fmt.Errorf("error connecting to Kraken websocket: %w", err) } p.wsClient = wsConn - - currencyPairs := p.subscribedPairsToSlice() - telemetryWebsocketReconnect(ProviderKraken) - return p.subscribeChannels(currencyPairs...) -} -// keepReconnecting keeps trying to reconnect if an error occurs in recconnect. -func (p *KrakenProvider) keepReconnecting() { - reconnectTicker := time.NewTicker(defaultReconnectTime) - defer reconnectTicker.Stop() - connectionTries := 1 - - for time := range reconnectTicker.C { - if err := p.reconnect(); err != nil { - p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String()) - connectionTries++ - continue - } - - if connectionTries > maxReconnectionTries { - p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries) - } - return - } + return p.subscribeChannels(p.subscribedPairsToSlice()...) } // messageReceivedSubscriptionStatus handle the subscription status message @@ -543,7 +510,9 @@ func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) { return } - p.keepReconnecting() + if err := p.reconnect(); err != nil { + p.logger.Err(err).Msg("error reconnecting") + } } // setTickerPair sets an ticker to the map thread safe by the mutex.