Skip to content

Commit

Permalink
Update Kinesis destination to use outputRecordCollector to properly s…
Browse files Browse the repository at this point in the history
…tore state (#15348)

* Update Kinesis destination to use outputRecordCollector to properly store state

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Aug 5, 2022
1 parent a4e52cc commit 964b226
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
- name: Kinesis
destinationDefinitionId: 6d1d66d4-26ab-4602-8d32-f85894b04955
dockerRepository: airbyte/destination-kinesis
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/kinesis
icon: kinesis.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2296,7 +2296,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-kinesis:0.1.3"
- dockerImage: "airbyte/destination-kinesis:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/kinesis"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-kinesis

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/destination-kinesis
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {

testImplementation "org.assertj:assertj-core:${assertVersion}"
testImplementation "org.testcontainers:localstack:${testContainersVersion}"
testImplementation project(':airbyte-integrations:bases:standard-destination-test')


integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void main(String[] args) throws Exception {
* @return AirbyteConnectionStatus status of the connection result.
*/
@Override
public AirbyteConnectionStatus check(JsonNode config) {
public AirbyteConnectionStatus check(final JsonNode config) {
KinesisStream kinesisStream = null;
var streamName = "test_stream";
try {
Expand Down Expand Up @@ -69,10 +69,11 @@ public AirbyteConnectionStatus check(JsonNode config) {
* @return KinesisMessageConsumer for consuming Airbyte messages and streaming them to Kinesis.
*/
@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
return new KinesisMessageConsumer(new KinesisConfig(config), configuredCatalog, outputRecordCollector);
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final KinesisStream kinesisStream = new KinesisStream(new KinesisConfig(config));
return new KinesisMessageConsumer(configuredCatalog, kinesisStream, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ public class KinesisMessageConsumer extends FailureTrackingAirbyteMessageConsume

private final Map<AirbyteStreamNameNamespacePair, KinesisStreamConfig> kinesisStreams;

private AirbyteMessage lastMessage = null;

public KinesisMessageConsumer(KinesisConfig kinesisConfig,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
public KinesisMessageConsumer(final ConfiguredAirbyteCatalog configuredCatalog,
final KinesisStream kinesisStream,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.outputRecordCollector = outputRecordCollector;
this.kinesisStream = new KinesisStream(kinesisConfig);
this.kinesisStream = kinesisStream;
var nameTransformer = new KinesisNameTransformer();
this.kinesisStreams = configuredCatalog.getStreams().stream()
.collect(Collectors.toUnmodifiableMap(
Expand All @@ -60,7 +58,7 @@ protected void startTracked() {
* @param message received from the Airbyte source.
*/
@Override
protected void acceptTracked(AirbyteMessage message) {
protected void acceptTracked(final AirbyteMessage message) {
if (message.getType() == AirbyteMessage.Type.RECORD) {
var messageRecord = message.getRecord();

Expand All @@ -84,7 +82,7 @@ protected void acceptTracked(AirbyteMessage message) {
// throw exception and end sync?
});
} else if (message.getType() == AirbyteMessage.Type.STATE) {
this.lastMessage = message;
outputRecordCollector.accept(message);
} else {
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
}
Expand All @@ -97,13 +95,12 @@ protected void acceptTracked(AirbyteMessage message) {
* @param hasFailed flag for indicating if the operation has failed.
*/
@Override
protected void close(boolean hasFailed) {
protected void close(final boolean hasFailed) {
try {
if (!hasFailed) {
kinesisStream.flush(e -> {
LOGGER.error("Error while streaming data to Kinesis", e);
});
this.outputRecordCollector.accept(lastMessage);
}
} finally {
kinesisStream.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.kinesis;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@DisplayName("KinesisRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class KinesisRecordConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

@Mock
private ConfiguredAirbyteCatalog catalog;
@Mock
private KinesisStream kinesisStream;

private KinesisMessageConsumer consumer;

@BeforeEach
public void init() {
consumer = new KinesisMessageConsumer(catalog, kinesisStream, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}

0 comments on commit 964b226

Please sign in to comment.