Skip to content

Commit

Permalink
fix: websocket not receiving messages after some time (#791)
Browse files Browse the repository at this point in the history
* fix: websocket not receiving messages after some time

* chore: reduce ping interval to 20s

Co-authored-by: Mohamad Jaara <mohamad.jaara@wire.com>
  • Loading branch information
vitorhugods and MohamadJaara committed Aug 16, 2022
1 parent 83a9c42 commit dda279e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal class EventProcessorImpl(
is Event.FeatureConfig -> featureConfigEventReceiver.onEvent(event)
is Event.Unknown -> kaliumLogger.withFeatureId(EVENT_RECEIVER).i("Unhandled event id=${event.id}")
}
kaliumLogger.withFeatureId(EVENT_RECEIVER).i("Updating lastProcessedEventId ${event.id}")
eventRepository.updateLastProcessedEventId(event.id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ internal class IncrementalSyncManager(
}
incrementalSyncRepository.updateIncrementalSyncState(newState)
}
kaliumLogger.i("IncrementalSync stopped.")
}

private companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,14 @@ package com.wire.kalium.network

import io.ktor.client.engine.HttpClientEngine
import io.ktor.client.engine.okhttp.OkHttp
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit

actual fun defaultHttpEngine(): HttpClientEngine = OkHttp.create()
actual fun defaultHttpEngine(): HttpClientEngine = OkHttp.create {
// OkHttp doesn't support configuring ping intervals dynamically,
// so they must be set when creating the Engine
// See https://youtrack.jetbrains.com/issue/KTOR-4752
val client = OkHttpClient.Builder().pingInterval(WEBSOCKET_PING_INTERVAL_MILLIS, TimeUnit.MILLISECONDS).build()
preconfigured = client
webSocketFactory = KaliumWebSocketFactory(client)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.wire.kalium.network

import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString

/**
* Upon request, creates normal [WebSocket], but wraps the listener
* to add logging.
* @see WrapperListener
*/
class KaliumWebSocketFactory(private val okHttpClient: OkHttpClient) : WebSocket.Factory {

override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
val wrapperListener = WrapperListener(listener)
return okHttpClient.newWebSocket(request, wrapperListener)
}

/**
* Wraps the provided [wrappedListener], keeping the normal behaviour,
* but using [kaliumLogger] to log all operations.
*/
inner class WrapperListener(private val wrappedListener: WebSocketListener) : WebSocketListener() {
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
super.onClosed(webSocket, code, reason)
kaliumLogger.v("WEBSOCKET: onClosed($code, $reason)")
wrappedListener.onClosed(webSocket, code, reason)
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
kaliumLogger.v("WEBSOCKET: onClosing($code, $reason)")
wrappedListener.onClosing(webSocket, code, reason)
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
kaliumLogger.v("WEBSOCKET: onFailure($t, $response)")
wrappedListener.onFailure(webSocket, t, response)
}

override fun onMessage(webSocket: WebSocket, text: String) {
super.onMessage(webSocket, text)
kaliumLogger.v("WEBSOCKET: onMessage($text)")
wrappedListener.onMessage(webSocket, text)
}

override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
super.onMessage(webSocket, bytes)
kaliumLogger.v("WEBSOCKET: onMessage($bytes)")
wrappedListener.onMessage(webSocket, bytes)
}

override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
kaliumLogger.v("WEBSOCKET: onOpen($response)")
wrappedListener.onOpen(webSocket, response)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ internal class AuthenticatedWebSocketClient(
mls()
xprotobuf()
}
install(WebSockets)
install(WebSockets) {
// Depending on the Engine (OkHttp for example), we might
// need to set this value there too, as this here won't work
pingInterval = WEBSOCKET_PING_INTERVAL_MILLIS
}
}
}

Expand Down Expand Up @@ -128,3 +132,4 @@ internal fun provideBaseHttpClient(

internal fun shouldAddApiVersion(apiVersion: Int): Boolean = apiVersion >= MINIMUM_API_VERSION_TO_ADD
private const val MINIMUM_API_VERSION_TO_ADD = 1
internal const val WEBSOCKET_PING_INTERVAL_MILLIS = 20_000L

0 comments on commit dda279e

Please sign in to comment.