Skip to content

Commit

Permalink
Add ability to shutdown all active thread pools.
Browse files Browse the repository at this point in the history
  • Loading branch information
acabezas committed Jul 3, 2019
1 parent a8da7f1 commit cbe42b1
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.expediagroup.photon</groupId>
<artifactId>photon</artifactId>
<version>0.1.6-SNAPSHOT</version>
<version>0.1.7-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down Expand Up @@ -44,7 +44,7 @@
<slf4j.version>1.7.24</slf4j.version>
<cassandra.version>3.11.2</cassandra.version>
<cassandra.driver.version>3.3.0</cassandra.driver.version>
<coverage.line>0.79</coverage.line>
<coverage.line>0.78</coverage.line>
<coverage.branch>0.40</coverage.branch>
<coverage.class>0</coverage.class>
<coverage.method>0</coverage.method>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
public interface Startable extends Stoppable {

void start() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ public interface Stoppable {

void stop() throws Exception;

void shutdown() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,20 @@ public void stop() throws Exception {
}
}

@Override
public void shutdown() throws Exception {
beamReaderScheduler.shutdown();
beamReaderLockManager.shutdown();
Optional.ofNullable(walkBackBeamConsumer).ifPresent(wc -> {
try {
wc.shutdown();
} catch (Exception e) {
log.error("Could not stop walkback consumer", e);
throw new RuntimeException(e);
}
});
}

@Override
public void putBeamForProcessing(String clientName, String beamName, PhotonMessageHandler photonMessageHandler,
PhotonBeamReaderOffsetType offsetType) {
Expand Down Expand Up @@ -212,5 +226,4 @@ BiFunction<String, String, Instant> getDefaultWatermarkFunction(PhotonBeamReader
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public void stop() throws Exception {
asyncProcessor.stop();
}

@Override
public void shutdown() throws Exception {
photonConsumer.shutdown();
asyncProcessor.shutdown();
}

@Slf4j
private static final class AsyncPhotonHandler<T> implements PhotonMessageHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public void stop() {
active = false;
}

@Override
public void shutdown() throws Exception {
scheduledExecutorService.shutdown();
executorService.shutdown();
}

private void consume(PhotonBeamReader photonBeamReader,
PhotonBeamWalkBackTracker photonBeamWalkBackTracker) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void start() throws Exception {
try {
executeTask();
} catch (Exception e) {
log.error("Could not schedule task {}", e);
log.error("Could not schedule task", e);
}
}, 0L, pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
}
Expand All @@ -78,9 +78,17 @@ public void stop() throws Exception {
}
}

abstract void executeTask();

private boolean isActive(ScheduledFuture<?> scheduledFuture) {
return !(scheduledFuture.isCancelled() || scheduledFuture.isDone());
}

@Override
public void shutdown() throws Exception {
scheduledExecutorService.shutdown();
doShutDown();
}

abstract void executeTask();

abstract void doShutDown() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,9 @@ void executeTask() {
}
});
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ void executeTask() {
}
}));
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ void executeTask() {
}
}));
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,10 @@ public void stop() throws Exception {
lockUpdateScheduler.stop();
lockGetScheduler.stop();
}

@Override
public void shutdown() throws Exception {
lockUpdateScheduler.shutdown();
lockGetScheduler.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public void stop() throws Exception {
}
}

@Override
public void shutdown() throws Exception {
scheduledExecutorService.shutdown();
executorService.shutdown();
}

@Override
public Optional<ProcessorManifest<K, V, T>> getProcessorManifest() {
return Optional.ofNullable(processorManifest);
Expand Down

0 comments on commit cbe42b1

Please sign in to comment.