Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove unnecessary instances of keepReconnecting #1442

Merged
merged 9 commits into from
Oct 3, 2022
42 changes: 6 additions & 36 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,65 +329,35 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
p.keepReconnecting()
}
}
}
}

// disconnect disconnects the existing websocket connection.
func (p *BinanceProvider) disconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Binance websocket %v", err)
}
return nil
}

// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
p.logger.Err(err).Msg("error closing binance websocket")
}

p.logger.Debug().Msg("reconnecting websocket")
wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error reconnect to binance websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderBinance)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
func (p *BinanceProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
Expand Down
62 changes: 11 additions & 51 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,12 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
p.logger.Err(err).Msg("WebSocket closed unexpectedly")
p.keepReconnecting()
if err != nil {
adamewozniak marked this conversation as resolved.
Show resolved Hide resolved
// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("kraken: could not read message")
continue
}

// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("failed to send ping")
p.keepReconnecting()
}
continue
}

Expand All @@ -281,12 +275,8 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("attempted to reconnect")
p.keepReconnecting()
p.logger.Err(err).Msg("error reconnecting")
}
}
}
Expand Down Expand Up @@ -463,50 +453,23 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error {
return nil
}

// disconnect disconnects the existing websocket connection.
func (p *KrakenProvider) disconnect() error {
// reconnect closes the last WS connection then create a new one.
func (p *KrakenProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Kraken websocket %v", err)
p.logger.Err(err).Msg("error closing Kraken websocket")
}
return nil
}

// reconnect creates a new websocket connection.
func (p *KrakenProvider) reconnect() error {
p.logger.Debug().Msg("trying to reconnect")

wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error connecting to Kraken websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderKraken)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
func (p *KrakenProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// messageReceivedSubscriptionStatus handle the subscription status message
Expand Down Expand Up @@ -543,7 +506,9 @@ func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) {
return
}

p.keepReconnecting()
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
}
}

// setTickerPair sets an ticker to the map thread safe by the mutex.
Expand All @@ -570,11 +535,6 @@ func (p *KrakenProvider) setCandlePair(candle KrakenCandle) {
p.candles[candle.Symbol] = candleList
}

// ping to check websocket connection.
func (p *KrakenProvider) ping() error {
rbajollari marked this conversation as resolved.
Show resolved Hide resolved
return p.wsClient.WriteMessage(websocket.PingMessage, ping)
}

// subscribeTickers write the subscription msg to the provider.
func (p *KrakenProvider) subscribeTickers(pairs ...string) error {
subsMsg := newKrakenTickerSubscriptionMsg(pairs...)
Expand Down