Source:Kafka - Avro format message support #15827

Merged
merged 28 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e384d86  new format is added (sivankumar86, Aug 22, 2022)
b169b54  Merge branch 'airbytehq:master' into master (sivankumar86, Aug 22, 2022)
a2e54da  Avro support (sivankumar86, Aug 22, 2022)
7b0790f  new format is added (sivankumar86, Aug 22, 2022)
5383a1e  Merge branch 'master' into master (sivankumar86, Aug 22, 2022)
ddbd195  Merge branch 'master' into master (sivankumar86, Aug 22, 2022)
4c75c08  Merge branch 'master' into master (sivankumar86, Aug 23, 2022)
be322ae  schema namespace updated (sivankumar86, Aug 23, 2022)
92a981e  Merge branch 'master' of https://github.com/sivankumar86/airbyte (sivankumar86, Aug 23, 2022)
c8cd31b  Merge branch 'master' into master (sivankumar86, Aug 23, 2022)
a1c50fa  multi topic schema name is added (sivankumar86, Aug 25, 2022)
5703ffe  Merge branch 'master' of https://github.com/sivankumar86/airbyte (sivankumar86, Aug 25, 2022)
bae89ee  max_records_process param is added (sivankumar86, Aug 26, 2022)
61191fe  Merge branch 'master' into master (sivankumar86, Aug 26, 2022)
6be8f59  Merge branch 'master' into master (sajarin, Aug 29, 2022)
69127b4  Merge branch 'master' into master (sivankumar86, Aug 29, 2022)
af5ac8e  Merge branch 'master' into master (sivankumar86, Sep 3, 2022)
e0cc956  Merge branch 'master' into master (sajarin, Sep 6, 2022)
7c4e5ea  Merge branch 'master' into master (sivankumar86, Sep 8, 2022)
e12767b  Merge branch 'master' into master (sajarin, Sep 8, 2022)
c0e71e0  review 1 (sivankumar86, Sep 9, 2022)
ac7dfed  Merge branch 'airbytehq:master' into master (sivankumar86, Sep 9, 2022)
3edca7f  Schema registry details are added (sivankumar86, Sep 9, 2022)
b0aad57  note update (sivankumar86, Sep 9, 2022)
da4dd7a  Merge branch 'master' into master (sivankumar86, Sep 9, 2022)
7551df3  Merge branch 'master' into master (sajarin, Sep 9, 2022)
549a16b  Merge branch 'master' into master (sivankumar86, Sep 12, 2022)
f92f8e3  auto-bump connector version [ci skip] (octavia-squidington-iii, Sep 13, 2022)

Files changed
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-kafka

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-kafka
15 changes: 13 additions & 2 deletions airbyte-integrations/connectors/source-kafka/build.gradle
@@ -9,13 +9,24 @@ application {
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

repositories {
mavenLocal()
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}

}

dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
implementation libs.connectors.testcontainers.kafka

implementation 'org.apache.kafka:kafka-clients:2.8.0'
implementation 'org.apache.kafka:connect-json:2.8.0'
implementation 'org.apache.kafka:kafka-clients:3.2.1'
implementation 'org.apache.kafka:connect-json:3.2.1'
implementation 'io.confluent:kafka-avro-serializer:7.2.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-kafka')
New file: KafkaFormatFactory.java (package io.airbyte.integrations.source.kafka), 31 additions
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.source.kafka.format.AvroFormat;
import io.airbyte.integrations.source.kafka.format.JsonFormat;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;

public class KafkaFormatFactory {

public static KafkaFormat getFormat(final JsonNode config) {

MessageFormat messageFormat =
config.has("MessageFormat") ? MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase())
: MessageFormat.JSON;

switch (messageFormat) {
case JSON -> {
return new JsonFormat(config);
}
case AVRO -> {
return new AvroFormat(config);
}
}
return new JsonFormat(config);
}

}
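
A quick way to see the dispatch in action locally is a scratch main like the sketch below. Only the MessageFormat / deserialization_type keys are taken from the code above; the class name and the exact JSON strings are illustrative.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.source.kafka.KafkaFormatFactory;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;

public class KafkaFormatFactoryExample {

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // "MessageFormat" and "deserialization_type" are the keys KafkaFormatFactory reads above.
    JsonNode avroConfig = mapper.readTree("{\"MessageFormat\":{\"deserialization_type\":\"AVRO\"}}");
    KafkaFormat avro = KafkaFormatFactory.getFormat(avroConfig);
    System.out.println(avro.getClass().getSimpleName());   // AvroFormat

    // Without a MessageFormat block the factory falls back to JSON, preserving the old behaviour.
    JsonNode legacyConfig = mapper.readTree("{}");
    KafkaFormat json = KafkaFormatFactory.getFormat(legacyConfig);
    System.out.println(json.getClass().getSimpleName());   // JsonFormat
  }
}
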
Modified file: KafkaSource.java (package io.airbyte.integrations.source.kafka)
@@ -5,36 +5,14 @@
package io.airbyte.integrations.source.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;
import io.airbyte.protocol.models.*;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,33 +24,19 @@ public KafkaSource() {}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : "";
if (!testTopic.isBlank()) {
final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getCheckConsumer();
consumer.subscribe(Pattern.compile(testTopic));
consumer.listTopics();
consumer.close();
LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText());
}
KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
if (kafkaFormat.isAccessible()) {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect to the Kafka brokers with provided configuration. \n" + e.getMessage());
}
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect to the Kafka brokers with provided configuration. \n");
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {

final Set<String> topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe();
final List<AirbyteStream> streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers
.createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.collect(Collectors.toList());
public AirbyteCatalog discover(final JsonNode config) {
KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
final List<AirbyteStream> streams = kafkaFormat.getStreams();
return new AirbyteCatalog().withStreams(streams);
}

@@ -83,51 +47,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
}

final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getConsumer();
final List<ConsumerRecord<String, JsonNode>> recordsList = new ArrayList<>();

final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
int pollCount = 0;
final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100;
while (true) {
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
if (consumerRecords.count() == 0) {
pollCount++;
if (pollCount > retry) {
break;
}
}

consumerRecords.forEach(record -> {
LOGGER.info("Consumer Record: key - {}, value - {}, partition - {}, offset - {}",
record.key(), record.value(), record.partition(), record.offset());
recordsList.add(record);
});
consumer.commitAsync();
}
consumer.close();
final Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();

return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {

@Override
protected AirbyteMessage computeNext() {
if (iterator.hasNext()) {
final ConsumerRecord<String, JsonNode> record = iterator.next();
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(record.topic())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(record.value()));
}

return endOfData();
}

});
KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
return kafkaFormat.read();
}

public static void main(final String[] args) throws Exception {
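
check, discover and read above now delegate everything to a KafkaFormat instance, but the interface itself is not part of the hunks shown here. From the calls it receives (isAccessible, getStreams, read), its shape is roughly the sketch below; the return types are inferred from how the results are used in KafkaSource, not copied from the actual file.

package io.airbyte.integrations.source.kafka.format;

import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import java.util.List;

public interface KafkaFormat {

  // check(): true when the configured brokers (and, for Avro, presumably the schema registry) are reachable.
  boolean isAccessible();

  // discover(): one AirbyteStream per subscribed topic.
  List<AirbyteStream> getStreams();

  // read(): consumed Kafka records wrapped as Airbyte RECORD messages.
  AutoCloseableIterator<AirbyteMessage> read();
}
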
New file: KafkaStrategy.java (package io.airbyte.integrations.source.kafka), 35 additions
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

/**
* https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
*/
public enum KafkaStrategy {

TopicNameStrategy(TopicNameStrategy.class.getName()),
RecordNameStrategy(RecordNameStrategy.class.getName()),
TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName());

String className;

KafkaStrategy(String name) {
this.className = name;
}

public static String getStrategyName(String name) {
for (KafkaStrategy value : KafkaStrategy.values()) {
if (value.name().equalsIgnoreCase(name)) {
return value.className;
}
}
throw new IllegalArgumentException("Unexpected data to strategy setting: " + name);
}

}
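
KafkaStrategy translates the strategy name a user selects in the connector configuration into the fully qualified Confluent subject-name-strategy class. A small sketch of the lookup follows; the comment about where the value ends up is an assumption, since the Avro consumer setup is not part of this file.

import io.airbyte.integrations.source.kafka.KafkaStrategy;

public class KafkaStrategyExample {

  public static void main(String[] args) {
    // Case-insensitive lookup of the Confluent subject-name-strategy class name.
    String strategyClass = KafkaStrategy.getStrategyName("TopicRecordNameStrategy");
    System.out.println(strategyClass);
    // io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
    // Presumably fed into the Avro deserializer's subject-name-strategy property when the consumer is built.

    // An unrecognised name fails fast with IllegalArgumentException, so misconfiguration
    // surfaces at check time rather than mid-sync.
  }
}
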
New file: MessageFormat.java (package io.airbyte.integrations.source.kafka), 14 additions
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

/**
* message format in kafka queue
* https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
*/
public enum MessageFormat {
JSON,
AVRO
}
Modified file: KafkaSourceConfig.java, refactored into AbstractFormat.java (package io.airbyte.integrations.source.kafka.format)
@@ -2,49 +2,41 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;
package io.airbyte.integrations.source.kafka.format;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.Arrays;
import io.airbyte.integrations.source.kafka.KafkaProtocol;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceConfig {
public abstract class AbstractFormat implements KafkaFormat {

protected static final Logger LOGGER = LoggerFactory.getLogger(KafkaSourceConfig.class);
private static KafkaSourceConfig instance;
private final JsonNode config;
private KafkaConsumer<String, JsonNode> consumer;
private Set<String> topicsToSubscribe;
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFormat.class);

private KafkaSourceConfig(final JsonNode config) {
protected Set<String> topicsToSubscribe;
protected JsonNode config;

public AbstractFormat(JsonNode config) {
this.config = config;
}

public static KafkaSourceConfig getKafkaSourceConfig(final JsonNode config) {
if (instance == null) {
instance = new KafkaSourceConfig(config);
}
return instance;
}

private KafkaConsumer<String, JsonNode> buildKafkaConsumer(final JsonNode config) {
protected abstract KafkaConsumer<String, ?> getConsumer();

protected abstract Set<String> getTopicsToSubscribe();

protected Map<String, Object> getKafkaConfig() {

final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
@@ -66,14 +58,13 @@ private KafkaConsumer<String, JsonNode> buildKafkaConsumer(final JsonNode config
config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

final Map<String, Object> filteredProps = props.entrySet().stream()
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return new KafkaConsumer<>(filteredProps);
return filteredProps;

}

private Map<String, Object> propertiesByProtocol(final JsonNode config) {
@@ -95,47 +86,4 @@ private Map<String, Object> propertiesByProtocol(final JsonNode config) {
return builder.build();
}

public KafkaConsumer<String, JsonNode> getConsumer() {
if (consumer != null) {
return consumer;
}
consumer = buildKafkaConsumer(config);

final JsonNode subscription = config.get("subscription");
LOGGER.info("Kafka subscribe method: {}", subscription.toString());
switch (subscription.get("subscription_type").asText()) {
case "subscribe" -> {
final String topicPattern = subscription.get("topic_pattern").asText();
consumer.subscribe(Pattern.compile(topicPattern));
topicsToSubscribe = consumer.listTopics().keySet().stream()
.filter(topic -> topic.matches(topicPattern))
.collect(Collectors.toSet());
}
case "assign" -> {
topicsToSubscribe = new HashSet<>();
final String topicPartitions = subscription.get("topic_partitions").asText();
final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(",");
final List<TopicPartition> topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> {
final String[] pair = topicPartition.split(":");
topicsToSubscribe.add(pair[0]);
return new TopicPartition(pair[0], Integer.parseInt(pair[1]));
}).collect(Collectors.toList());
LOGGER.info("Topic-partition list: {}", topicPartitionList);
consumer.assign(topicPartitionList);
}
}
return consumer;
}

public Set<String> getTopicsToSubscribe() {
if (topicsToSubscribe == null) {
getConsumer();
}
return topicsToSubscribe;
}

public KafkaConsumer<String, JsonNode> getCheckConsumer() {
return buildKafkaConsumer(config);
}

}
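
AbstractFormat now only assembles the broker and SASL properties; the deserializer wiring lives in the concrete JsonFormat and AvroFormat classes, which did not load in this view. A rough sketch of what the Avro side plausibly adds on top of getKafkaConfig() follows; the schema_registry_url and deserialization_strategy keys and the class layout are assumptions based on the kafka-avro-serializer dependency and the schema-registry commits in this PR, not on code shown above.

package io.airbyte.integrations.source.kafka.format;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.source.kafka.KafkaStrategy;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch only: getTopicsToSubscribe(), getStreams(), isAccessible() and read() are omitted here.
public abstract class AvroFormatSketch extends AbstractFormat {

  public AvroFormatSketch(JsonNode config) {
    super(config);
  }

  @Override
  protected KafkaConsumer<String, GenericRecord> getConsumer() {
    Map<String, Object> props = getKafkaConfig();
    // Assumed location of the Avro-specific settings inside the connector config.
    JsonNode avro = config.get("MessageFormat");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
        avro.get("schema_registry_url").asText());
    props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
        KafkaStrategy.getStrategyName(avro.get("deserialization_strategy").asText()));

    return new KafkaConsumer<>(props);
  }

}
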