Skip to content

Commit

Permalink
queue-server: Make the completion ack timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrossie committed Jul 8, 2021
1 parent 5302938 commit 1999ba3
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public class StandaloneNotificationQueueHandler implements NotificationQueueHandler {

private static final Logger logger = LoggerFactory.getLogger(StandaloneNotificationQueueHandler.class);
private static final long WAIT_ACK_TO_SEC = 3;
private final long ackTimeSec;

private static final List<Period> RETRY_SCHEDULE = Arrays.asList(
Period.minutes(1),
Expand All @@ -56,10 +56,12 @@ public class StandaloneNotificationQueueHandler implements NotificationQueueHand
private final Map<String, ServerCallStreamObserver<EventMsg>> observers;
private final Map<String, CompletionSuccess> dispatchedEvents;

public StandaloneNotificationQueueHandler() {
public StandaloneNotificationQueueHandler(long ackTimeSec) {
logger.info("StandaloneNotificationQueueHandler configured with ackTime value = %d sec", ackTimeSec);

This comment has been minimized.

Copy link
@pierre

pierre Jul 8, 2021

Member

Don't you need {} instead? IDEA should have warned you 🤔

This comment has been minimized.

Copy link
@sbrossie

sbrossie Jul 8, 2021

Author Member

I will never learn... and switching from go to java is not helping... Fixed in df06477

this.lock = new ReentrantLock();
this.observers = new HashMap<>();
this.dispatchedEvents = new HashMap();
this.ackTimeSec = ackTimeSec;
}

public void notifyEventCompletion(final String userToken, final boolean success) {
Expand Down Expand Up @@ -181,7 +183,7 @@ public void handleReadyNotification(final NotificationEvent inputEvent, final Da
}
foundValidObs = true;
try {
complSuccess = new CompletionSuccess(userTokenStr);
complSuccess = new CompletionSuccess(userTokenStr, ackTimeSec);
dispatchedEvents.put(userTokenStr, complSuccess);
obs.onNext(event);
// Break after first success
Expand Down Expand Up @@ -214,11 +216,13 @@ private static class CompletionSuccess {

private final String userToken;
private final CountDownLatch latch;
private final long ackTimeSec;
private boolean status;

public CompletionSuccess(final String userToken) {
public CompletionSuccess(final String userToken, final long ackTimeSec) {
this.userToken = userToken;
this.latch = new CountDownLatch(1);
this.ackTimeSec = ackTimeSec;
}

public void notify(boolean status) {
Expand All @@ -228,9 +232,9 @@ public void notify(boolean status) {

public void waitForCompletion() throws QueueRetryException {
try {
final boolean res = latch.await(WAIT_ACK_TO_SEC, TimeUnit.SECONDS);
final boolean res = latch.await(ackTimeSec, TimeUnit.SECONDS);
if (!res) {
logger.warn("Thread waiting for event userToken={} timed out after {} seconds", userToken, WAIT_ACK_TO_SEC);
logger.warn("Thread waiting for event userToken={} timed out after {} seconds", userToken, ackTimeSec);
throw new QueueRetryException(RETRY_SCHEDULE);
} else if (!status) {
logger.info("Client Nack for userToken={}", userToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public RetryableNotificationQueueService(final NotificationQueueService notifica
public StandaloneQueueNotification(final String jdbcConnection,
final String dbUsername,
final String dbPassword,
final Long ackTimeSec,
final NotificationQueueConfig config) throws NotificationQueueAlreadyExists {
super(config, jdbcConnection, dbUsername, dbPassword);

this.notificationQueueHandler = new StandaloneNotificationQueueHandler();
this.notificationQueueHandler = new StandaloneNotificationQueueHandler(ackTimeSec);
this.notificationQueueService = new DefaultNotificationQueueService(dbi, clock, (NotificationQueueConfig) queueConfig, metricRegistry);
this.retryableQueueService = new RetryableNotificationQueueService(notificationQueueService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ConfigModel {
static final String PROP_DATASTORE_PASSWORD = "QUEUE_DATASTORE_PASSWORD";

static final String PROP_APP_PORT = "QUEUE_APP_PORT";
static final String PROP_APP_ACK_TIME_SEC = "APP_ACK_TIME_SEC";

private static final String NOTIFICATION_PREFIX = "org.killbill.notificationq.main.";

Expand Down Expand Up @@ -80,6 +81,7 @@ public static class App {
private Integer port;
private Integer nbThreads;
private Boolean recycleTcpConn;
private Long ackTimeSec;

public App() {
}
Expand All @@ -105,6 +107,19 @@ public int getNbThreads() {
public boolean getRecycleTcpConn() {
return recycleTcpConn;
}

public long getAckTimeSec() {
return ackTimeSec;
}

public void setAckTimeSec(final Long ackTimeSec) {
final Long oAckTimeSec = fromSystemProperty(PROP_APP_ACK_TIME_SEC, Long.valueOf(0));
if (oAckTimeSec != null) {
this.ackTimeSec = oAckTimeSec;
} else {
this.ackTimeSec = ackTimeSec;
}
}
}


Expand Down Expand Up @@ -213,6 +228,8 @@ private static <T> T fromSystemProperty(final String prop, final T dummy) {
}
if (dummy instanceof Integer) {
return (T) Integer.valueOf(value);
} else if (dummy instanceof Long) {
return (T) Long.valueOf(value);
} else if (dummy instanceof String) {
return (T) value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;

public class QueueServer {

private static final Logger logger = LoggerFactory.getLogger(QueueServer.class);
Expand Down Expand Up @@ -85,6 +83,7 @@ private synchronized void start() throws IOException, InterruptedException, Noti
this.queue = new StandaloneQueueNotification(config.getDatastore().getJdbcConn(),
config.getDatastore().getUser(),
config.getDatastore().getPassword(),
config.getApp().getAckTimeSec(),
config.getNotificationQueueConfig());
queue.start();

Expand Down
1 change: 1 addition & 0 deletions queue-server/src/main/resources/config/local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ app:
port: 10001
nbThreads: 30
recycleTcpConn: false
ackTimeSec: 5

This comment has been minimized.

Copy link
@pierre

pierre Jul 8, 2021

Member

You also need a placeholder in the production file, or the env variable won't be honored.

This comment has been minimized.

Copy link
@sbrossie

sbrossie Jul 8, 2021

Author Member

Resolved over slack.

This comment has been minimized.

Copy link
@pierre

pierre Jul 8, 2021

Member

Yeah, my head was still in Go (was referring to spf13/viper#188).

logging:
level: debug
format: text
Expand Down
1 change: 1 addition & 0 deletions queue-server/src/main/resources/config/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ app:
port: 9999
nbThreads: 30
recycleTcpConn: false
ackTimeSec: 5
logging:
level: debug
format: text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void testConfig() throws IOException, URISyntaxException {

assertEquals(config.getApp().getNbThreads(), 30);
assertEquals(config.getApp().getPort(), 9999);
assertEquals(config.getApp().getAckTimeSec(), 5);

assertEquals(config.getDatastore().getPort(), Integer.valueOf(5432));
assertEquals(config.getDatastore().getHost(), "localhost");
Expand Down Expand Up @@ -78,13 +79,16 @@ public void testConfigWithOverrides() throws IOException, URISyntaxException {
System.setProperty(ConfigModel.PROP_DATASTORE_USER, "user");
System.setProperty(ConfigModel.PROP_DATASTORE_PASSWORD, "pwd!!!");
System.setProperty(ConfigModel.PROP_DATASTORE_HOST, "host");
System.setProperty(ConfigModel.PROP_APP_ACK_TIME_SEC, "90");



final Config conf = new Config(TEST_RESOURCE);
final ConfigModel config = conf.getConfig();

assertEquals(config.getApp().getNbThreads(), 30);
assertEquals(config.getApp().getPort(), 8888);
assertEquals(config.getApp().getAckTimeSec(), 90);

assertEquals(config.getDatastore().getPort(), Integer.valueOf(2345));
assertEquals(config.getDatastore().getHost(), "host");
Expand Down

0 comments on commit 1999ba3

Please sign in to comment.