Skip to content

Commit

Permalink
Add ChainTipPublisher class, be a little lazier
Browse files Browse the repository at this point in the history
* Add ChainTipPublisher class (to deal with type erasure in DI, etc.)
* RxBitcoinClient: don't always wait for connection in initChainTipService(),
  actively connect if useZmq, lazily wait if not useZmq
* TxOutSetService: require ChainTipPublisher in constructor, this allows the
  service to be passive/lazy depending upon the ChainTipPublisher implementation.
* ChainTipPublishers: static method never() returns do-nothing Publisher
  • Loading branch information
msgilligan committed Oct 6, 2023
1 parent beaf95f commit 8f61bb0
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
import org.consensusj.bitcoin.jsonrpc.ChainTipClient;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.ChainTipService;
import org.consensusj.jsonrpc.JsonRpcStatusException;
import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
Expand Down Expand Up @@ -63,9 +64,9 @@ public synchronized void start() {
}

@Override
public Publisher<ChainTip> chainTipPublisher() {
public ChainTipPublisher chainTipPublisher() {
start();
return chainTipProcessor;
return ChainTipPublisher.of(chainTipProcessor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
import io.reactivex.rxjava3.core.Flowable;
import org.consensusj.bitcoin.jsonrpc.BitcoinExtendedClient;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.ChainTipService;
import org.consensusj.bitcoin.rx.zeromq.RxBitcoinZmqService;
import org.consensusj.jsonrpc.JsonRpcTransport;
Expand Down Expand Up @@ -51,8 +52,9 @@ public RxBitcoinClient(SSLContext sslContext, Network network, URI server, Strin

private void initChainTipService(Duration timeout) {
if (chainTipService == null) {
this.waitForConnected().orTimeout(timeout.toSeconds(), TimeUnit.SECONDS).join();
if (useZmq) {
log.warn("(useZmq enabled) Initiating server connection (with timeout of {} seconds)", timeout.toSeconds());
this.connectToServer(timeout).join();
chainTipService = new RxBitcoinZmqService(this);
} else {
chainTipService = new PollingChainTipServiceImpl(this);
Expand All @@ -77,8 +79,8 @@ public <RSLT> Publisher<RSLT> pollOnNewBlockAsync(Supplier<CompletionStage<RSLT>
* @return a publisher of Chain Tips
*/
@Override
public Publisher<ChainTip> chainTipPublisher() {
public ChainTipPublisher chainTipPublisher() {
initChainTipService(Duration.ofMinutes(60));
return Flowable.fromPublisher(chainTipService.chainTipPublisher());
return chainTipService.chainTipPublisher();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static void main(String[] args) throws InterruptedException {
boolean useZmq = true;

try ( RxBitcoinClient client = new RxBitcoinClient(network, rpcUri, rpcUser, rpcPassword, useZmq);
TxOutSetService txOutSetService = new TxOutSetService(client) ) {
TxOutSetService txOutSetService = new TxOutSetService(client, client.chainTipPublisher()) ) {

client.connectToServer(Duration.ofMinutes(5)).join();
// Subscribe to ChainTips
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import org.bitcoinj.base.Sha256Hash;
import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.consensusj.bitcoin.json.pojo.TxOutSetInfo;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient;
import org.consensusj.jsonrpc.DefaultRpcClient;
import org.consensusj.jsonrpc.AsyncSupport;
import org.consensusj.jsonrpc.DefaultRpcClient;
import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,22 +30,29 @@
public class TxOutSetService implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TxOutSetService.class);
private final RxBitcoinClient client;
private final ChainTipPublisher chainTipPublisher;
private final FlowableProcessor<TxOutSetInfo> txOutSetProcessor = BehaviorProcessor.create();
private final int CACHE_BLOCK_DEPTH = 1;
private final int MAX_OUTSTANDING_CALLS = 2;
private final AtomicInteger outstandingCalls = new AtomicInteger(0);
// TODO: Combine lastCall with cache and with in-memory SPV blockchain
private CompletableFuture<TxOutSetInfo> lastCall;
private final ConcurrentHashMap<Sha256Hash, TxOutSetInfo> txOutSetCache = new ConcurrentHashMap<>();
private Disposable txOutSetSubscription;
private volatile Disposable txOutSetSubscription;

public TxOutSetService(RxBitcoinClient client) {
// TODO: Use BitcoinClient not RxBitcoinClient
/**
* @param client Client for requesting {@link TxOutSetInfo} objects.
* @param chainTipPublisher Publisher telling us when new blocks are available.
*/
public TxOutSetService(RxBitcoinClient client, ChainTipPublisher chainTipPublisher) {
this.client = client;
this.chainTipPublisher = chainTipPublisher;
}

private synchronized void start() {
if (txOutSetSubscription == null) {
txOutSetSubscription = Flowable.fromPublisher(client.chainTipPublisher())
txOutSetSubscription = Flowable.fromPublisher(chainTipPublisher)
.doOnNext(this::onNewBlock)
.flatMap(this::fetchCacheMaybe)
.subscribe(txOutSetProcessor::onNext, txOutSetProcessor::onError, txOutSetProcessor::onComplete);
Expand Down Expand Up @@ -76,6 +82,7 @@ private void onNewBlock(ChainTip tip) {
(t) -> log.warn("Ignoring transient error: ", t) // Log if ignored
);

// TODO: Convert to using CompletableFuture to drop dependency on RxBitcoinClient and RxJava
/**
* Try to fetch a TxOutSetInfo from the cache or from the network. Returning an empty stream if a transient error occurs.
* To use this you will typically call {@link Flowable#flatMap(io.reactivex.rxjava3.functions.Function)}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.consensusj.bitcoin.rx.jsonrpc.test;

import io.reactivex.rxjava3.core.Flowable;
import org.consensusj.bitcoin.rx.ChainTipPublisher;

/**
* Useful for testing.
*/
public class TestChainTipPublishers {
/**
* @return A {@link ChainTipPublisher} that never emits items and never closes.
*/
public static ChainTipPublisher never() {
return ChainTipPublisher.of(Flowable.never());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.bitcoinj.base.Sha256Hash;
import org.bitcoinj.core.Transaction;
import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
import org.consensusj.bitcoin.rx.ChainTipPublisher;
import org.consensusj.bitcoin.rx.ChainTipService;
import org.consensusj.bitcoin.rx.jsonrpc.RxBitcoinClient;
import org.consensusj.bitcoin.rx.RxBlockchainService;
Expand Down Expand Up @@ -80,8 +81,8 @@ public Publisher<Integer> blockHeightPublisher() {
}

@Override
public Publisher<ChainTip> chainTipPublisher() {
return flowableChainTip;
public ChainTipPublisher chainTipPublisher() {
return ChainTipPublisher.of(flowableChainTip);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
package org.consensusj.bitcoin.rx;/**
*
package org.consensusj.bitcoin.rx;

import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
* Marker type for {@code Publisher<ChainTip>}. In a future release this may use {@link java.util.concurrent.Flow.Publisher}.
* Because of type erasure in Java generics we need this to strongly type parameters that require a {@code Publisher<ChainTip>}.
*/
public class ChainTipPublisher {
public interface ChainTipPublisher extends Publisher<ChainTip> {
/**
* Adapt a {@code Publisher<ChainTip}
* @param publisher to wrap
* @return wrapped publisher
*/
static ChainTipPublisher of(Publisher<ChainTip> publisher) {
return new Wrapper(publisher);
}

class Wrapper implements ChainTipPublisher {
private final Publisher<ChainTip> publisher;

Wrapper(Publisher<ChainTip> publisher) {
this.publisher = publisher;
}

@Override
public void subscribe(Subscriber<? super ChainTip> s) {
publisher.subscribe(s);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public interface ChainTipService {
*
* @return A Publisher for the sequence
*/
Publisher<ChainTip> chainTipPublisher();
ChainTipPublisher chainTipPublisher();
}

0 comments on commit 8f61bb0

Please sign in to comment.