From 18e3b0a3dd2aa78ce0ac3603001ccc4a655e7f5b Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Thu, 25 Jan 2018 11:26:50 +0000 Subject: [PATCH 1/4] Improvements to error handling and the ability to shutdown the client and/or a active web socket. - Optional `onFailure` added to `BinanceApiCallback` to allow user to opt-in to receiving failure info. - `BinanceApiWebSocketClient` enhanced to return `Closeable`s from calls creating web sockets, so that said web sockets can be later closed, if needed. - `BinanceApiWebSocketClient` enhanced to be `Closeable` itself, closing the internal OKHttp dispatcher. - `BinanceApiWebSocketListener` no longer throws an exception from `onFailure`, as this was just being thrown up the stack to the `Thread.uncaughtExceptionHandler`. With these changes I can now: - detect failures on the web socket and choose to do something about them. - shutdown the `BinanceApiWebSocketClient`, and potentially recreate later, without any resource leaks or references being held to my callback objects. --- .../api/client/BinanceApiCallback.java | 13 +++++--- .../api/client/BinanceApiWebSocketClient.java | 14 +++++---- .../impl/BinanceApiWebSocketClientImpl.java | 30 +++++++++++-------- .../impl/BinanceApiWebSocketListener.java | 11 ++++++- 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/binance/api/client/BinanceApiCallback.java b/src/main/java/com/binance/api/client/BinanceApiCallback.java index a3b68acae..f492cc1da 100644 --- a/src/main/java/com/binance/api/client/BinanceApiCallback.java +++ b/src/main/java/com/binance/api/client/BinanceApiCallback.java @@ -1,19 +1,24 @@ package com.binance.api.client; -import com.binance.api.client.exception.BinanceApiException; - /** * BinanceApiCallback is a functional interface used together with the BinanceApiAsyncClient to provide a non-blocking REST client. * * @param the return type from the callback */ +@FunctionalInterface public interface BinanceApiCallback { /** * Called whenever a response comes back from the Binance API. * * @param response the expected response object - * @throws BinanceApiException if it is not possible to obtain the expected response object (e.g. incorrect API-KEY). */ - void onResponse(T response) throws BinanceApiException; + void onResponse(T response); + + /** + * Called whenever an error occurs. + * + * @param cause the cause of the failure + */ + default void onFailure(Throwable cause) {} } \ No newline at end of file diff --git a/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java b/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java index 2ba16d61c..5a16a94d9 100644 --- a/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java +++ b/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java @@ -6,16 +6,20 @@ import com.binance.api.client.domain.event.UserDataUpdateEvent; import com.binance.api.client.domain.market.CandlestickInterval; +import java.io.Closeable; + /** * Binance API data streaming façade, supporting streaming of events through web sockets. */ -public interface BinanceApiWebSocketClient { +public interface BinanceApiWebSocketClient extends Closeable { + + Closeable onDepthEvent(String symbol, BinanceApiCallback callback); - void onDepthEvent(String symbol, BinanceApiCallback callback); + Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback); - void onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback); + Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback); - void onAggTradeEvent(String symbol, BinanceApiCallback callback); + Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback); - void onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback); + void close(); } diff --git a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java index d4cd1e3d2..6c42b4559 100644 --- a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java +++ b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java @@ -10,9 +10,9 @@ import com.binance.api.client.domain.market.CandlestickInterval; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.WebSocket; import java.io.Closeable; -import java.io.IOException; /** * Binance API WebSocket client implementation using OkHttp. @@ -25,34 +25,40 @@ public BinanceApiWebSocketClientImpl() { this.client = new OkHttpClient(); } - public void onDepthEvent(String symbol, BinanceApiCallback callback) { + public Closeable onDepthEvent(String symbol, BinanceApiCallback callback) { final String channel = String.format("%s@depth", symbol); - createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class)); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class)); } @Override - public void onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback) { + public Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback) { final String channel = String.format("%s@kline_%s", symbol, interval.getIntervalId()); - createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class)); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class)); } - public void onAggTradeEvent(String symbol, BinanceApiCallback callback) { + public Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback) { final String channel = String.format("%s@aggTrade", symbol); - createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class)); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class)); } - public void onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback) { - createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class)); + public Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback) { + return createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class)); } - private void createNewWebSocket(String channel, BinanceApiWebSocketListener listener) { + private Closeable createNewWebSocket(String channel, BinanceApiWebSocketListener listener) { String streamingUrl = String.format("%s/%s", BinanceApiConstants.WS_API_BASE_URL, channel); Request request = new Request.Builder().url(streamingUrl).build(); - client.newWebSocket(request, listener); + final WebSocket webSocket = client.newWebSocket(request, listener); + return () -> { + final int code = 1000; + listener.onClosing(webSocket, code, null); + webSocket.close(code, null); + listener.onClosed(webSocket, code, null); + }; } @Override - public void close() throws IOException { + public void close() { client.dispatcher().executorService().shutdown(); } } diff --git a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketListener.java b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketListener.java index 607249fb7..367b1b73c 100644 --- a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketListener.java +++ b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketListener.java @@ -20,6 +20,8 @@ public class BinanceApiWebSocketListener extends WebSocketListener { private ObjectMapper mapper; + private boolean closing = false; + public BinanceApiWebSocketListener(BinanceApiCallback callback, Class eventClass) { this(callback, eventClass, new ObjectMapper()); } @@ -40,8 +42,15 @@ public void onMessage(WebSocket webSocket, String text) { } } + @Override + public void onClosing(final WebSocket webSocket, final int code, final String reason) { + closing = true; + } + @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { - throw new BinanceApiException(t); + if (!closing) { + callback.onFailure(t); + } } } \ No newline at end of file From a10dc0c0ab7fd848381af677f93cbf678c831a4e Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Sun, 4 Feb 2018 15:00:57 +0000 Subject: [PATCH 2/4] -Removed unused imports -Added Java docs to make it clear you can close the websocket by calling close on the returned `Closeable` --- .../api/client/BinanceApiWebSocketClient.java | 51 ++++++++-- .../impl/BinanceApiWebSocketClientImpl.java | 94 +++++++++---------- 2 files changed, 88 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java b/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java index 9ff785605..b118efa9d 100644 --- a/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java +++ b/src/main/java/com/binance/api/client/BinanceApiWebSocketClient.java @@ -1,6 +1,5 @@ package com.binance.api.client; -import java.util.List; import com.binance.api.client.domain.event.AggTradeEvent; import com.binance.api.client.domain.event.AllMarketTickersEvent; import com.binance.api.client.domain.event.CandlestickEvent; @@ -9,21 +8,57 @@ import com.binance.api.client.domain.market.CandlestickInterval; import java.io.Closeable; +import java.util.List; /** * Binance API data streaming façade, supporting streaming of events through web sockets. */ public interface BinanceApiWebSocketClient extends Closeable { - Closeable onDepthEvent(String symbol, BinanceApiCallback callback); + /** + * Open a new web socket to receive {@link DepthEvent depthEvents} on a callback. + * + * @param symbol the market symbol to subscribe to + * @param callback the callback to call on new events + * @return a {@link Closeable} that allows the underlying web socket to be closed. + */ + Closeable onDepthEvent(String symbol, BinanceApiCallback callback); + + /** + * Open a new web socket to receive {@link CandlestickEvent candlestickEvents} on a callback. + * + * @param symbol the market symbol to subscribe to + * @param interval the interval of the candles tick events required + * @param callback the callback to call on new events + * @return a {@link Closeable} that allows the underlying web socket to be closed. + */ + Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback); - Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback); + /** + * Open a new web socket to receive {@link AggTradeEvent aggTradeEvents} on a callback. + * + * @param symbol the market symbol to subscribe to + * @param callback the callback to call on new events + * @return a {@link Closeable} that allows the underlying web socket to be closed. + */ + Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback); - Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback); + /** + * Open a new web socket to receive {@link UserDataUpdateEvent userDataUpdateEvents} on a callback. + * + * @param listenKey the listen key to subscribe to. + * @param callback the callback to call on new events + * @return a {@link Closeable} that allows the underlying web socket to be closed. + */ + Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback); - Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback); + /** + * Open a new web socket to receive {@link AllMarketTickersEvent allMarketTickersEvents} on a callback. + * + * @param callback the callback to call on new events + * @return a {@link Closeable} that allows the underlying web socket to be closed. + */ + Closeable onAllMarketTickersEvent(BinanceApiCallback> callback); - Closeable onAllMarketTickersEvent(BinanceApiCallback> callback); - - void close(); + void close(); } diff --git a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java index 1e874b8c7..87ff981ab 100644 --- a/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java +++ b/src/main/java/com/binance/api/client/impl/BinanceApiWebSocketClientImpl.java @@ -1,11 +1,5 @@ package com.binance.api.client.impl; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import okhttp3.Dispatcher; -import okhttp3.OkHttpClient; -import okhttp3.Request; import com.binance.api.client.BinanceApiCallback; import com.binance.api.client.BinanceApiWebSocketClient; import com.binance.api.client.constant.BinanceApiConstants; @@ -15,64 +9,66 @@ import com.binance.api.client.domain.event.DepthEvent; import com.binance.api.client.domain.event.UserDataUpdateEvent; import com.binance.api.client.domain.market.CandlestickInterval; +import okhttp3.Dispatcher; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.WebSocket; import java.io.Closeable; +import java.util.List; /** * Binance API WebSocket client implementation using OkHttp. */ public class BinanceApiWebSocketClientImpl implements BinanceApiWebSocketClient, Closeable { - private OkHttpClient client; + private OkHttpClient client; + + public BinanceApiWebSocketClientImpl() { + Dispatcher d = new Dispatcher(); + d.setMaxRequestsPerHost(100); + this.client = new OkHttpClient.Builder().dispatcher(d).build(); + } + + public Closeable onDepthEvent(String symbol, BinanceApiCallback callback) { + final String channel = String.format("%s@depth", symbol); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class)); + } - public BinanceApiWebSocketClientImpl() { - Dispatcher d = new Dispatcher(); - d.setMaxRequestsPerHost(100); - this.client = new OkHttpClient.Builder().dispatcher(d).build(); - } + @Override + public Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback) { + final String channel = String.format("%s@kline_%s", symbol, interval.getIntervalId()); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class)); + } - public Closeable onDepthEvent(String symbol, BinanceApiCallback callback) { - final String channel = String.format("%s@depth", symbol); - return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class)); - } + public Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback) { + final String channel = String.format("%s@aggTrade", symbol); + return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class)); + } - @Override - public Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback callback) { - final String channel = String.format("%s@kline_%s", symbol, interval.getIntervalId()); - return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class)); - } + public Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback) { + return createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class)); + } - public Closeable onAggTradeEvent(String symbol, BinanceApiCallback callback) { - final String channel = String.format("%s@aggTrade", symbol); - return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class)); - } + public Closeable onAllMarketTickersEvent(BinanceApiCallback> callback) { + final String channel = "!ticker@arr"; + return createNewWebSocket(channel, new BinanceApiWebSocketListener>(callback)); + } - public Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback callback) { - return createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class)); - } - - public Closeable onAllMarketTickersEvent(BinanceApiCallback> callback) { - final String channel = "!ticker@arr"; - return createNewWebSocket(channel, new BinanceApiWebSocketListener>(callback)); - } - - @Override - public void close() { - client.dispatcher().executorService().shutdown(); - } + @Override + public void close() { + client.dispatcher().executorService().shutdown(); + } - private Closeable createNewWebSocket(String channel, BinanceApiWebSocketListener listener) { - String streamingUrl = String.format("%s/%s", BinanceApiConstants.WS_API_BASE_URL, channel); - Request request = new Request.Builder().url(streamingUrl).build(); - final WebSocket webSocket = client.newWebSocket(request, listener); - return () -> { - final int code = 1000; - listener.onClosing(webSocket, code, null); - webSocket.close(code, null); - listener.onClosed(webSocket, code, null); - }; - } + private Closeable createNewWebSocket(String channel, BinanceApiWebSocketListener listener) { + String streamingUrl = String.format("%s/%s", BinanceApiConstants.WS_API_BASE_URL, channel); + Request request = new Request.Builder().url(streamingUrl).build(); + final WebSocket webSocket = client.newWebSocket(request, listener); + return () -> { + final int code = 1000; + listener.onClosing(webSocket, code, null); + webSocket.close(code, null); + listener.onClosed(webSocket, code, null); + }; + } } From 8664140b2330453455bf5b31a8ffae3c2bb583f2 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Mon, 5 Feb 2018 11:50:39 +0000 Subject: [PATCH 3/4] Add error handling and closure of websockets to readme --- README.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/README.md b/README.md index abe0f7c6d..3228627e8 100644 --- a/README.md +++ b/README.md @@ -279,6 +279,40 @@ client.closeUserDataStream(listenKey); BinanceApiWebSocketClient client = BinanceApiClientFactory.newInstance().newWebSocketClient(); ``` +#### Handling web socket errors + +Each of the methods `BinanceApiWebSocketClient` which open a new web socket takes a `BinanceApiCallback`, which is +called for each event received from the Binance servers. + +The `BinanceApiCallback` interface also has a `onFailure(Throwable)` method, which, optionally, can be implemented to +receive notifications if the web-socket fails, e.g. disconnection. + +```java +client.onAggTradeEvent(symbol.toLowerCase(), new BinanceApiCallback() { + @Override + public void onResponse(final AggTradeEvent response) { + System.out.println(response); + } + + @Override + public void onFailure(final Throwable cause) { + System.err.println("Web socket failed"); + cause.printStackTrace(System.err); + } +}); +``` + +#### Closing web sockets + +Each of the methods `BinanceApiWebSocketClient` which open a new web socket also return a `Closeable`. +This `Closeable` can be used to close the underlying web socket and free any associated resources, e.g. + +```java +Closable ws = client.onAggTradeEvent("ethbtc", someCallback); +// some time later... +ws.close(); +``` + #### Listen for aggregated trade events for ETH/BTC ```java client.onAggTradeEvent("ethbtc", (AggTradeEvent response) -> { From 37ef9a05bf738c82584e20e428a9c5a6a1fbfd59 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Mon, 5 Feb 2018 16:11:51 +0000 Subject: [PATCH 4/4] code review improvements --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3228627e8..46e008f74 100644 --- a/README.md +++ b/README.md @@ -281,7 +281,7 @@ BinanceApiWebSocketClient client = BinanceApiClientFactory.newInstance().newWebS #### Handling web socket errors -Each of the methods `BinanceApiWebSocketClient` which open a new web socket takes a `BinanceApiCallback`, which is +Each of the methods on `BinanceApiWebSocketClient`, which opens a new web socket, takes a `BinanceApiCallback`, which is called for each event received from the Binance servers. The `BinanceApiCallback` interface also has a `onFailure(Throwable)` method, which, optionally, can be implemented to @@ -304,7 +304,7 @@ client.onAggTradeEvent(symbol.toLowerCase(), new BinanceApiCallback