Skip to content

Commit

Permalink
Merge pull request #3 from ManfredKarrer/add-error-handlers
Browse files Browse the repository at this point in the history
Add error handlers
  • Loading branch information
ManfredKarrer authored Dec 9, 2018
2 parents 17e870a + dd1d8f7 commit 065d378
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 40 deletions.
20 changes: 20 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
root = true

[*]
indent_style = space
indent_size = 4
continuation_indent_size = 8
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.bat]
end_of_line = crlf

[build.gradle]
continuation_indent_size = 4

[.idea/codeStyles/*.xml]
indent_size = 2
insert_final_newline = false
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>network.bisq.btcd-cli4j</groupId>
<artifactId>btcd-cli4j-parent</artifactId>
<version>0.5.8.3</version>
<version>0.5.8.4</version>
</parent>
<artifactId>btcd-cli4j-core</artifactId>
<packaging>jar</packaging>
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/com/neemre/btcdcli4j/core/BtcdCli4jVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package com.neemre.btcdcli4j.core;

public class BtcdCli4jVersion {
public static final String VERSION = "0.5.8.4";
}
12 changes: 11 additions & 1 deletion daemon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>network.bisq.btcd-cli4j</groupId>
<artifactId>btcd-cli4j-parent</artifactId>
<version>0.5.8.3</version>
<version>0.5.8.4</version>
</parent>
<artifactId>btcd-cli4j-daemon</artifactId>
<packaging>jar</packaging>
Expand All @@ -25,5 +25,15 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.neemre.btcdcli4j.daemon;

import com.google.common.util.concurrent.*;
import com.neemre.btcdcli4j.core.BitcoindException;
import com.neemre.btcdcli4j.core.CommunicationException;
import com.neemre.btcdcli4j.core.NodeProperties;
Expand All @@ -11,12 +12,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class BtcdDaemonImpl implements BtcdDaemon {

Expand All @@ -25,16 +27,25 @@ public class BtcdDaemonImpl implements BtcdDaemon {
private DaemonConfigurator configurator;
private Map<Notifications, NotificationMonitor> monitors;
private Map<Notifications, Future<Void>> futures;
private ExecutorService monitorPool;
private ListeningExecutorService monitorPool;

private BtcdClient client;

@Nullable
private Consumer<Throwable> errorHandler;


public BtcdDaemonImpl() {
this(new Properties());
}

public BtcdDaemonImpl(BtcdClient btcdProvider) throws BitcoindException, CommunicationException {
this(btcdProvider, null);
}

public BtcdDaemonImpl(BtcdClient btcdProvider, @Nullable Consumer<Throwable> errorHandler)
throws BitcoindException, CommunicationException {
this.errorHandler = errorHandler;
initialize();
this.client = configurator.checkBtcdProvider(btcdProvider);
buildMonitors(configurator.checkNodeConfig(client.getNodeConfig()));
Expand Down Expand Up @@ -166,24 +177,48 @@ private void initialize() {

private void buildMonitors(Properties nodeConfig) {
int alertPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.ALERT_PORT.getKey()));
NotificationMonitor alertNotificationMonitor = new NotificationMonitor(Notifications.ALERT, alertPort, null);
NotificationMonitor alertNotificationMonitor = new NotificationMonitor(Notifications.ALERT, alertPort, null,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.ALERT, alertNotificationMonitor);

int blockPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.BLOCK_PORT.getKey()));
NotificationMonitor blockNotificationMonitor = new NotificationMonitor(Notifications.BLOCK, blockPort, client);
NotificationMonitor blockNotificationMonitor = new NotificationMonitor(Notifications.BLOCK, blockPort, client,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.BLOCK, blockNotificationMonitor);

int walletPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.WALLET_PORT.getKey()));
NotificationMonitor walletNotificationMonitor = new NotificationMonitor(Notifications.WALLET, walletPort, client);
NotificationMonitor walletNotificationMonitor = new NotificationMonitor(Notifications.WALLET, walletPort, client,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.WALLET, walletNotificationMonitor);
monitorPool = Executors.newFixedThreadPool(monitors.size());

monitorPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(monitors.size()));
}

private void startMonitors() {
for (Notifications notificationType : monitors.keySet()) {
NotificationMonitor monitor = monitors.get(notificationType);
futures.put(notificationType, monitorPool.submit(monitor, (Void) null));

ListenableFuture<Void> future = monitorPool.submit(monitor);
futures.put(notificationType, future);

Futures.addCallback(future, new FutureCallback<Void>() {
public void onSuccess(Void ignore) {
}

public void onFailure(Throwable throwable) {
if (errorHandler != null)
errorHandler.accept(throwable);
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = false)
public class NotificationHandlerException extends RuntimeException {
public class NotificationHandlerException extends Exception {

private static final long serialVersionUID = 1L;

private final Errors error;
private int code;


public NotificationHandlerException(Errors error) {
this(error, Constants.STRING_EMPTY);
}

public NotificationHandlerException(Errors error, String additionalMsg) {
super(error.getDescription() + additionalMsg);
this.error = error;
code = error.getCode();
}

public NotificationHandlerException(Errors error, Exception cause) {
public NotificationHandlerException(Errors error, Throwable cause) {
super(error.getDescription(), cause);
this.error = error;
code = error.getCode();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.neemre.btcdcli4j.daemon.notification;

import com.google.common.util.concurrent.*;
import com.neemre.btcdcli4j.core.client.BtcdClient;
import com.neemre.btcdcli4j.core.common.Constants;
import com.neemre.btcdcli4j.core.common.Errors;
Expand All @@ -11,17 +12,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class NotificationMonitor extends Observable implements Observer, Runnable {
public class NotificationMonitor extends Observable implements Observer, Callable<Void> {

private static final Logger LOG = LoggerFactory.getLogger(NotificationMonitor.class);
private static final int WORKER_MIN_COUNT = 1;
Expand All @@ -34,45 +38,73 @@ public class NotificationMonitor extends Observable implements Observer, Runnabl
private int serverPort;
private ServerSocket serverSocket;
private volatile boolean isActive;
private ThreadPoolExecutor workerPool;

@Nullable
private BtcdClient client;
@Nullable
private Consumer<Throwable> errorHandler;

private ThreadPoolExecutor executor;
private ListeningExecutorService workerPool;

public NotificationMonitor(Notifications type, int serverPort, BtcdClient client) {
public NotificationMonitor(Notifications type, int serverPort, @Nullable BtcdClient client) {
this(type, serverPort, client, null);
}

public NotificationMonitor(Notifications type, int serverPort, @Nullable BtcdClient client,
@Nullable Consumer<Throwable> errorHandler) {
LOG.info("** NotificationMonitor(): launching new '{}' notification monitor (port: '{}', "
+ "RPC-capable: '{}')", type.name(), serverPort, ((client == null) ? "no" : "yes"));
this.errorHandler = errorHandler;
this.type = type;
this.serverPort = serverPort;
this.client = client;
}

@Override
public void run() {
public Void call() throws NotificationHandlerException {
activate();

LOG.info("-- run(..): started listening for '{}' notifications on port '{}'", type.name(),
serverSocket.getLocalPort());
while (isActive) {
try {
Socket socket = serverSocket.accept();
NotificationWorker worker = NotificationWorkerFactory.createWorker(type, socket, client);
worker.addObserver(this);
workerPool.submit(worker);

ListenableFuture<Void> future = workerPool.submit(worker);

Futures.addCallback(future, new FutureCallback<Void>() {
public void onSuccess(Void ignore) {
}

public void onFailure(Throwable throwable) {
if (errorHandler != null)
errorHandler.accept(throwable);
}
});

LOG.trace("-- run(..): total no. of '{}' notifications received: '{}', task queue "
+ "occupancy: '{}/{}'", type.name(), workerPool.getTaskCount(),
workerPool.getQueue().size(), TASK_QUEUE_LENGTH);
+ "occupancy: '{}/{}'", type.name(), executor.getTaskCount(),
executor.getQueue().size(), TASK_QUEUE_LENGTH);

} catch (SocketTimeoutException e) {
LOG.trace("-- run(..): polling '{}' notification monitor for interrupts (socket idle "
+ "for {}ms)", type.name(), IDLE_SOCKET_TIMEOUT);
} catch (IOException e) {
Thread.currentThread().interrupt();
throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, e);
} catch (Throwable e) {
Thread.currentThread().interrupt();
throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, e);
} finally {
if (Thread.interrupted()) {
deactivate();
}
}
}
return null;
}

@Override
Expand All @@ -88,7 +120,7 @@ public boolean isActive() {
return isActive;
}

private void activate() {
private void activate() throws NotificationHandlerException {
Thread.currentThread().setName(getUniqueName());
isActive = true;
try {
Expand All @@ -105,8 +137,12 @@ private void activate() {
throw new NotificationHandlerException(Errors.IO_SERVERSOCKET_UNINITIALIZED, e);
}
}
workerPool = new ThreadPoolExecutor(WORKER_MIN_COUNT, WORKER_MAX_COUNT, IDLE_WORKER_TIMEOUT,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(TASK_QUEUE_LENGTH));

executor = new ThreadPoolExecutor(WORKER_MIN_COUNT, WORKER_MAX_COUNT, IDLE_WORKER_TIMEOUT,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(TASK_QUEUE_LENGTH));
executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler((r, e) -> LOG.error("RejectedExecutionHandler called"));
workerPool = MoreExecutors.listeningDecorator(executor);
}

private void deactivate() {
Expand All @@ -127,4 +163,4 @@ private String getUniqueName() {
return "NotificationMonitor[" + StringUtils.capitalize(type.name().toLowerCase()) + "]-"
+ StringUtils.random(4, Constants.DECIMAL_DIGITS);
}
}
}
Loading

0 comments on commit 065d378

Please sign in to comment.