Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Error handling and shutdown improvements #49

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,40 @@ client.closeUserDataStream(listenKey);
BinanceApiWebSocketClient client = BinanceApiClientFactory.newInstance().newWebSocketClient();
```

#### Handling web socket errors

Each of the methods on `BinanceApiWebSocketClient`, which opens a new web socket, takes a `BinanceApiCallback`, which is
called for each event received from the Binance servers.

The `BinanceApiCallback` interface also has a `onFailure(Throwable)` method, which, optionally, can be implemented to
receive notifications if the web-socket fails, e.g. disconnection.

```java
client.onAggTradeEvent(symbol.toLowerCase(), new BinanceApiCallback<AggTradeEvent>() {
@Override
public void onResponse(final AggTradeEvent response) {
System.out.println(response);
}

@Override
public void onFailure(final Throwable cause) {
System.err.println("Web socket failed");
cause.printStackTrace(System.err);
}
});
```

#### Closing web sockets

Each of the methods on `BinanceApiWebSocketClient`, which opens a new web socket, also returns a `Closeable`.
This `Closeable` can be used to close the underlying web socket and free any associated resources, e.g.

```java
Closable ws = client.onAggTradeEvent("ethbtc", someCallback);
// some time later...
ws.close();
```

#### Listen for aggregated trade events for ETH/BTC
```java
client.onAggTradeEvent("ethbtc", (AggTradeEvent response) -> {
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/binance/api/client/BinanceApiCallback.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package com.binance.api.client;

import com.binance.api.client.exception.BinanceApiException;

/**
* BinanceApiCallback is a functional interface used together with the BinanceApiAsyncClient to provide a non-blocking REST client.
*
* @param <T> the return type from the callback
*/
@FunctionalInterface
public interface BinanceApiCallback<T> {

/**
* Called whenever a response comes back from the Binance API.
*
* @param response the expected response object
* @throws BinanceApiException if it is not possible to obtain the expected response object (e.g. incorrect API-KEY).
*/
void onResponse(T response) throws BinanceApiException;
void onResponse(T response);

/**
* Called whenever an error occurs.
*
* @param cause the cause of the failure
*/
default void onFailure(Throwable cause) {}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,64 @@
package com.binance.api.client;

import java.util.List;
import com.binance.api.client.domain.event.AggTradeEvent;
import com.binance.api.client.domain.event.AllMarketTickersEvent;
import com.binance.api.client.domain.event.CandlestickEvent;
import com.binance.api.client.domain.event.DepthEvent;
import com.binance.api.client.domain.event.UserDataUpdateEvent;
import com.binance.api.client.domain.market.CandlestickInterval;

import java.io.Closeable;
import java.util.List;

/**
* Binance API data streaming façade, supporting streaming of events through web sockets.
*/
public interface BinanceApiWebSocketClient {
public interface BinanceApiWebSocketClient extends Closeable {

/**
* Open a new web socket to receive {@link DepthEvent depthEvents} on a callback.
*
* @param symbol the market symbol to subscribe to
* @param callback the callback to call on new events
* @return a {@link Closeable} that allows the underlying web socket to be closed.
*/
Closeable onDepthEvent(String symbol, BinanceApiCallback<DepthEvent> callback);

void onDepthEvent(String symbol, BinanceApiCallback<DepthEvent> callback);
/**
* Open a new web socket to receive {@link CandlestickEvent candlestickEvents} on a callback.
*
* @param symbol the market symbol to subscribe to
* @param interval the interval of the candles tick events required
* @param callback the callback to call on new events
* @return a {@link Closeable} that allows the underlying web socket to be closed.
*/
Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback<CandlestickEvent> callback);

void onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback<CandlestickEvent> callback);
/**
* Open a new web socket to receive {@link AggTradeEvent aggTradeEvents} on a callback.
*
* @param symbol the market symbol to subscribe to
* @param callback the callback to call on new events
* @return a {@link Closeable} that allows the underlying web socket to be closed.
*/
Closeable onAggTradeEvent(String symbol, BinanceApiCallback<AggTradeEvent> callback);

void onAggTradeEvent(String symbol, BinanceApiCallback<AggTradeEvent> callback);
/**
* Open a new web socket to receive {@link UserDataUpdateEvent userDataUpdateEvents} on a callback.
*
* @param listenKey the listen key to subscribe to.
* @param callback the callback to call on new events
* @return a {@link Closeable} that allows the underlying web socket to be closed.
*/
Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback<UserDataUpdateEvent> callback);

void onUserDataUpdateEvent(String listenKey, BinanceApiCallback<UserDataUpdateEvent> callback);
/**
* Open a new web socket to receive {@link AllMarketTickersEvent allMarketTickersEvents} on a callback.
*
* @param callback the callback to call on new events
* @return a {@link Closeable} that allows the underlying web socket to be closed.
*/
Closeable onAllMarketTickersEvent(BinanceApiCallback<List<AllMarketTickersEvent>> callback);

void onAllMarketTickersEvent(BinanceApiCallback<List<AllMarketTickersEvent>> callback);
void close();
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package com.binance.api.client.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import com.binance.api.client.BinanceApiCallback;
import com.binance.api.client.BinanceApiWebSocketClient;
import com.binance.api.client.constant.BinanceApiConstants;
Expand All @@ -15,54 +9,66 @@
import com.binance.api.client.domain.event.DepthEvent;
import com.binance.api.client.domain.event.UserDataUpdateEvent;
import com.binance.api.client.domain.market.CandlestickInterval;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;

import java.io.Closeable;
import java.util.List;

/**
* Binance API WebSocket client implementation using OkHttp.
*/
public class BinanceApiWebSocketClientImpl implements BinanceApiWebSocketClient, Closeable {

private OkHttpClient client;

public BinanceApiWebSocketClientImpl() {
Dispatcher d = new Dispatcher();
d.setMaxRequestsPerHost(100);
this.client = new OkHttpClient.Builder().dispatcher(d).build();
}
private OkHttpClient client;

public void onDepthEvent(String symbol, BinanceApiCallback<DepthEvent> callback) {
final String channel = String.format("%s@depth", symbol);
createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class));
}
public BinanceApiWebSocketClientImpl() {
Dispatcher d = new Dispatcher();
d.setMaxRequestsPerHost(100);
this.client = new OkHttpClient.Builder().dispatcher(d).build();
}

@Override
public void onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback<CandlestickEvent> callback) {
final String channel = String.format("%s@kline_%s", symbol, interval.getIntervalId());
createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class));
}
public Closeable onDepthEvent(String symbol, BinanceApiCallback<DepthEvent> callback) {
final String channel = String.format("%s@depth", symbol);
return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, DepthEvent.class));
}

public void onAggTradeEvent(String symbol, BinanceApiCallback<AggTradeEvent> callback) {
final String channel = String.format("%s@aggTrade", symbol);
createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class));
}
@Override
public Closeable onCandlestickEvent(String symbol, CandlestickInterval interval, BinanceApiCallback<CandlestickEvent> callback) {
final String channel = String.format("%s@kline_%s", symbol, interval.getIntervalId());
return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, CandlestickEvent.class));
}

public void onUserDataUpdateEvent(String listenKey, BinanceApiCallback<UserDataUpdateEvent> callback) {
createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class));
}
public Closeable onAggTradeEvent(String symbol, BinanceApiCallback<AggTradeEvent> callback) {
final String channel = String.format("%s@aggTrade", symbol);
return createNewWebSocket(channel, new BinanceApiWebSocketListener<>(callback, AggTradeEvent.class));
}

public void onAllMarketTickersEvent(BinanceApiCallback<List<AllMarketTickersEvent>> callback) {
final String channel = "!ticker@arr";
createNewWebSocket(channel, new BinanceApiWebSocketListener<List<AllMarketTickersEvent>>(callback));
}
public Closeable onUserDataUpdateEvent(String listenKey, BinanceApiCallback<UserDataUpdateEvent> callback) {
return createNewWebSocket(listenKey, new BinanceApiWebSocketListener<>(callback, UserDataUpdateEvent.class));
}

private void createNewWebSocket(String channel, BinanceApiWebSocketListener<?> listener) {
String streamingUrl = String.format("%s/%s", BinanceApiConstants.WS_API_BASE_URL, channel);
Request request = new Request.Builder().url(streamingUrl).build();
client.newWebSocket(request, listener);
}
public Closeable onAllMarketTickersEvent(BinanceApiCallback<List<AllMarketTickersEvent>> callback) {
final String channel = "!ticker@arr";
return createNewWebSocket(channel, new BinanceApiWebSocketListener<List<AllMarketTickersEvent>>(callback));
}

@Override
public void close() throws IOException {
client.dispatcher().executorService().shutdown();
}
@Override
public void close() {
client.dispatcher().executorService().shutdown();
}

private Closeable createNewWebSocket(String channel, BinanceApiWebSocketListener<?> listener) {
String streamingUrl = String.format("%s/%s", BinanceApiConstants.WS_API_BASE_URL, channel);
Request request = new Request.Builder().url(streamingUrl).build();
final WebSocket webSocket = client.newWebSocket(request, listener);
return () -> {
final int code = 1000;
listener.onClosing(webSocket, code, null);
webSocket.close(code, null);
listener.onClosed(webSocket, code, null);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class BinanceApiWebSocketListener<T> extends WebSocketListener {

private TypeReference<T> eventTypeReference;

private boolean closing = false;

public BinanceApiWebSocketListener(BinanceApiCallback<T> callback, Class<T> eventClass) {
this.callback = callback;
this.eventClass = eventClass;
Expand All @@ -46,8 +48,15 @@ public void onMessage(WebSocket webSocket, String text) {
}
}

@Override
public void onClosing(final WebSocket webSocket, final int code, final String reason) {
closing = true;
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
throw new BinanceApiException(t);
if (!closing) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there should still be some sort of notification to inform about the closure, because a regular API user might not see the closure of the socket, leaving him wondering why his application no longer works after a time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, without this, when the user calls close on the web socket, they user would receive a EOFException on this onFailure callback. This is not intuitive IMHO.

The flag stops the EOFException that occurs in response to the user's close request.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea but if the user does not close it and does not check onFailure, then the socket closes unnoticed.

callback.onFailure(t);
}
}
}