Skip to content

Commit

Permalink
Add ability to restart schedulers after shut down.
Browse files Browse the repository at this point in the history
  • Loading branch information
acabezas committed Jul 3, 2019
1 parent 23a4b20 commit 70caff6
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public abstract class AbstractPhotonScheduler implements PhotonScheduler {

private final ScheduledExecutorService scheduledExecutorService;
private static ExecutorService executorService;
private static ScheduledExecutorService scheduledExecutorService;
private volatile Duration pollingInterval;
private ScheduledFuture<?> scheduledFuture;

public AbstractPhotonScheduler(final ScheduledExecutorService scheduledExecutorService,
final Duration pollingInterval) {
this.scheduledExecutorService = scheduledExecutorService;
public AbstractPhotonScheduler(final Duration pollingInterval) {
this.pollingInterval = pollingInterval;
}

Expand Down Expand Up @@ -61,7 +62,7 @@ public void setPollingInterval(Duration pollingInterval) {
@Override
public void start() throws Exception {
if (!isActive()) {
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
scheduledFuture = getScheduledExecutorService().scheduleAtFixedRate(() -> {
try {
executeTask();
} catch (Exception e) {
Expand All @@ -84,11 +85,31 @@ private boolean isActive(ScheduledFuture<?> scheduledFuture) {

@Override
public void shutdown() throws Exception {
scheduledExecutorService.shutdown();
doShutDown();
Optional.ofNullable(scheduledExecutorService)
.ifPresent(ScheduledExecutorService::shutdown);
Optional.ofNullable(executorService)
.ifPresent(ExecutorService::shutdown);
}

private ScheduledExecutorService getScheduledExecutorService() {
return Optional.ofNullable(scheduledExecutorService)
.filter(s -> !s.isShutdown())
.orElseGet(() -> {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
return scheduledExecutorService;
});
}

protected ExecutorService getExecutorService(int poolSize) {
return Optional.ofNullable(executorService)
.filter(e -> !e.isShutdown())
.orElseGet(() -> {
executorService = Executors.newFixedThreadPool(poolSize);
return executorService;
});
}


abstract void executeTask();

abstract void doShutDown() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@Slf4j
public class DefaultBeamReaderScheduler extends AbstractPhotonScheduler {
Expand All @@ -43,7 +40,6 @@ public class DefaultBeamReaderScheduler extends AbstractPhotonScheduler {
private final BeamReaderCache beamReaderCache;
private final BeamReaderExecutionFunction execution;
private final BeamConsumer beamConsumer;
private final ExecutorService executorService;
private final ConcurrentMap<String, Instant> beamReaderLastScheduled;


Expand All @@ -52,26 +48,22 @@ public DefaultBeamReaderScheduler(final BeamReaderConfigManager beamReaderConfig
final BeamReaderCache beamReaderCache,
final BeamConsumer beamConsumer,
final BeamReaderExecutionFunction execution) {
this(beamReaderConfigManager, beamCache, beamReaderCache, beamConsumer, execution, Maps.newConcurrentMap(),
Executors.newScheduledThreadPool(5), Executors.newFixedThreadPool(100));
this(beamReaderConfigManager, beamCache, beamReaderCache, beamConsumer, execution, Maps.newConcurrentMap());
}

public DefaultBeamReaderScheduler(final BeamReaderConfigManager beamReaderConfigManager,
final BeamCache beamCache,
final BeamReaderCache beamReaderCache,
final BeamConsumer beamConsumer,
final BeamReaderExecutionFunction execution,
final ConcurrentMap<String, Instant> beamReaderLastScheduled,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorService executorService) {
super(scheduledExecutorService, DEFAULT_POLLING_INTERVAL);
final ConcurrentMap<String, Instant> beamReaderLastScheduled) {
super(DEFAULT_POLLING_INTERVAL);
this.beamReaderConfigManager = beamReaderConfigManager;
this.beamCache = beamCache;
this.beamReaderCache = beamReaderCache;
this.beamConsumer = beamConsumer;
this.execution = execution;
this.beamReaderLastScheduled = beamReaderLastScheduled;
this.executorService = executorService;
}

@Override
Expand All @@ -85,7 +77,7 @@ void executeTask() {
if (wrapper.isPresent()) {
if (wrapper.get().getPhotonBeamReader().getPhotonBeamReaderLock().isPresent()) {
if (!wrapper.get().getLock().isLocked()) {
executorService.execute(() ->
getExecutorService(100).execute(() ->
execution.execute(wrapper.get(), photonMessageHandler, beamCache, beamReaderConfigManager, beamConsumer, photonBeamReaderConfig.getIsAsync()));
}
}
Expand All @@ -97,9 +89,4 @@ void executeTask() {
}
});
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,17 @@
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

@Slf4j
public class LockGetScheduler extends AbstractPhotonScheduler {

private final ExecutorService executorService;
private final BeamReaderCache beamReaderCache;
private final BeamReaderLockDao beamReaderLockDao;

public LockGetScheduler(final ExecutorService executorService,
final BeamReaderCache beamReaderCache,
public LockGetScheduler(final BeamReaderCache beamReaderCache,
final BeamReaderLockDao beamReaderLockDao,
final ScheduledExecutorService scheduledExecutorService,
Duration lockThreshold) {
super(scheduledExecutorService, lockThreshold);
this.executorService = executorService;
super(lockThreshold);
this.beamReaderCache = beamReaderCache;
this.beamReaderLockDao = beamReaderLockDao;
}
Expand All @@ -52,7 +46,7 @@ void executeTask() {
.stream()
.flatMap(v -> v.values().stream())
.filter(br -> !br.getPhotonBeamReader().getPhotonBeamReaderLock().isPresent())
.forEach(br -> executorService.execute(() -> {
.forEach(br -> getExecutorService(50).execute(() -> {
try {
Optional<PhotonBeamReaderLock> lock = beamReaderLockDao.getAvailablePhotonBeamLock(br.getPhotonBeamReader());
if (lock.isPresent()) {
Expand All @@ -71,9 +65,4 @@ void executeTask() {
}
}));
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,18 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

@Slf4j
public class LockUpdateScheduler extends AbstractPhotonScheduler {

private final ExecutorService executorService;
private final BeamReaderCache beamReaderCache;
private final BeamReaderLockDao beamReaderLockDao;
private Duration lockThreshold;

public LockUpdateScheduler(final ExecutorService executorService,
final BeamReaderCache beamReaderCache,
public LockUpdateScheduler(final BeamReaderCache beamReaderCache,
final BeamReaderLockDao beamReaderLockDao,
final ScheduledExecutorService scheduledExecutorService,
Duration lockThreshold) {
super(scheduledExecutorService, lockThreshold.dividedBy(2));
this.executorService = executorService;
super(lockThreshold.dividedBy(2));
this.beamReaderCache = beamReaderCache;
this.beamReaderLockDao = beamReaderLockDao;
this.lockThreshold = lockThreshold;
Expand All @@ -51,7 +45,7 @@ void executeTask() {
.stream()
.flatMap(v -> v.values().stream())
.filter(br -> br.getPhotonBeamReader().getPhotonBeamReaderLock().isPresent())
.forEach(br -> executorService.execute(() -> {
.forEach(br -> getExecutorService(50).execute(() -> {
try {
if (br.getConsumerGroupLock().tryLock()) {
if (br.getPhotonBeamReader().getPhotonBeamReaderLock().isPresent()) {
Expand All @@ -73,9 +67,4 @@ void executeTask() {
}
}));
}

@Override
void doShutDown() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,18 @@
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@Slf4j
public class DefaultBeamReaderLockManager implements BeamReaderLockManager {

private final PhotonScheduler lockUpdateScheduler;
private final PhotonScheduler lockGetScheduler;


public DefaultBeamReaderLockManager(final BeamReaderCache beamReaderCache,
final BeamReaderLockDao beamReaderLockDao,
final Duration lockThreshold) {
this(Executors.newScheduledThreadPool(5), Executors.newFixedThreadPool(50), beamReaderCache,
beamReaderLockDao, lockThreshold);
}

public DefaultBeamReaderLockManager(final ScheduledExecutorService scheduledExecutorService,
final ExecutorService executorService,
final BeamReaderCache beamReaderCache,
final BeamReaderLockDao beamReaderLockDao,
final Duration lockThreshold) {
this(new LockUpdateScheduler(executorService, beamReaderCache, beamReaderLockDao,
scheduledExecutorService, lockThreshold),
new LockGetScheduler(executorService, beamReaderCache, beamReaderLockDao,
scheduledExecutorService, lockThreshold));
this(new LockUpdateScheduler(beamReaderCache, beamReaderLockDao, lockThreshold),
new LockGetScheduler(beamReaderCache, beamReaderLockDao, lockThreshold));
}

public DefaultBeamReaderLockManager(final PhotonScheduler lockUpdateScheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class DefaultAsyncMessageProcessor<K extends ProcessorKey, V extends Proc

private final Lock lock = new ReentrantLock();
private final Condition fullCondition = lock.newCondition();
private final ExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
private static ExecutorService executorService;
private static ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap;
private final ConcurrentMap<String, K> eventQueueKeyMap;
private final MessageEventHandler<K, V> eventHandler;
Expand All @@ -64,20 +64,16 @@ public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler
public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler,
final ProcessorManifest<K, V, T> processorManifest,
int maxEvents) {
this(Executors.newFixedThreadPool(150), Executors.newScheduledThreadPool(5), Maps.newConcurrentMap(), Maps.newConcurrentMap(),
this(Maps.newConcurrentMap(), Maps.newConcurrentMap(),
eventHandler, processorManifest, Duration.ofMillis(1), maxEvents);
}

public DefaultAsyncMessageProcessor(final ExecutorService executorService,
final ScheduledExecutorService scheduledExecutorService,
final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap,
public DefaultAsyncMessageProcessor(final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap,
final ConcurrentMap<String, K> eventQueueKeyMap,
final MessageEventHandler<K, V> eventHandler,
final ProcessorManifest<K, V, T> processorManifest,
Duration processingLoopInterval,
int maxEvents) {
this.executorService = executorService;
this.scheduledExecutorService = scheduledExecutorService;
this.eventQueueMapMap = eventQueueMapMap;
this.eventQueueKeyMap = eventQueueKeyMap;
this.eventHandler = eventHandler;
Expand Down Expand Up @@ -152,9 +148,9 @@ public Duration getProcessingLoopInterval() {
@Override
public void start() throws Exception {
if (!isActive()) {
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> eventQueueMapMap.entrySet()
scheduledFuture = getScheduledExecutorService().scheduleAtFixedRate(() -> eventQueueMapMap.entrySet()
.forEach(e -> e.getValue()
.iterateKeys(k -> executorService.execute(() -> {
.iterateKeys(k -> getExecutorService().execute(() -> {
if (e.getValue().tryQueueLock(k)) {
try {
while (!e.getValue().queueIsEmpty(k)) {
Expand Down Expand Up @@ -196,8 +192,10 @@ public void stop() throws Exception {

@Override
public void shutdown() throws Exception {
scheduledExecutorService.shutdown();
executorService.shutdown();
Optional.ofNullable(scheduledExecutorService)
.ifPresent(ScheduledExecutorService::shutdown);
Optional.ofNullable(executorService)
.ifPresent(ExecutorService::shutdown);
}

@Override
Expand All @@ -214,4 +212,22 @@ private EventQueueMap<V> getEventQueueMap(K key) {
private boolean isActive(ScheduledFuture<?> scheduledFuture) {
return !(scheduledFuture.isCancelled() || scheduledFuture.isDone());
}

private ScheduledExecutorService getScheduledExecutorService() {
return Optional.ofNullable(scheduledExecutorService)
.filter(s -> !s.isShutdown())
.orElseGet(() -> {
scheduledExecutorService = Executors.newScheduledThreadPool(5);
return scheduledExecutorService;
});
}

private ExecutorService getExecutorService() {
return Optional.ofNullable(executorService)
.filter(e -> !e.isShutdown())
.orElseGet(() -> {
executorService = Executors.newFixedThreadPool(150);
return executorService;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;

public class DefaultAsyncPhotonConsumerTest {

Expand Down Expand Up @@ -111,8 +110,7 @@ public void consumerTest() {
processedRecordCache, watermarkUpdater, new BaseOffsetManager(beamReaderDao), partitionHelper);

BeamReaderConfigManager beamReaderConfigManager = new DefaultBeamReaderConfigManager(beamCache, beamReaderCache);
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(Executors.newScheduledThreadPool(10),
Executors.newFixedThreadPool(10), beamReaderCache, beamReaderLockDao, Duration.ofMillis(10));
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(beamReaderCache, beamReaderLockDao, Duration.ofMillis(10));

PhotonConsumer photonConsumer = new DefaultPhotonConsumer(beamReaderConfigManager,
new DefaultBeamReaderScheduler(beamReaderConfigManager, beamCache, beamReaderCache, beamConsumer, CONSUMER_EXECUTION_FUNCTION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;

@Slf4j
public class DefaultPhotonConsumerTest {
Expand Down Expand Up @@ -123,8 +122,7 @@ public void consumerTest() {
processedRecordCache, watermarkUpdater, new BaseOffsetManager(beamReaderDao), partitionHelper);

BeamReaderConfigManager beamReaderConfigManager = new DefaultBeamReaderConfigManager(beamCache, beamReaderCache);
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(Executors.newScheduledThreadPool(10),
Executors.newFixedThreadPool(10), beamReaderCache, beamReaderLockDao, BEAM_READ_LOCK_THRESHOLD);
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(beamReaderCache, beamReaderLockDao, BEAM_READ_LOCK_THRESHOLD);

PhotonConsumer photonConsumer = new DefaultPhotonConsumer(beamReaderConfigManager,
new DefaultBeamReaderScheduler(beamReaderConfigManager, beamCache, beamReaderCache, beamConsumer, CONSUMER_EXECUTION_FUNCTION),
Expand Down Expand Up @@ -182,8 +180,7 @@ public void consumerSparseDataTest() {
processedRecordCache, watermarkUpdater, new BaseOffsetManager(beamReaderDao), partitionHelper);

BeamReaderConfigManager beamReaderConfigManager = new DefaultBeamReaderConfigManager(beamCache, beamReaderCache);
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(Executors.newScheduledThreadPool(10),
Executors.newFixedThreadPool(10), beamReaderCache, beamReaderLockDao, BEAM_READ_LOCK_THRESHOLD);
BeamReaderLockManager beamReaderLockManager = new DefaultBeamReaderLockManager(beamReaderCache, beamReaderLockDao, BEAM_READ_LOCK_THRESHOLD);

PhotonConsumer photonConsumer = new DefaultPhotonConsumer(beamReaderConfigManager,
new DefaultBeamReaderScheduler(beamReaderConfigManager, beamCache, beamReaderCache, beamConsumer, CONSUMER_EXECUTION_FUNCTION),
Expand Down
Loading

0 comments on commit 70caff6

Please sign in to comment.