Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE (For discussion only) Merging XChange-Stream with XChange. Addresses #3220 #3436

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
34 changes: 34 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,28 @@
<module>xchange-yobit</module>
<module>xchange-zaif</module>
<module>xchange-enigma</module>
<module>xchange-stream-core</module>
<module>xchange-stream-service-pubnub</module>
<module>xchange-stream-service-pusher</module>
<module>xchange-stream-service-netty</module>
<module>xchange-stream-service-wamp</module>
<module>xchange-stream-service-core</module>
<module>xchange-stream-bitstamp</module>
<module>xchange-stream-cexio</module>
<module>xchange-stream-okcoin</module>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we instead name all these modules like xchange-okcoin-stream instead? That way they will end up in our IDEs grouped together better, i.e. xchange-core will be next to xchange-core-stream.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timmolter We've already renamed using the xchange-stream prefix. Can we perhaps aim for a merge first and then rename prior to the next XChange release? That way I can try and put together the merge sooner rather than later (there are quite a few people screaming for it ;))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we can do the merge first like you suggest. No problem.

<module>xchange-stream-poloniex</module>
<module>xchange-stream-coinmate</module>
<module>xchange-stream-coinbasepro</module>
<module>xchange-stream-bitfinex</module>
<module>xchange-stream-bitmex</module>
<module>xchange-stream-poloniex2</module>
<module>xchange-stream-bitflyer</module>
<module>xchange-stream-gemini</module>
<module>xchange-stream-binance</module>
<module>xchange-stream-hitbtc</module>
<module>xchange-stream-bankera</module>
<module>xchange-stream-kraken</module>
<module>xchange-stream-lgo</module>
</modules>

<repositories>
Expand Down Expand Up @@ -267,13 +289,20 @@
<version>28.2-jre</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
Expand Down Expand Up @@ -320,6 +349,11 @@
<artifactId>java-jwt</artifactId>
<version>${version.java-jwt}</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.18</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
30 changes: 30 additions & 0 deletions xchange-stream-bankera/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-parent</artifactId>
<version>4.4.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>xchange-stream-bankera</artifactId>
<name>XChange Streaming Bankera</name>
<description>XChange streaming implementation for the Bankera exchange</description>
<dependencies>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-stream-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-stream-service-netty</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-bankera</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.knowm.xchange.stream.bankera;

import org.knowm.xchange.stream.core.ProductSubscription;
import org.knowm.xchange.stream.core.StreamingExchange;
import org.knowm.xchange.stream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bankera.BankeraExchange;
import org.knowm.xchange.bankera.service.BankeraMarketDataService;

public class BankeraStreamingExchange extends BankeraExchange implements StreamingExchange {

private static final String WS_URI = "wss://api-exchange.bankera.com/ws";
private BankeraStreamingService streamingService;
private BankeraStreamingMarketDataService streamingMarketDataService;

public BankeraStreamingExchange() {
this.streamingService = new BankeraStreamingService(WS_URI);
}

@Override
protected void initServices() {
super.initServices();
streamingMarketDataService =
new BankeraStreamingMarketDataService(
streamingService, (BankeraMarketDataService) marketDataService);
}

@Override
public Completable connect(ProductSubscription... args) {
return streamingService.connect();
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
}

@Override
public boolean isAlive() {
return streamingService.isSocketOpen();
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

@Override
public ExchangeSpecification getDefaultExchangeSpecification() {
ExchangeSpecification spec = super.getDefaultExchangeSpecification();
spec.setShouldLoadRemoteMetaData(false);

return spec;
}

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public void useCompressedMessages(boolean compressedMessages) {
streamingService.useCompressedMessages(compressedMessages);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.knowm.xchange.stream.bankera;

import org.knowm.xchange.stream.core.StreamingMarketDataService;
import io.reactivex.Observable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import org.knowm.xchange.bankera.BankeraAdapters;
import org.knowm.xchange.bankera.dto.BankeraException;
import org.knowm.xchange.bankera.dto.marketdata.*;
import org.knowm.xchange.bankera.service.BankeraMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotAvailableFromExchangeException;

public class BankeraStreamingMarketDataService implements StreamingMarketDataService {

private final BankeraStreamingService service;
private final BankeraMarketDataService marketDataService;

public BankeraStreamingMarketDataService(
BankeraStreamingService service, BankeraMarketDataService marketDataService) {
this.service = service;
this.marketDataService = marketDataService;
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
BankeraMarket market = getMarketInfo(currencyPair);
return service
.subscribeChannel("market-orderbook", market.getId())
.map(
o -> {
List<BankeraOrderBook.OrderBookOrder> listBids = new ArrayList<>();
List<BankeraOrderBook.OrderBookOrder> listAsks = new ArrayList<>();
o.get("data")
.get("bids")
.forEach(
b ->
listBids.add(
new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
o.get("data")
.get("asks")
.forEach(
b ->
listAsks.add(
new BankeraOrderBook.OrderBookOrder(
0, b.get("price").asText(), b.get("amount").asText())));
return BankeraAdapters.adaptOrderBook(
new BankeraOrderBook(listBids, listAsks), currencyPair);
});
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
throw new NotAvailableFromExchangeException();
}

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
BankeraMarket market = getMarketInfo(currencyPair);
return service
.subscribeChannel("market-trade", market.getId())
.map(
t ->
new Trade.Builder()
.currencyPair(currencyPair)
.id("-1")
.price(new BigDecimal(t.get("data").get("price").asText()))
.originalAmount(new BigDecimal(t.get("data").get("amount").asText()))
.timestamp(new Date(t.get("data").get("time").asLong()))
.type(
t.get("data").get("side").asText().equals("SELL")
? Order.OrderType.ASK
: Order.OrderType.BID)
.build());
}

private BankeraMarket getMarketInfo(CurrencyPair currencyPair) {
try {
BankeraMarketInfo info = this.marketDataService.getMarketInfo();
Optional<BankeraMarket> market =
info.getMarkets().stream()
.filter(m -> m.getName().equals(currencyPair.toString().replace("/", "-")))
.findFirst();

if (market.isPresent()) {
return market.get();
}
throw new BankeraException(404, "Unable to find market.");
} catch (IOException e) {
throw new BankeraException(404, "Unable to find market.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.knowm.xchange.stream.bankera;

import com.fasterxml.jackson.databind.JsonNode;
import org.knowm.xchange.stream.bankera.dto.BankeraWebSocketSubscriptionMessage;
import org.knowm.xchange.stream.service.netty.JsonNettyStreamingService;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import java.io.IOException;

public class BankeraStreamingService extends JsonNettyStreamingService {

public BankeraStreamingService(String uri) {
super(uri, Integer.MAX_VALUE);
}

@Override
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
return message.get("type").asText();
}

@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
if (args.length != 1) throw new IOException("SubscribeMessage: Insufficient arguments");
BankeraWebSocketSubscriptionMessage subscribeMessage =
new BankeraWebSocketSubscriptionMessage(String.valueOf(args[0]));
return objectMapper.writeValueAsString(subscribeMessage);
}

@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
return null;
}

@Override
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.knowm.xchange.stream.bankera.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public class BankeraWebSocketSubscriptionMessage {

@JsonProperty("e")
private String event;

@JsonProperty("marketId")
private String marketId;

@JsonProperty("chartInterval")
private String chartInterval;

public BankeraWebSocketSubscriptionMessage(String marketId) {
this.event = "market";
this.marketId = marketId;
this.chartInterval = "1m";
}

public BankeraWebSocketSubscriptionMessage(String marketId, String chartInterval) {
this.event = "market";
this.marketId = marketId;
this.chartInterval = chartInterval;
}

public String getEvent() {
return event;
}

public String getMarketId() {
return marketId;
}

public String getChartInterval() {
return chartInterval;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.knowm.xchange.stream.bankera;

import org.knowm.xchange.stream.core.StreamingExchange;
import org.knowm.xchange.stream.core.StreamingExchangeFactory;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BankeraManualExample {
private static final Logger LOGGER = LoggerFactory.getLogger(BankeraManualExample.class);

public static void main(String[] args) {
StreamingExchange exchange =
StreamingExchangeFactory.INSTANCE.createExchange(BankeraStreamingExchange.class.getName());

exchange.connect().blockingAwait();
exchange
.getStreamingMarketDataService()
.getOrderBook(CurrencyPair.ETH_BTC)
.subscribe(
orderBook -> LOGGER.debug("ORDERBOOK: {}", orderBook.toString()),
throwable -> LOGGER.error("ERROR in getting order book: ", throwable));

exchange
.getStreamingMarketDataService()
.getTrades(CurrencyPair.ETH_BTC)
.subscribe(
trade -> LOGGER.debug("TRADES: {}", trade.toString()),
throwable -> LOGGER.error("ERROR in getting trade ", throwable));

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}

exchange.disconnect().subscribe(() -> LOGGER.info("Disconnected"));
}
}
Loading