Skip to content

Commit

Permalink
Fix locking issue with async processor.
Browse files Browse the repository at this point in the history
  • Loading branch information
acabezas committed Jun 12, 2019
1 parent c28a2b9 commit 3086da5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 67 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@
</goals>
</execution>
</executions>
<configuration>
<source>8</source>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
package com.homeaway.datatools.photon.utils.processing;

import com.google.common.collect.Maps;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand All @@ -34,17 +36,16 @@ public class DefaultAsyncMessageProcessor<K extends ProcessorKey, V extends Proc

private final Lock lock = new ReentrantLock();
private final Condition fullCondition = lock.newCondition();
private final Condition emptyCondition = lock.newCondition();
private final ExecutorService executorService;
private final Executor mainExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap;
private final ConcurrentMap<String, K> eventQueueKeyMap;
private final MessageEventHandler<K, V> eventHandler;
private final ProcessorManifest<K, V, T> processorManifest;
private final LongAdder count;
private volatile boolean active;
private volatile Duration processingLoopInterval;
private volatile int maxEvents;
private ScheduledFuture<?> scheduledFuture;

public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler) {
this(eventHandler, null);
Expand All @@ -63,28 +64,27 @@ public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler
public DefaultAsyncMessageProcessor(final MessageEventHandler<K, V> eventHandler,
final ProcessorManifest<K, V, T> processorManifest,
int maxEvents) {
this(Executors.newFixedThreadPool(150), Executors.newSingleThreadExecutor(), Maps.newConcurrentMap(), Maps.newConcurrentMap(),
this(Executors.newFixedThreadPool(150), Executors.newScheduledThreadPool(5), Maps.newConcurrentMap(), Maps.newConcurrentMap(),
eventHandler, processorManifest, Duration.ofMillis(1), maxEvents);
}

public DefaultAsyncMessageProcessor(final ExecutorService executorService,
final Executor mainExecutor,
final ScheduledExecutorService scheduledExecutorService,
final ConcurrentMap<String, EventQueueMap<V>> eventQueueMapMap,
final ConcurrentMap<String, K> eventQueueKeyMap,
final MessageEventHandler<K, V> eventHandler,
final ProcessorManifest<K, V, T> processorManifest,
Duration processingLoopInterval,
int maxEvents) {
this.executorService = executorService;
this.mainExecutor = mainExecutor;
this.scheduledExecutorService = scheduledExecutorService;
this.eventQueueMapMap = eventQueueMapMap;
this.eventQueueKeyMap = eventQueueKeyMap;
this.eventHandler = eventHandler;
this.processorManifest = processorManifest;
this.processingLoopInterval = processingLoopInterval;
this.maxEvents = maxEvents;
this.count = new LongAdder();
this.active = false;
}

@Override
Expand All @@ -109,24 +109,23 @@ public void addEventToQueue(K key, V event) {
try {
while (maxEvents > 0 && getEventCount() >= maxEvents) {
try {
fullCondition.await();
fullCondition.await(5L, MILLISECONDS);
} catch (InterruptedException e) {
log.error("Thread interrupted while adding event {} for key {}", event, key);
}
}
getProcessorManifest().ifPresent(m -> m.putEvent(key, event));
getEventQueueMap(key).putEvent(event);
emptyCondition.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
getProcessorManifest().ifPresent(m -> m.putEvent(key, event));
getEventQueueMap(key).putEvent(event);
}

@Override
public boolean isActive() {
return active;
return Optional.ofNullable(scheduledFuture).map(this::isActive).orElse(false);
}

@Override
Expand All @@ -152,66 +151,46 @@ public Duration getProcessingLoopInterval() {

@Override
public void start() throws Exception {
if (!active) {
active = true;
mainExecutor.execute(() -> {

while (active) {

lock.lock();
try {
while (active && getEventCount() == 0) {
emptyCondition.await();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}

if (active) {
eventQueueMapMap.entrySet()
.forEach(e -> e.getValue()
.iterateKeys(k -> executorService.execute(() -> {
if (e.getValue().tryQueueLock(k)) {
try {
while (!e.getValue().queueIsEmpty(k)) {
V event = e.getValue().peekQueue(k);
try {
eventHandler.handleEvent(eventQueueKeyMap.get(e.getKey()), event);
lock.lock();
try {
if (!e.getValue().popQueue(k)) {
log.warn("EventQueue removed while event was being processed." +
"Key={}, EventQueueMap={}, Event={}", k, e.getValue(), event);
}
fullCondition.signal();
} finally {
lock.unlock();
}
getProcessorManifest().ifPresent(m -> m.removeEvent(eventQueueKeyMap.get(e.getKey()), event));
} catch (Exception ex) {
eventHandler.handleException(eventQueueKeyMap.get(e.getKey()), event, new MessageProcessorException(ex));
}
}
e.getValue().removeEmptyQueue(k);
} finally {
e.getValue().releaseQueueLock(k);
if (!isActive()) {
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> eventQueueMapMap.entrySet()
.forEach(e -> e.getValue()
.iterateKeys(k -> executorService.execute(() -> {
if (e.getValue().tryQueueLock(k)) {
try {
while (!e.getValue().queueIsEmpty(k)) {
V event = e.getValue().peekQueue(k);
try {
eventHandler.handleEvent(eventQueueKeyMap.get(e.getKey()), event);
if (!e.getValue().popQueue(k)) {
log.warn("EventQueue removed while event was being processed." +
"Key={}, EventQueueMap={}, Event={}", k, e.getValue(), event);
}
getProcessorManifest().ifPresent(m -> m.removeEvent(eventQueueKeyMap.get(e.getKey()), event));
} catch (Exception ex) {
eventHandler.handleException(eventQueueKeyMap.get(e.getKey()), event, new MessageProcessorException(ex));
}
})
)
);
}
}
});
}
e.getValue().removeEmptyQueue(k);
} finally {
e.getValue().releaseQueueLock(k);
lock.lock();
try {
fullCondition.signalAll();
} finally {
lock.unlock();
}
}
}
})
)
), 0, processingLoopInterval.toMillis(), MILLISECONDS);
}
}

@Override
public void stop() throws Exception {
if (active) {
active = false;
if (isActive()) {
scheduledFuture.cancel(true);
}
}

Expand All @@ -226,5 +205,7 @@ private EventQueueMap<V> getEventQueueMap(K key) {
q -> new DefaultEventQueueMap<>(count));
}


private boolean isActive(ScheduledFuture<?> scheduledFuture) {
return !(scheduledFuture.isCancelled() || scheduledFuture.isDone());
}
}

0 comments on commit 3086da5

Please sign in to comment.