Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpointing: Partial Success in BufferedStreamConsumer (Destination) #3555

Merged
merged 8 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.7
dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.7
dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
Expand All @@ -37,7 +37,7 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.10
dockerImageTag: 0.3.11
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
Expand All @@ -47,26 +47,26 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.11
dockerImageTag: 0.3.12
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
dockerRepository: airbyte/destination-mssql
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
} else {
// todo (cgardens) - decide if we want to throw here instead.
LOGGER.error(inputString);
LOGGER.error("Received invalid message: " + inputString);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,28 @@
* <p>
* All other message types are ignored.
* </p>
*
* <p>
* Throughout the lifecycle of the consumer, messages get promoted from buffered to flushed to
* committed. A record message when it is received is immediately buffered. When the buffer fills
* up, all buffered records are flushed out of memory using the user-provided recordWriter. When
* this flush happens, a state message is moved from pending to flushed. On close, if the
* user-provided onClose function is successful, then the flushed state record is considered
* committed and is then emitted. We expect this class to only ever emit either 1 state message (in
* the case of a full or partial success) or 0 state messages (in the case where the onClose step
* was never reached or did not complete without exception).
* </p>
*
* <p>
* When a record is "flushed" it is moved from the docker container to the destination. By
* convention, it is usually placed in some sort of temporary storage on the destination (e.g. a
* temporary database or file store). The logic in close handles committing the temporary
* representation data to the final store (e.g. final table). In the case of Copy destinations they
* often have additional temporary stores. The common pattern for copy destination is that flush
* pushes the data into cloud storage and then close copies from cloud storage to a temporary table
* AND then copies from the temporary table into the final table. This abstraction is blind to that
* detail as it implementation detail of how copy destinations implement close.
* </p>
*/
public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {

Expand All @@ -85,7 +107,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private boolean hasStarted;
private boolean hasClosed;

private AirbyteMessage lastCommittedState;
private AirbyteMessage lastFlushedState;
private AirbyteMessage pendingState;

public BufferedStreamConsumer(Consumer<AirbyteMessage> outputRecordCollector,
Expand Down Expand Up @@ -165,7 +187,7 @@ private void flushQueueToDestination() throws Exception {
}

if (pendingState != null) {
lastCommittedState = pendingState;
lastFlushedState = pendingState;
pendingState = null;
}
}
Expand All @@ -192,15 +214,23 @@ protected void close(boolean hasFailed) throws Exception {
}

try {
onClose.accept(hasFailed);
// todo (cgardens) - For now we are using this conditional to maintain existing behavior. When we
// enable checkpointing, we will need to get feedback from onClose on whether any data was persisted
// or not. If it was then, the state message will be emitted.
if (!hasFailed && lastCommittedState != null) {
outputRecordCollector.accept(lastCommittedState);
// if no state was was emitted (i.e. full refresh), if there were still no failures, then we can
// still succeed.
if (lastFlushedState == null) {
onClose.accept(hasFailed);
} else {
// if any state message flushed that means we can still go for at least a partial success.
onClose.accept(false);
}

// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
if (lastFlushedState != null) {
outputRecordCollector.accept(lastFlushedState);
}
} catch (Exception e) {
LOGGER.error("on close failed.", e);
LOGGER.error("Close failed.", e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@

package io.airbyte.integrations.destination.buffered_stream_consumer;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand Down Expand Up @@ -82,7 +86,7 @@ public class BufferedStreamConsumerTest {
private RecordWriter recordWriter;
private CheckedConsumer<Boolean, Exception> onClose;
private CheckedFunction<String, Boolean, Exception> isValidRecord;
private Consumer<AirbyteMessage> checkpointConsumer;
private Consumer<AirbyteMessage> outputRecordCollector;

@SuppressWarnings("unchecked")
@BeforeEach
Expand All @@ -91,9 +95,9 @@ void setup() throws Exception {
recordWriter = mock(RecordWriter.class);
onClose = mock(CheckedConsumer.class);
isValidRecord = mock(CheckedFunction.class);
checkpointConsumer = mock(Consumer.class);
outputRecordCollector = mock(Consumer.class);
consumer = new BufferedStreamConsumer(
checkpointConsumer,
outputRecordCollector,
onStart,
recordWriter,
onClose,
Expand All @@ -117,7 +121,7 @@ void test1StreamWith1State() throws Exception {

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -134,7 +138,7 @@ void test1StreamWith2State() throws Exception {

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}

@Test
Expand All @@ -150,7 +154,6 @@ void test1StreamWith0State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
}

// todo (cgardens) - split testing buffer flushing into own test.
@Test
void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
Expand All @@ -167,7 +170,7 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch2);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -177,7 +180,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception

// consumer with big enough buffered that we see both batches are flushed in one go.
final BufferedStreamConsumer consumer = new BufferedStreamConsumer(
checkpointConsumer,
outputRecordCollector,
onStart,
recordWriter,
onClose,
Expand All @@ -193,6 +196,75 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception

verifyStartAndClose();

final List<AirbyteMessage> expectedRecords = Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2)
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
void testExceptionAfterOneStateMessage() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
consumer.close();

verifyStartAndClose();

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
void testExceptionAfterNoStateMessages() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumeRecords(consumer, expectedRecordsBatch2);
when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
consumer.close();

verify(onStart).call();
verify(onClose).accept(true);

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verifyNoInteractions(outputRecordCollector);
}

@Test
void testExceptionDuringOnClose() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test (and potentially others) should verify that this line throws an exception (also that line should throw an exception)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed.

doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);

final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
assertThrows(IllegalStateException.class, () -> consumer.close());

verifyStartAndClose();

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verifyNoInteractions(outputRecordCollector);
}

@Test
Expand All @@ -215,7 +287,7 @@ void test2StreamWith1State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
verify(outputRecordCollector).accept(STATE_MESSAGE1);
}

@Test
Expand All @@ -239,7 +311,7 @@ void test2StreamWith2State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);

verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}

private void verifyStartAndClose() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.7
LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-local-json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-oracle
Loading