Skip to content

Defend against invalid BatchEventProcessor configuration overrides. #331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
private final BlockingQueue<Object> eventQueue;
private final EventHandler eventHandler;

private final int batchSize;
private final long flushInterval;
private final long timeoutMillis;
final int batchSize;
final long flushInterval;
final long timeoutMillis;
private final ExecutorService executor;
private final NotificationCenter notificationCenter;

Expand All @@ -70,21 +70,11 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler eventHandler, Integer batchSize, Long flushInterval, Long timeoutMillis, ExecutorService executor, NotificationCenter notificationCenter) {
this.eventHandler = eventHandler;
this.eventQueue = eventQueue;
this.batchSize = batchSize == null ? PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE) : batchSize;
this.flushInterval = flushInterval == null ? PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL) : flushInterval;
this.timeoutMillis = timeoutMillis == null ? PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL) : timeoutMillis;
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.timeoutMillis = timeoutMillis;
this.notificationCenter = notificationCenter;

if (executor == null) {
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
this.executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = threadFactory.newThread(runnable);
thread.setDaemon(true);
return thread;
});
} else {
this.executor = executor;
}
this.executor = executor;
}

public synchronized void start() {
Expand Down Expand Up @@ -240,42 +230,64 @@ public static Builder builder() {
public static class Builder {
private BlockingQueue<Object> eventQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
private EventHandler eventHandler = null;
private Integer batchSize = null;
private Long flushInterval = null;
private Integer batchSize = PropertyUtils.getInteger(CONFIG_BATCH_SIZE, DEFAULT_BATCH_SIZE);
private Long flushInterval = PropertyUtils.getLong(CONFIG_BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL);
private Long timeoutMillis = PropertyUtils.getLong(CONFIG_CLOSE_TIMEOUT, DEFAULT_TIMEOUT_INTERVAL);
private ExecutorService executor = null;
private NotificationCenter notificationCenter = null;
private Long timeoutMillis = null;

/**
* {@link EventHandler} implementation used to dispatch events to Optimizely.
*/
public Builder withEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
return this;
}

/**
* EventQueue is the underlying BlockingQueue used to buffer events before being added to the batch payload.
*/
public Builder withEventQueue(BlockingQueue<Object> eventQueue) {
this.eventQueue = eventQueue;
return this;
}

/**
* BatchSize is the maximum number of events contained within a single event batch.
*/
public Builder withBatchSize(Integer batchSize) {
this.batchSize = batchSize;
return this;
}

/**
* FlushInterval is the maximum duration, in milliseconds, that an event will remain in flight before
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. May be say event will remain in queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to confuse the pending batch with the eventQueue. There are no guarantees about how long an event might exist in the eventQueue.

* being flushed to the event dispatcher.
*/
public Builder withFlushInterval(Long flushInterval) {
this.flushInterval = flushInterval;
return this;
}

/**
* ExecutorService used to execute the {@link EventConsumer} thread.
*/
public Builder withExecutor(ExecutorService executor) {
this.executor = executor;
return this;
}

/**
* Timeout is the maximum time to wait for the EventProcessor to close.
*/
public Builder withTimeout(long duration, TimeUnit timeUnit) {
this.timeoutMillis = timeUnit.toMillis(duration);
return this;
}

/**
* NotificationCenter used to notify when event batches are flushed.
*/
public Builder withNotificationCenter(NotificationCenter notificationCenter) {
this.notificationCenter = notificationCenter;
return this;
Expand All @@ -286,6 +298,30 @@ public BatchEventProcessor build() {
}

public BatchEventProcessor build(boolean shouldStart) {
if (batchSize < 0) {
logger.warn("Invalid batchSize of {}, Defaulting to {}", batchSize, DEFAULT_BATCH_SIZE);
batchSize = DEFAULT_BATCH_SIZE;
}

if (flushInterval < 0) {
logger.warn("Invalid flushInterval of {}, Defaulting to {}", flushInterval, DEFAULT_BATCH_INTERVAL);
flushInterval = DEFAULT_BATCH_INTERVAL;
}

if (timeoutMillis < 0) {
logger.warn("Invalid timeoutMillis of {}, Defaulting to {}", timeoutMillis, DEFAULT_TIMEOUT_INTERVAL);
timeoutMillis = DEFAULT_TIMEOUT_INTERVAL;
}

if (executor == null) {
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = threadFactory.newThread(runnable);
thread.setDaemon(true);
return thread;
});
}

BatchEventProcessor batchEventProcessor = new BatchEventProcessor(eventQueue, eventHandler, batchSize, flushInterval, timeoutMillis, executor, notificationCenter);

if (shouldStart) {
Expand Down
14 changes: 14 additions & 0 deletions core-api/src/test/java/com/optimizely/ab/EventHandlerRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class EventHandlerRule implements EventHandler, TestRule {

private List<CanonicalEvent> expectedEvents;
private LinkedList<CanonicalEvent> actualEvents;
private int actualCalls;
private Integer expectedCalls;

@Override
public Statement apply(final Statement base, Description description) {
Expand All @@ -71,12 +73,19 @@ public void evaluate() throws Throwable {
private void before() {
expectedEvents = new LinkedList<>();
actualEvents = new LinkedList<>();

expectedCalls = null;
actualCalls = 0;
}

private void after() {
}

private void verify() {
if (expectedCalls != null) {
assertEquals(expectedCalls.intValue(), actualCalls);
}

assertEquals(expectedEvents.size(), actualEvents.size());

ListIterator<CanonicalEvent> expectedIterator = expectedEvents.listIterator();
Expand All @@ -90,6 +99,10 @@ private void verify() {
}
}

public void expectCalls(int expected) {
expectedCalls = expected;
}

public void expectImpression(String experientId, String variationId, String userId) {
expectImpression(experientId, variationId, userId, Collections.emptyMap());
}
Expand Down Expand Up @@ -119,6 +132,7 @@ public void expect(String experientId, String variationId, String eventName, Str
@Override
public void dispatchEvent(LogEvent logEvent) {
logger.info("Receiving event: {}", logEvent);
actualCalls++;

List<Visitor> visitors = logEvent.getEventBatch().getVisitors();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.Collections;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -93,11 +94,12 @@ public void testFlushOnMaxTimeout() throws Exception {
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
fail("Exceeded timeout waiting for events to flush.");
}

eventProcessor.close();
assertEquals(0, eventQueue.size());
eventHandlerRule.expectCalls(1);
}

@Test
Expand All @@ -116,17 +118,17 @@ public void testFlushMaxBatchSize() throws Exception {
eventHandlerRule.expectConversion(eventName, USER_ID);
}

countDownLatch.await();
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for events to flush.");
}

assertEquals(0, eventQueue.size());
eventHandlerRule.expectCalls(1);
}

@Test
public void testFlush() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
setEventProcessor(logEvent -> {
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));

UserEvent userEvent = buildConversionEvent(EVENT_NAME);
eventProcessor.process(userEvent);
Expand All @@ -137,18 +139,12 @@ public void testFlush() throws Exception {
eventProcessor.flush();
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

if (!countDownLatch.await(MAX_DURATION_MS / 2, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
}
eventHandlerRule.expectCalls(2);
}

@Test
public void testFlushOnMismatchRevision() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
setEventProcessor(logEvent -> {
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));

ProjectConfig projectConfig1 = mock(ProjectConfig.class);
when(projectConfig1.getRevision()).thenReturn("1");
Expand All @@ -165,18 +161,12 @@ public void testFlushOnMismatchRevision() throws Exception {
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

eventProcessor.close();
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
}
eventHandlerRule.expectCalls(2);
}

@Test
public void testFlushOnMismatchProjectId() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
setEventProcessor(logEvent -> {
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));

ProjectConfig projectConfig1 = mock(ProjectConfig.class);
when(projectConfig1.getRevision()).thenReturn("1");
Expand All @@ -193,18 +183,12 @@ public void testFlushOnMismatchProjectId() throws Exception {
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

eventProcessor.close();
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
}
eventHandlerRule.expectCalls(2);
}

@Test
public void testStopAndStart() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
setEventProcessor(logEvent -> {
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});
setEventProcessor(logEvent -> eventHandlerRule.dispatchEvent(logEvent));

UserEvent userEvent = buildConversionEvent(EVENT_NAME);
eventProcessor.process(userEvent);
Expand All @@ -218,31 +202,27 @@ public void testStopAndStart() throws Exception {
eventProcessor.start();

eventProcessor.close();
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
}
eventHandlerRule.expectCalls(2);
}

@Test
public void testNotificationCenter() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
notificationCenter.addNotificationHandler(LogEvent.class, x -> countDownLatch.countDown());
AtomicInteger counter = new AtomicInteger();
notificationCenter.addNotificationHandler(LogEvent.class, x -> counter.incrementAndGet());
setEventProcessor(logEvent -> {});

UserEvent userEvent = buildConversionEvent(EVENT_NAME);
eventProcessor.process(userEvent);
eventProcessor.close();

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for notification.");
}
assertEquals(1, counter.intValue());
}

@Test
public void testCloseTimeout() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
setEventProcessor(logEvent -> {
if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.SECONDS)) {
if (!countDownLatch.await(TIMEOUT_MS * 2, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for close.");
}
});
Expand All @@ -266,6 +246,48 @@ public void testCloseEventHandler() throws Exception {
verify((AutoCloseable) mockEventHandler).close();
}

@Test
public void testInvalidBatchSizeUsesDefault() {
eventProcessor = BatchEventProcessor.builder()
.withEventQueue(eventQueue)
.withBatchSize(-1)
.withFlushInterval(MAX_DURATION_MS)
.withEventHandler(new NoopEventHandler())
.withNotificationCenter(notificationCenter)
.withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
.build();

assertEquals(eventProcessor.batchSize, BatchEventProcessor.DEFAULT_BATCH_SIZE);
}

@Test
public void testInvalidFlushIntervalUsesDefault() {
eventProcessor = BatchEventProcessor.builder()
.withEventQueue(eventQueue)
.withBatchSize(MAX_BATCH_SIZE)
.withFlushInterval(-1L)
.withEventHandler(new NoopEventHandler())
.withNotificationCenter(notificationCenter)
.withTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS)
.build();

assertEquals(eventProcessor.flushInterval, BatchEventProcessor.DEFAULT_BATCH_INTERVAL);
}

@Test
public void testInvalidTimeoutUsesDefault() {
eventProcessor = BatchEventProcessor.builder()
.withEventQueue(eventQueue)
.withBatchSize(MAX_BATCH_SIZE)
.withFlushInterval(MAX_DURATION_MS)
.withEventHandler(new NoopEventHandler())
.withNotificationCenter(notificationCenter)
.withTimeout(-1L, TimeUnit.MILLISECONDS)
.build();

assertEquals(eventProcessor.timeoutMillis, BatchEventProcessor.DEFAULT_TIMEOUT_INTERVAL);
}

private void setEventProcessor(EventHandler eventHandler) {
eventProcessor = BatchEventProcessor.builder()
.withEventQueue(eventQueue)
Expand Down