Skip to content

Commit

Permalink
refactor: decouple HTTP and WebSocket engines
Browse files Browse the repository at this point in the history
- Extracted HTTP calls and WebSocket listeners into a separate module.
- Introduced an abstraction layer for easier implementation swapping.
  • Loading branch information
ttypic committed Sep 25, 2024
1 parent 7d70cd9 commit 6f38c17
Show file tree
Hide file tree
Showing 42 changed files with 990 additions and 333 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ bin/
.project

local.properties

lombok.config
2 changes: 2 additions & 0 deletions android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ dependencies {
api(libs.gson)
implementation(libs.bundles.common)
testImplementation(libs.bundles.tests)
implementation(project(":network-client-core"))
runtimeOnly(project(":network-client-default"))
implementation(libs.firebase.messaging)
androidTestImplementation(libs.bundles.instrumental.android)
}
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.vanniktech.maven.publish.SonatypeHost
plugins {
alias(libs.plugins.android.library) apply false
alias(libs.plugins.maven.publish) apply false
alias(libs.plugins.lombok) apply false
}

subprojects {
Expand Down
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ android-test = "0.5"
dexmaker = "1.4"
android-retrostreams = "1.7.4"
maven-publish = "0.29.0"
lombok = "8.10"

[libraries]
gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" }
Expand All @@ -39,11 +40,12 @@ dexmaker-mockito = { group = "com.crittercism.dexmaker", name = "dexmaker-mockit
android-retrostreams = { group = "net.sourceforge.streamsupport", name = "android-retrostreams", version.ref = "android-retrostreams" }

[bundles]
common = ["msgpack", "java-websocket", "vcdiff-core"]
common = ["msgpack", "vcdiff-core"]
tests = ["junit","hamcrest-all", "nanohttpd", "nanohttpd-nanolets", "nanohttpd-websocket", "mockito-core", "concurrentunit", "slf4j-simple"]
instrumental-android = ["android-test-runner", "android-test-rules", "dexmaker", "dexmaker-dx", "dexmaker-mockito", "android-retrostreams"]

[plugins]
android-library = { id = "com.android.library", version.ref = "agp" }
build-config = { id = "com.github.gmazzo.buildconfig", version.ref = "build-config" }
maven-publish = { id = "com.vanniktech.maven.publish", version.ref = "maven-publish" }
lombok = { id = "io.freefair.lombok", version.ref = "lombok" }
2 changes: 2 additions & 0 deletions java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ tasks.withType<Jar> {
dependencies {
api(libs.gson)
implementation(libs.bundles.common)
implementation(project(":network-client-core"))
runtimeOnly(project(":network-client-default"))
testImplementation(libs.bundles.tests)
}

Expand Down
4 changes: 2 additions & 2 deletions lib/src/main/java/io/ably/lib/debug/DebugOptions.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.ably.lib.debug;

import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;

import io.ably.lib.http.HttpCore;
import io.ably.lib.network.HttpRequest;
import io.ably.lib.transport.ITransport;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
Expand All @@ -19,7 +19,7 @@ public interface RawProtocolListener {
}

public interface RawHttpListener {
HttpCore.Response onRawHttpRequest(String id, HttpURLConnection conn, String method, String authHeader, Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody);
HttpCore.Response onRawHttpRequest(String id, HttpRequest request, String authHeader, Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody);
void onRawHttpResponse(String id, String method, HttpCore.Response response);
void onRawHttpException(String id, String method, Throwable t);
}
Expand Down
357 changes: 144 additions & 213 deletions lib/src/main/java/io/ably/lib/http/HttpCore.java

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ably.lib.http;

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
Expand All @@ -9,6 +8,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.ably.lib.network.HttpCall;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ErrorInfo;
Expand Down Expand Up @@ -331,15 +331,15 @@ protected void setError(ErrorInfo err) {
}
}
protected synchronized boolean disposeConnection() {
boolean hasConnection = conn != null;
boolean hasConnection = httpCall != null;
if(hasConnection) {
conn.disconnect();
conn = null;
httpCall.cancel();
httpCall = null;
}
return hasConnection;
}

protected HttpURLConnection conn;
protected HttpCall httpCall;
protected T result;
protected ErrorInfo err;

Expand Down
111 changes: 43 additions & 68 deletions lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package io.ably.lib.transport;

import io.ably.lib.http.HttpUtils;
import io.ably.lib.network.WebSocketClient;
import io.ably.lib.network.WebSocketEngine;
import io.ably.lib.network.WebSocketEngineConfig;
import io.ably.lib.network.WebSocketEngineFactory;
import io.ably.lib.network.WebSocketListener;
import io.ably.lib.network.NotConnectedException;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolSerializer;
import io.ably.lib.util.ClientOptionsUtils;
import io.ably.lib.util.Log;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.exceptions.WebsocketNotConnectedException;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;

import javax.net.ssl.HttpsURLConnection;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSession;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -50,7 +47,7 @@ public class WebSocketTransport implements ITransport {
private final boolean channelBinaryMode;
private String wsUri;
private ConnectListener connectListener;
private WsClient wsConnection;
private WebSocketClient webSocketClient;
/******************
* protected constructor
******************/
Expand Down Expand Up @@ -81,15 +78,26 @@ public void connect(ConnectListener connectListener) {

Log.d(TAG, "connect(); wsUri = " + wsUri);
synchronized (this) {
wsConnection = new WsClient(URI.create(wsUri), this::receive);
WebSocketEngineFactory engineFactory = WebSocketEngineFactory.getFirstAvailable();
Log.v(TAG, String.format("Using %s WebSocket Engine", engineFactory.getEngineType().name()));

WebSocketEngineConfig.WebSocketEngineConfigBuilder configBuilder = WebSocketEngineConfig.builder();
configBuilder
.tls(isTls)
.host(params.host)
.proxy(ClientOptionsUtils.convertToProxyConfig(params.getClientOptions()));

if (isTls) {
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null);
SafeSSLSocketFactory factory = new SafeSSLSocketFactory(sslContext.getSocketFactory());
wsConnection.setSocketFactory(factory);
configBuilder.sslSocketFactory(factory);
}

WebSocketEngine engine = engineFactory.create(configBuilder.build());
webSocketClient = engine.create(wsUri, new WebSocketHandler(this::receive));
}
wsConnection.connect();
webSocketClient.connect();
} catch (AblyException e) {
Log.e(TAG, "Unexpected exception attempting connection; wsUri = " + wsUri, e);
connectListener.onTransportUnavailable(this, e.errorInfo);
Expand All @@ -103,9 +111,9 @@ public void connect(ConnectListener connectListener) {
public void close() {
Log.d(TAG, "close()");
synchronized (this) {
if (wsConnection != null) {
wsConnection.close();
wsConnection = null;
if (webSocketClient != null) {
webSocketClient.close();
webSocketClient = null;
}
}
}
Expand All @@ -127,14 +135,14 @@ public void send(ProtocolMessage msg) throws AblyException {
ProtocolMessage decodedMsg = ProtocolSerializer.readMsgpack(encodedMsg);
Log.v(TAG, "send(): " + decodedMsg.action + ": " + new String(ProtocolSerializer.writeJSON(decodedMsg)));
}
wsConnection.send(encodedMsg);
webSocketClient.send(encodedMsg);
} else {
// Check the logging level to avoid performance hit associated with building the message
if (Log.level <= Log.VERBOSE)
Log.v(TAG, "send(): " + new String(ProtocolSerializer.writeJSON(msg)));
wsConnection.send(ProtocolSerializer.writeJSON(msg));
webSocketClient.send(ProtocolSerializer.writeJSON(msg));
}
} catch (WebsocketNotConnectedException e) {
} catch (NotConnectedException e) {
if (connectListener != null) {
connectListener.onTransportUnavailable(this, AblyException.fromThrowable(e).errorInfo);
} else
Expand Down Expand Up @@ -180,7 +188,7 @@ public WebSocketTransport getTransport(TransportParams params, ConnectionManager
* WebSocketHandler methods
**************************/

class WsClient extends WebSocketClient {
class WebSocketHandler implements WebSocketListener {
private final WebSocketReceiver receiver;
/***************************
* WsClient private members
Expand All @@ -189,38 +197,16 @@ class WsClient extends WebSocketClient {
private Timer timer = new Timer();
private TimerTask activityTimerTask = null;
private long lastActivityTime;
private boolean shouldExplicitlyVerifyHostname = true;

WsClient(URI serverUri, WebSocketReceiver receiver) {
super(serverUri);
WebSocketHandler(WebSocketReceiver receiver) {
this.receiver = receiver;
}

@Override
public void onOpen(ServerHandshake handshakedata) {
public void onOpen() {
Log.d(TAG, "onOpen()");
if (params.options.tls && shouldExplicitlyVerifyHostname && !isHostnameVerified(params.host)) {
close();
} else {
connectListener.onTransportAvailable(WebSocketTransport.this);
flagActivity();
}
}

/**
* Added because we had to override the onSetSSLParameters() that usually performs this verification.
* When the minSdkVersion will be updated to 24 we should remove this method and its usages.
* https://github.com/TooTallNate/Java-WebSocket/wiki/No-such-method-error-setEndpointIdentificationAlgorithm#workaround
*/
private boolean isHostnameVerified(String hostname) {
final SSLSession session = getSSLSession();
if (HttpsURLConnection.getDefaultHostnameVerifier().verify(hostname, session)) {
Log.v(TAG, "Successfully verified hostname");
return true;
} else {
Log.e(TAG, "Hostname verification failed, expected " + hostname + ", found " + session.getPeerHost());
return false;
}
connectListener.onTransportAvailable(WebSocketTransport.this);
flagActivity();
}

@Override
Expand Down Expand Up @@ -253,16 +239,14 @@ public void onMessage(String string) {

/* This allows us to detect a websocket ping, so we don't need Ably pings. */
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
public void onWebsocketPing() {
Log.d(TAG, "onWebsocketPing()");
/* Call superclass to ensure the pong is sent. */
super.onWebsocketPing(conn, f);
flagActivity();
}

@Override
public void onClose(final int wsCode, final String wsReason, final boolean remote) {
Log.d(TAG, "onClose(): wsCode = " + wsCode + "; wsReason = " + wsReason + "; remote = " + remote);
public void onClose(final int wsCode, final String wsReason) {
Log.d(TAG, "onClose(): wsCode = " + wsCode + "; wsReason = " + wsReason + "; remote = " + false);

ErrorInfo reason;
switch (wsCode) {
Expand Down Expand Up @@ -301,23 +285,14 @@ public void onClose(final int wsCode, final String wsReason, final boolean remot
}

@Override
public void onError(final Exception e) {
Log.e(TAG, "Connection error ", e);
connectListener.onTransportUnavailable(WebSocketTransport.this, new ErrorInfo(e.getMessage(), 503, 80000));
public void onError(Throwable throwable) {
Log.e(TAG, "Connection error ", throwable);
connectListener.onTransportUnavailable(WebSocketTransport.this, new ErrorInfo(throwable.getMessage(), 503, 80000));
}

@Override
protected void onSetSSLParameters(SSLParameters sslParameters) {
try {
super.onSetSSLParameters(sslParameters);
shouldExplicitlyVerifyHostname = false;
} catch (NoSuchMethodError exception) {
// This error will be thrown on Android below level 24.
// When the minSdkVersion will be updated to 24 we should remove this overridden method.
// https://github.com/TooTallNate/Java-WebSocket/wiki/No-such-method-error-setEndpointIdentificationAlgorithm#workaround
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", exception);
shouldExplicitlyVerifyHostname = true;
}
public void onOldJavaVersionDetected(Throwable throwable) {
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", throwable);
}

private synchronized void dispose() {
Expand Down Expand Up @@ -391,7 +366,7 @@ private synchronized void onActivityTimerExpiry() {
// If we have no time remaining, then close the connection
if (timeRemaining <= 0) {
Log.e(TAG, "No activity for " + getActivityTimeout() + "ms, closing connection");
closeConnection(CloseFrame.ABNORMAL_CLOSE, "timed out");
webSocketClient.cancel(ABNORMAL_CLOSE, "timed out");
return;
}

Expand Down
5 changes: 4 additions & 1 deletion lib/src/main/java/io/ably/lib/types/AblyException.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.types;

import io.ably.lib.network.FailedConnectionException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
Expand Down Expand Up @@ -50,6 +51,8 @@ public static AblyException fromThrowable(Throwable t) {
return (AblyException)t;
if(t instanceof ConnectException || t instanceof SocketTimeoutException || t instanceof UnknownHostException || t instanceof NoRouteToHostException)
return new HostFailedException(t, ErrorInfo.fromThrowable(t));
if (t instanceof FailedConnectionException)
return new HostFailedException(t.getCause(), ErrorInfo.fromThrowable(t.getCause()));

return new AblyException(t, ErrorInfo.fromThrowable(t));
}
Expand All @@ -61,4 +64,4 @@ public static class HostFailedException extends AblyException {
super(throwable, reason);
}
}
}
}
37 changes: 37 additions & 0 deletions lib/src/main/java/io/ably/lib/util/ClientOptionsUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.ably.lib.util;

import io.ably.lib.network.ProxyAuthType;
import io.ably.lib.network.ProxyConfig;
import io.ably.lib.types.ClientOptions;

import java.util.Arrays;

public class ClientOptionsUtils {

public static ProxyConfig convertToProxyConfig(ClientOptions clientOptions) {
if (clientOptions.proxy == null) return null;

ProxyConfig.ProxyConfigBuilder builder = ProxyConfig.builder();

builder
.host(clientOptions.proxy.host)
.port(clientOptions.proxy.port)
.username(clientOptions.proxy.username)
.password(clientOptions.proxy.password);

if (clientOptions.proxy.nonProxyHosts != null) {
builder.nonProxyHosts(Arrays.asList(clientOptions.proxy.nonProxyHosts));
}

switch (clientOptions.proxy.prefAuthType) {
case BASIC:
builder.authType(ProxyAuthType.BASIC);
break;
case DIGEST:
builder.authType(ProxyAuthType.DIGEST);
break;
}

return builder.build();
}
}
Loading

0 comments on commit 6f38c17

Please sign in to comment.