Skip to content

Commit

Permalink
fix #3957: Lister onOpen should be called before marking the connec…
Browse files Browse the repository at this point in the history
…tion as open
  • Loading branch information
AdrianFarmadin authored and manusa committed Mar 22, 2022
1 parent b095b78 commit eaf7a15
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fix #3859: refined how a deserialization class is chosen to not confuse types with the same kind
* Fix #3745: the client will throw better exceptions when a namespace is not discernible for an operation
* Fix #3936: Kubernetes Mock Server .metadata.generation field is an integer
* Fix #3957: Lister `onOpen` should be called before marking the connection as open

#### Improvements
* Fix #3811: Reintroduce `Replaceable` interface in `NonNamespaceOperation`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.util.concurrent.CompletableFuture;

class OkHttpWebSocketImpl implements WebSocket {

static class BuilderImpl implements WebSocket.Builder {

private Request.Builder builder = new Request.Builder();
private OkHttpClient httpClient;

public BuilderImpl(OkHttpClient httpClient) {
this.httpClient = httpClient;
}

@Override
public Builder uri(URI uri) {
builder.url(HttpUrl.get(uri));
Expand All @@ -53,8 +53,8 @@ public CompletableFuture<WebSocket> buildAsync(Listener listener) {
Request request = builder.build();
CompletableFuture<WebSocket> future = new CompletableFuture<>();
httpClient.newWebSocket(request, new WebSocketListener() {
private volatile boolean opened;
private volatile boolean opened;

@Override
public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {
if (response != null) {
Expand All @@ -63,44 +63,45 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
if (!opened) {
if (response != null) {
try {
future.completeExceptionally(new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t));
future.completeExceptionally(
new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t));
} catch (IOException e) {
// can't happen
}
}
} else {
future.completeExceptionally(t);
}
} else {
listener.onError(new OkHttpWebSocketImpl(webSocket), t);
}
}

@Override
public void onOpen(okhttp3.WebSocket webSocket, Response response) {
opened = true;
if (response != null) {
response.close();
}
OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket);
future.complete(value);
listener.onOpen(value);
future.complete(value);
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), bytes.asByteBuffer());
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, String text) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), text);
}

@Override
public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) {
listener.onClose(new OkHttpWebSocketImpl(webSocket), code, reason);
}

});
return future;
}
Expand All @@ -110,23 +111,23 @@ public WebSocket.Builder header(String name, String value) {
builder = builder.addHeader(name, value);
return this;
}

@Override
public WebSocket.Builder setHeader(String k, String v) {
builder = builder.header(k, v);
return this;
}

@Override
public Builder subprotocol(String protocol) {
builder.header("Sec-WebSocket-Protocol", protocol);
return this;
}

}

private okhttp3.WebSocket webSocket;

public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket) {
this.webSocket = webSocket;
}
Expand All @@ -140,10 +141,10 @@ public boolean send(ByteBuffer buffer) {
public boolean sendClose(int code, String reason) {
return webSocket.close(code, reason);
}

@Override
public long queueSize() {
return webSocket.queueSize();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,32 @@ public interface WebSocket {

public interface Listener {

default void onOpen(WebSocket webSocket) { }
default void onOpen(WebSocket webSocket) {
}

default void onMessage(WebSocket webSocket, String text) {}
default void onMessage(WebSocket webSocket, String text) {
}

default void onMessage(WebSocket webSocket, ByteBuffer bytes) {}
default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
}

default void onClose(WebSocket webSocket, int code, String reason) {}
default void onClose(WebSocket webSocket, int code, String reason) {
}

default void onError(WebSocket webSocket, Throwable error) {}
default void onError(WebSocket webSocket, Throwable error) {
}

}

public interface Builder extends BasicBuilder {

/**
* Builds a new WebSocket connection and waits asynchronously until the connection is opened.
* The listener onOpen callback is called before the returned future is completed.
*
* @param listener
* @return CompletableFuture which is completed after connection is opened
*/
CompletableFuture<WebSocket> buildAsync(Listener listener);

Builder subprotocol(String protocol);
Expand All @@ -55,12 +67,14 @@ public interface Builder extends BasicBuilder {

/**
* Send some data
*
* @return true if the message was successfully enqueued.
*/
boolean send(ByteBuffer buffer);

/**
* Send a close message
*
* @return true if the message was successfully enqueued.
*/
boolean sendClose(int code, String reason);
Expand Down

0 comments on commit eaf7a15

Please sign in to comment.