Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove unnecessary instances of keepReconnecting (backport #1442) #1466

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f9a58fc
docs: price-feeder updates (#1423)
adamewozniak Sep 19, 2022
6dfc8be
docs: release notes draft (#1404)
robert-zaremba Sep 19, 2022
3caffaf
Use 1.2 gas_adjustment in price-feeder e2e tests (#1420)
robert-zaremba Sep 19, 2022
6e3f3d1
fix: moved to v300rc1 (#1428)
RafilxTenfen Sep 20, 2022
f8863d9
fix: oracle EndBlocker must be before gov and staking (#1434)
robert-zaremba Sep 20, 2022
09bbf02
fix: RewardBallotWinners determinism (#1433)
robert-zaremba Sep 20, 2022
abcdb11
fix: oracle.RewardBallotWinner detect duplicated uumee (#1432)
robert-zaremba Sep 20, 2022
7c5df71
chore: update gravity bridge (#1435)
robert-zaremba Sep 20, 2022
421da96
fix: oracle endblocker order fix (#1438)
adamewozniak Sep 21, 2022
c54d166
feat: ibc ante handlers and upgrade cleanup (#1441)
robert-zaremba Sep 21, 2022
ae43ccb
docs: update release notes (#1427)
robert-zaremba Sep 21, 2022
8b54dd0
fix: fix the github workflow for upgrade (#1444)
gsk967 Sep 24, 2022
60b7c2d
fix: token registry cache (#1450)
robert-zaremba Sep 28, 2022
e21af73
docs: v3.0.1 release notes (#1453)
robert-zaremba Sep 28, 2022
81ba333
fix: update & fix gravity bridge (#1460)
robert-zaremba Sep 29, 2022
d84136f
build(deps): Bump github.com/cosmos/ibc-go/v5 from 5.0.0-rc2 to 5.0.0…
dependabot[bot] Sep 30, 2022
bc5cace
refactor: Cleaned up float->dec conversions (#1431)
rbajollari Sep 30, 2022
aa6a3d5
chore: improve cache build (#1461)
gsk967 Oct 2, 2022
77fa5a6
refactor: Remove unnecessary instances of keepReconnecting (#1442)
rbajollari Oct 3, 2022
17faad1
refactor: Remove unnecessary instances of keepReconnecting (#1442)
rbajollari Oct 3, 2022
f805484
ci: Disable upgrade ci (#1464)
RafilxTenfen Oct 4, 2022
dbc5f6c
Merge branch 'main' into mergify/bp/release/price-feeder/v1.x.x/pr-1442
rbajollari Oct 4, 2022
1db5150
Revert "Merge branch 'main' into mergify/bp/release/price-feeder/v1.x…
rbajollari Oct 4, 2022
c048a66
Resolve conflict
rbajollari Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#1175](https://github.com/umee-network/umee/pull/1175) Add type ProviderName.
- [#1255](https://github.com/umee-network/umee/pull/1255) Move TickerPrice and CandlePrice to types package
- [#1374](https://github.com/umee-network/umee/pull/1374) Add standard for telemetry metrics.
- [#1431](https://github.com/umee-network/umee/pull/1431) Convert floats to sdk decimal using helper functions in all providers.
- [#1442](https://github.com/umee-network/umee/pull/1442) Remove unnecessary method in recconection logic.

### Features

Expand Down
42 changes: 6 additions & 36 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,65 +329,35 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
p.keepReconnecting()
}
}
}
}

// disconnect disconnects the existing websocket connection.
func (p *BinanceProvider) disconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Binance websocket %v", err)
}
return nil
}

// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
p.logger.Err(err).Msg("error closing binance websocket")
}

p.logger.Debug().Msg("reconnecting websocket")
wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error reconnect to binance websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderBinance)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
func (p *BinanceProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
Expand Down
47 changes: 8 additions & 39 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,13 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
p.logger.Err(err).Msg("WebSocket closed unexpectedly")
p.keepReconnecting()
continue
}

// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("failed to send ping")
p.keepReconnecting()
}
continue
}
Expand All @@ -281,12 +279,8 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("attempted to reconnect")
p.keepReconnecting()
p.logger.Err(err).Msg("error reconnecting")
}
}
}
Expand Down Expand Up @@ -463,50 +457,23 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error {
return nil
}

// disconnect disconnects the existing websocket connection.
func (p *KrakenProvider) disconnect() error {
// reconnect closes the last WS connection then create a new one.
func (p *KrakenProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Kraken websocket %v", err)
p.logger.Err(err).Msg("error closing Kraken websocket")
}
return nil
}

// reconnect creates a new websocket connection.
func (p *KrakenProvider) reconnect() error {
p.logger.Debug().Msg("trying to reconnect")

wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error connecting to Kraken websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderKraken)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
func (p *KrakenProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// messageReceivedSubscriptionStatus handle the subscription status message
Expand Down Expand Up @@ -543,7 +510,9 @@ func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) {
return
}

p.keepReconnecting()
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
}
}

// setTickerPair sets an ticker to the map thread safe by the mutex.
Expand Down