🎉 New Destination: Kafka #3746

Merged Jul 22, 2021 · 30 commits
Changes from 21 commits
Commits
30 commits
0a17bfc
Kafka destination
mmolimar May 30, 2021
792ff02
Adding destination definition
mmolimar May 30, 2021
bde98db
Fix tests
mmolimar May 30, 2021
9d7b178
Adding name resolver
mmolimar Jun 2, 2021
2e8c190
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 2, 2021
b23d31d
Spec configs for DBT and normalization
mmolimar Jun 2, 2021
99ba342
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 2, 2021
f48e170
Updating docs
mmolimar Jun 2, 2021
e5cbb61
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 3, 2021
3361a3f
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 12, 2021
06fd84b
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 13, 2021
756033f
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jun 16, 2021
b3565b9
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jul 13, 2021
ed8a743
Adding suggestions
mmolimar Jul 17, 2021
9487b0c
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jul 17, 2021
f13c93c
Fix tests
mmolimar Jul 17, 2021
187517d
Updating spec
mmolimar Jul 17, 2021
c132b63
Flush messages
mmolimar Jul 17, 2021
dc53912
Improvements in spec
mmolimar Jul 17, 2021
f99546f
Add state to the output collector
mmolimar Jul 17, 2021
33ffe98
Adding Kafka icon
mmolimar Jul 17, 2021
fd0bd35
Update airbyte-config/init/src/main/resources/seed/destination_defini…
mmolimar Jul 20, 2021
068fc37
Update airbyte-integrations/connectors/destination-kafka/src/main/jav…
mmolimar Jul 20, 2021
85b306b
Update docs/integrations/destinations/kafka.md
mmolimar Jul 20, 2021
c0e5acb
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
mmolimar Jul 20, 2021
ee262d5
Update docs/integrations/destinations/kafka.md
mmolimar Jul 20, 2021
afc4ea8
Merge branch 'feature/kafka-destination' of github.com:mmolimar/airby…
mmolimar Jul 20, 2021
430f37a
Updating docs
mmolimar Jul 21, 2021
56ef956
format
sherifnada Jul 22, 2021
e15b971
Merge branch 'master' of github.com:airbytehq/airbyte into feature/ka…
sherifnada Jul 22, 2021
@@ -0,0 +1,7 @@
{
  "destinationDefinitionId": "9f760101-60ae-462f-9ee6-b7a9dafd454d",
  "name": "Kafka",
  "dockerRepository": "airbyte/destination-kafka",
  "dockerImageTag": "0.1.0",
  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/kafka"
}
1 change: 1 addition & 0 deletions airbyte-config/init/src/main/resources/icons/kafka.svg
@@ -70,3 +70,9 @@
  dockerRepository: airbyte/destination-oracle
  dockerImageTag: 0.1.1
  documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
  name: Kafka
  dockerRepository: airbyte/destination-kafka
  dockerImageTag: 0.1.0
  documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka
  icon: kafka.svg
@@ -163,7 +163,7 @@ private static void validateConfig(JsonNode schemaJson, JsonNode objectJson, Str
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
throw new Exception(String.format("Verification error(s) occurred for %s. Errors: %s ",
operationType, validationResult.toString()));
operationType, validationResult));
}
}

@@ -226,6 +226,28 @@ protected boolean implementsAppendDedup() throws WorkerException {
}
}

/**
* Detects whether a destination implements overwrite mode from its spec.json, which should
* include 'supportedDestinationSyncModes'.
*
* @return - a boolean.
*/
protected boolean implementsOverwrite() throws WorkerException {
final ConnectorSpecification spec = runSpec();
assertNotNull(spec);
if (spec.getSupportedDestinationSyncModes() != null) {
return spec.getSupportedDestinationSyncModes().contains(DestinationSyncMode.OVERWRITE);
} else {
return false;
}
}

/**
* Override to return true if the destination implements basic normalization and it should be
* tested here.
*
* @return - a boolean.
*/
protected boolean supportsNormalization() {
return false;
}
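The overwrite-mode detection above boils down to a null-safe membership check on the spec's supported sync modes: a spec that omits the field is treated as "not supported". A minimal self-contained sketch (the `SyncModeCheck` class and its nested enum are hypothetical stand-ins for Airbyte's `ConnectorSpecification` and `DestinationSyncMode`):

```java
import java.util.Set;

public class SyncModeCheck {

  // Hypothetical stand-in for Airbyte's DestinationSyncMode enum.
  enum SyncMode { APPEND, APPEND_DEDUP, OVERWRITE }

  // Mirrors the null-safe check in implementsOverwrite(): a missing
  // supportedDestinationSyncModes list means overwrite is not supported.
  static boolean implementsOverwrite(Set<SyncMode> supportedModes) {
    return supportedModes != null && supportedModes.contains(SyncMode.OVERWRITE);
  }

}
```

The null guard matters because connectors written before this field existed may omit it entirely, and the test suite should skip (not crash) in that case.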
Expand Down Expand Up @@ -354,6 +376,11 @@ public void testSync(String messagesFilename, String catalogFilename) throws Exc
*/
@Test
public void testSecondSync() throws Exception {
if (!implementsOverwrite()) {
LOGGER.info("Destination's spec.json does not support overwrite sync mode.");
return;
}

final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/destination-kafka/Dockerfile
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-kafka

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-kafka
24 changes: 24 additions & 0 deletions airbyte-integrations/connectors/destination-kafka/build.gradle
@@ -0,0 +1,24 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.kafka.KafkaDestination'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')

implementation 'org.apache.kafka:kafka-clients:2.8.0'
implementation 'org.apache.kafka:connect-json:2.8.0'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-kafka')
integrationTestJavaImplementation "org.testcontainers:kafka:1.15.3"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
@@ -0,0 +1,109 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDestination.class);

public static final String COLUMN_NAME_AB_ID = JavaBaseConstants.COLUMN_NAME_AB_ID;
public static final String COLUMN_NAME_EMITTED_AT = JavaBaseConstants.COLUMN_NAME_EMITTED_AT;
public static final String COLUMN_NAME_DATA = JavaBaseConstants.COLUMN_NAME_DATA;
public static final String COLUMN_NAME_STREAM = "_airbyte_stream";

private final StandardNameTransformer namingResolver;

public KafkaDestination() {
this.namingResolver = new StandardNameTransformer();
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : "";
if (!testTopic.isBlank()) {
final KafkaDestinationConfig kafkaDestinationConfig = KafkaDestinationConfig.getKafkaDestinationConfig(config);
final KafkaProducer<String, JsonNode> producer = kafkaDestinationConfig.getProducer();
final String key = UUID.randomUUID().toString();
final JsonNode value = Jsons.jsonNode(ImmutableMap.of(
COLUMN_NAME_AB_ID, key,
COLUMN_NAME_STREAM, "test-topic-stream",
COLUMN_NAME_EMITTED_AT, System.currentTimeMillis(),
COLUMN_NAME_DATA, Jsons.jsonNode(ImmutableMap.of("test-key", "test-value"))));

final RecordMetadata metadata = producer.send(new ProducerRecord<>(
namingResolver.getIdentifier(testTopic), key, value)).get();
producer.flush();

LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", metadata.topic());
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect to the Kafka brokers with provided configuration. \n" + e.getMessage());
}
}

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
return new KafkaRecordConsumer(KafkaDestinationConfig.getKafkaDestinationConfig(config),
catalog,
outputRecordCollector,
namingResolver);
}

public static void main(String[] args) throws Exception {
final Destination destination = new KafkaDestination();
LOGGER.info("Starting destination: {}", KafkaDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("Completed destination: {}", KafkaDestination.class);
}

}
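The `check()` method above follows a common connectivity-probe pattern: if a test topic is configured, send one throwaway record, and translate any failure into a FAILED status instead of letting the exception propagate to the caller. A minimal sketch of that pattern, with a hypothetical `TestSender` interface standing in for `KafkaProducer` and plain status strings standing in for `AirbyteConnectionStatus`:

```java
import java.util.Map;
import java.util.UUID;

public class ConnectionCheckSketch {

  // Hypothetical stand-in for a blocking Kafka send; the real code calls
  // producer.send(record).get() and then producer.flush().
  interface TestSender {
    void send(String topic, String key, Map<String, Object> value) throws Exception;
  }

  // Probe only when a test topic is configured; any exception becomes a
  // FAILED status rather than a crash.
  static String check(String testTopic, TestSender sender) {
    try {
      if (testTopic != null && !testTopic.isBlank()) {
        final String key = UUID.randomUUID().toString();
        sender.send(testTopic, key, Map.of("_airbyte_ab_id", key, "test-key", "test-value"));
      }
      return "SUCCEEDED";
    } catch (Exception e) {
      return "FAILED: " + e.getMessage();
    }
  }

}
```

Note that when no test topic is set, the probe is skipped entirely and the check succeeds, matching the behavior of the connector's `check()` above.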
@@ -0,0 +1,132 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDestinationConfig {

protected static final Logger LOGGER = LoggerFactory.getLogger(KafkaDestinationConfig.class);

private final String topicPattern;
private final boolean sync;
private final KafkaProducer<String, JsonNode> producer;

private KafkaDestinationConfig(String topicPattern, boolean sync, JsonNode config) {
this.topicPattern = topicPattern;
this.sync = sync;
this.producer = buildKafkaProducer(config);
}

public static KafkaDestinationConfig getKafkaDestinationConfig(JsonNode config) {
return new KafkaDestinationConfig(
config.get("topic_pattern").asText(),
config.has("sync_producer") && config.get("sync_producer").booleanValue(),
config);
}

private KafkaProducer<String, JsonNode> buildKafkaProducer(JsonNode config) {
final Map<String, Object> props = ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText())
.putAll(propertiesByProtocol(config))
// ImmutableMap.Builder rejects null values, so use an empty string here and let
// the blank-value filter below drop the entry when no client_id is configured.
.put(ProducerConfig.CLIENT_ID_CONFIG,
config.has("client_id") ? config.get("client_id").asText() : "")
.put(ProducerConfig.ACKS_CONFIG, config.get("acks").asText())
.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get("enable_idempotence").booleanValue())
.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get("compression_type").asText())
.put(ProducerConfig.BATCH_SIZE_CONFIG, config.get("batch_size").intValue())
.put(ProducerConfig.LINGER_MS_CONFIG, config.get("linger_ms").longValue())
.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
config.get("max_in_flight_requests_per_connection").intValue())
.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText())
.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get("buffer_memory").longValue())
.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get("max_request_size").intValue())
.put(ProducerConfig.RETRIES_CONFIG, config.get("retries").intValue())
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
config.get("socket_connection_setup_timeout_ms").longValue())
.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
config.get("socket_connection_setup_timeout_max_ms").longValue())
.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get("max_block_ms").longValue())
.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get("request_timeout_ms").intValue())
.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get("delivery_timeout_ms").intValue())
.put(ProducerConfig.SEND_BUFFER_CONFIG, config.get("send_buffer_bytes").intValue())
.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get("receive_buffer_bytes").intValue())
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
.build();

final Map<String, Object> filteredProps = props.entrySet().stream()
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return new KafkaProducer<>(filteredProps);
}

private Map<String, Object> propertiesByProtocol(JsonNode config) {
JsonNode protocolConfig = config.get("protocol");
// Log only the protocol name; the full protocol config can contain SASL credentials.
LOGGER.info("Kafka security protocol: {}", protocolConfig.get("security_protocol").asText());
final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase());
final ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());

switch (protocol) {
case PLAINTEXT -> {
}
case SASL_SSL, SASL_PLAINTEXT -> {
builder.put(SaslConfigs.SASL_JAAS_CONFIG, config.get("sasl_jaas_config").asText());
builder.put(SaslConfigs.SASL_MECHANISM, config.get("sasl_mechanism").asText());
}
default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol));
}

return builder.build();
}

public String getTopicPattern() {
return topicPattern;
}

public boolean isSync() {
return sync;
}

public KafkaProducer<String, JsonNode> getProducer() {
return producer;
}

}
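`buildKafkaProducer` above assembles every producer setting into one map and then drops entries whose value is null or blank, so optional settings left empty in the connector config never reach `KafkaProducer`. The filtering step in isolation (the class name `PropsFilterSketch` is hypothetical; the stream pipeline mirrors the one in `buildKafkaProducer`):

```java
import java.util.Map;
import java.util.stream.Collectors;

public class PropsFilterSketch {

  // Drop null and blank-valued entries so optional, unset config keys are
  // never passed to the producer constructor.
  static Map<String, Object> filterBlank(Map<String, Object> props) {
    return props.entrySet().stream()
        .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

}
```

This is why the empty-string fallback for `client_id` is safe: a blank value is filtered out here, whereas a null value would have made `ImmutableMap.Builder` throw before filtering could happen.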
@@ -0,0 +1,9 @@
package io.airbyte.integrations.destination.kafka;

public enum KafkaProtocol {

PLAINTEXT,
SASL_PLAINTEXT,
SASL_SSL

}