Skip to content

Commit

Permalink
Fix small bug with Consumers singleton.
Browse files Browse the repository at this point in the history
  • Loading branch information
acabezas committed Jun 28, 2019
1 parent 22e0d64 commit 481937e
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;

public class Consumers {

private final PhotonDriver driver;
private final ConcurrentMap<ConsumerType, ConsumerFactory> factories = Maps.newConcurrentMap();
private ConsumerFactory consumerFactory;
private BeamReaderLockManager beamReaderLockManager;
private BeamReaderConfigManager beamReaderConfigManager;
private PhotonScheduler beamReaderScheduler;
Expand Down Expand Up @@ -88,6 +87,14 @@ private static Consumers getInstance(final Properties properties) {
}

private ConsumerFactory getFactory(Properties properties) {
return Optional.ofNullable(consumerFactory)
.orElseGet(() -> {
consumerFactory = buildFactory(properties);
return consumerFactory;
});
}

private ConsumerFactory buildFactory(Properties properties) {
try {
PhotonDeserializer photonDeserializer = (PhotonDeserializer) Class.forName(properties.getProperty(PHOTON_DESERIALIZER_CLASS))
.getConstructor()
Expand Down Expand Up @@ -149,7 +156,12 @@ public static ConsumerFactory newConsumerFactory(final Properties properties) {
}

public static void shutDown() {
instance.driver.shutDown();
instance.shutDownFactory();
}

private void shutDownFactory() {
driver.shutDown();
consumerFactory = null;
}

enum ConsumerType {
Expand Down

0 comments on commit 481937e

Please sign in to comment.