Skip to content

Commit

Permalink
better tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 25, 2021
1 parent 5792690 commit 3e53b2f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ protected void close(boolean hasFailed) throws Exception {
onClose.accept(lastFlushedState == null && hasFailed);
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
outputRecordCollector.accept(lastFlushedState);
if (lastFlushedState != null) {
outputRecordCollector.accept(lastFlushedState);
}
} catch (Exception e) {
LOGGER.error("on close failed.");
LOGGER.error("on close failed.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -84,7 +86,7 @@ public class BufferedStreamConsumerTest {
private RecordWriter recordWriter;
private CheckedConsumer<Boolean, Exception> onClose;
private CheckedFunction<String, Boolean, Exception> isValidRecord;
private Consumer<AirbyteMessage> checkpointConsumer;
private Consumer<AirbyteMessage> outputRecordCollector;

@SuppressWarnings("unchecked")
@BeforeEach
Expand All @@ -93,9 +95,9 @@ void setup() throws Exception {
recordWriter = mock(RecordWriter.class);
onClose = mock(CheckedConsumer.class);
isValidRecord = mock(CheckedFunction.class);
checkpointConsumer = mock(Consumer.class);
outputRecordCollector = mock(Consumer.class);
consumer = new BufferedStreamConsumer(
checkpointConsumer,
outputRecordCollector,
onStart,
recordWriter,
onClose,
Expand All @@ -119,7 +121,7 @@ void test1StreamWith1State() throws Exception {

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -136,7 +138,7 @@ void test1StreamWith2State() throws Exception {

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}

@Test
Expand All @@ -152,7 +154,6 @@ void test1StreamWith0State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
}

// todo (cgardens) - split testing buffer flushing into own test.
@Test
void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
Expand All @@ -169,7 +170,7 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch2);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -179,7 +180,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception

// consumer with big enough buffered that we see both batches are flushed in one go.
final BufferedStreamConsumer consumer = new BufferedStreamConsumer(
checkpointConsumer,
outputRecordCollector,
onStart,
recordWriter,
onClose,
Expand All @@ -201,7 +202,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
.collect(Collectors.toList());
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -222,7 +223,48 @@ void testExceptionAfterOneStateMessage() throws Exception {

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
void testExceptionAfterNoStateMessages() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumeRecords(consumer, expectedRecordsBatch2);
when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
consumer.close();

verify(onStart).call();
verify(onClose).accept(true);

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verifyNoInteractions(outputRecordCollector);
}

@Test
void testExceptionADuringOnClose() throws Exception {
doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);

final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
consumer.close();

verifyStartAndClose();

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verifyNoInteractions(outputRecordCollector);
}

@Test
Expand All @@ -245,7 +287,7 @@ void test2StreamWith1State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -269,7 +311,7 @@ void test2StreamWith2State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);

verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}

private void verifyStartAndClose() throws Exception {
Expand Down Expand Up @@ -310,4 +352,4 @@ private void verifyRecords(String streamName, String namespace, Collection<Airby
expectedRecords.stream().map(AirbyteMessage::getRecord).collect(Collectors.toList()));
}

}
}

0 comments on commit 3e53b2f

Please sign in to comment.