Skip to content

Commit

Permalink
Merge pull request #4700 from rizer1980/binance_implement_channel_ina…
Browse files Browse the repository at this point in the history
…ctive_listener

[binance] implement channel inactive handler
  • Loading branch information
timmolter authored Nov 13, 2023
2 parents 374c78f + aef519b commit b7dfec0
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package info.bitrich.xchangestream.binance;

import static java.util.Collections.emptyMap;

import info.bitrich.xchangestream.binance.BinanceUserDataChannel.NoActiveChannelException;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel.State;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import info.bitrich.xchangestream.util.Events;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
Expand All @@ -17,14 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;

public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {

private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class);
Expand Down Expand Up @@ -322,4 +322,14 @@ public void enableLiveSubscription() {
public void disableLiveSubscription() {
if (this.streamingService != null) this.streamingService.disableLiveSubscription();
}

/**
* Enables the user to listen on channel inactive events and react appropriately.
*
* @param channelInactiveHandler a WebSocketMessageHandler instance.
*/
public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
streamingService.setChannelInactiveHandler(channelInactiveHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -20,6 +20,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinanceStreamingService extends JsonNettyStreamingService {

Expand All @@ -32,6 +34,9 @@ public class BinanceStreamingService extends JsonNettyStreamingService {
private final KlineSubscription klineSubscription;

private boolean isLiveSubscriptionEnabled = false;

private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;

private final Map<Integer, BinanceWebSocketSubscriptionMessage> liveSubscriptionMessage =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -236,4 +241,39 @@ public void unsubscribeChannel(final String channelId) {
}
}
}

@Override
protected WebSocketClientHandler getWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
LOGGER.info("Registering BinanceWebSocketClientHandler");
return new BinanceWebSocketClientHandler(handshake, handler);
}

public void setChannelInactiveHandler(
WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
this.channelInactiveHandler = channelInactiveHandler;
}

/**
* Custom client handler in order to execute an external, user-provided handler on channel events.
*/
class BinanceWebSocketClientHandler extends NettyWebSocketClientHandler {

public BinanceWebSocketClientHandler(
WebSocketClientHandshaker handshake, WebSocketMessageHandler handler) {
super(handshake, handler);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
if (channelInactiveHandler != null)
channelInactiveHandler.onMessage("WebSocket Client disconnected!");
}
}
}

0 comments on commit b7dfec0

Please sign in to comment.