From e384d86708973298f081862f0afbe80434eb4829 Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Mon, 22 Aug 2022 12:22:58 +1000 Subject: [PATCH 01/10] new format is added --- .../connectors/source-kafka/Dockerfile | 2 +- .../connectors/source-kafka/build.gradle | 15 +- .../source/kafka/KafkaFormatFactory.java | 25 +++ .../source/kafka/KafkaSource.java | 104 ++-------- .../source/kafka/KafkaSourceConfig.java | 141 -------------- .../source/kafka/KafkaStrategy.java | 28 +++ .../source/kafka/MessageFormat.java | 10 + .../source/kafka/format/AbstractFormat.java | 115 +++++++++++ .../source/kafka/format/AvroFormat.java | 184 ++++++++++++++++++ .../source/kafka/format/JsonFormat.java | 157 +++++++++++++++ .../source/kafka/format/KafkaFormat.java | 15 ++ .../source-kafka/src/main/resources/spec.json | 44 +++++ .../source/kafka/KafkaSourceTest.java | 25 +++ .../src/test/resources/test_config.json | 1 + 14 files changed, 631 insertions(+), 235 deletions(-) create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java delete mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java create mode 100644 airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json diff --git a/airbyte-integrations/connectors/source-kafka/Dockerfile b/airbyte-integrations/connectors/source-kafka/Dockerfile index b34d30c35565..c16cd48974bb 100644 --- a/airbyte-integrations/connectors/source-kafka/Dockerfile +++ b/airbyte-integrations/connectors/source-kafka/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-kafka COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/source-kafka diff --git a/airbyte-integrations/connectors/source-kafka/build.gradle b/airbyte-integrations/connectors/source-kafka/build.gradle index 028ea061692b..6ef80b0db86e 100644 --- a/airbyte-integrations/connectors/source-kafka/build.gradle +++ b/airbyte-integrations/connectors/source-kafka/build.gradle @@ -9,13 +9,24 @@ application { applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] } +repositories { + mavenLocal() + mavenCentral() + maven { + url "https://packages.confluent.io/maven" + } + +} + dependencies { implementation project(':airbyte-config:config-models') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-integrations:bases:base-java') + implementation 
libs.connectors.testcontainers.kafka - implementation 'org.apache.kafka:kafka-clients:2.8.0' - implementation 'org.apache.kafka:connect-json:2.8.0' + implementation 'org.apache.kafka:kafka-clients:3.2.1' + implementation 'org.apache.kafka:connect-json:3.2.1' + implementation 'io.confluent:kafka-avro-serializer:7.2.1' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-kafka') diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java new file mode 100644 index 000000000000..bab91ebbac4c --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java @@ -0,0 +1,25 @@ +package io.airbyte.integrations.source.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.source.kafka.format.AvroFormat; +import io.airbyte.integrations.source.kafka.format.JsonFormat; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; + +public class KafkaFormatFactory { + + public static KafkaFormat getFormat(final JsonNode config){ + + MessageFormat messageFormat = config.has("MessageFormat")? MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) + :MessageFormat.JSON; + + switch (messageFormat) { + case JSON -> { + return new JsonFormat(config); + } + case AVRO -> { + return new AvroFormat(config); + } + } + return new JsonFormat(config); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java index 4beeba15353b..6ef41c0fd988 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java @@ -5,39 +5,18 @@ package io.airbyte.integrations.source.kafka; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Lists; import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; +import io.airbyte.protocol.models.*; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Iterator; 
-import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class KafkaSource extends BaseConnector implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); @@ -46,33 +25,19 @@ public KafkaSource() {} @Override public AirbyteConnectionStatus check(final JsonNode config) { - try { - final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; - if (!testTopic.isBlank()) { - final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config); - final KafkaConsumer consumer = kafkaSourceConfig.getCheckConsumer(); - consumer.subscribe(Pattern.compile(testTopic)); - consumer.listTopics(); - consumer.close(); - LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); - } + KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); + if(kafkaFormat.isAccessible()){ return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } catch (final Exception e) { - LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + } return new AirbyteConnectionStatus() .withStatus(Status.FAILED) - .withMessage("Could not connect to the Kafka brokers with provided configuration. \n" + e.getMessage()); - } + .withMessage("Could not connect to the Kafka brokers with provided configuration. \n" ); } @Override - public AirbyteCatalog discover(final JsonNode config) throws Exception { - - final Set topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe(); - final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers - .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .collect(Collectors.toList()); + public AirbyteCatalog discover(final JsonNode config) { + KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); + final List streams = kafkaFormat.getStreams(); return new AirbyteCatalog().withStreams(streams); } @@ -83,51 +48,8 @@ public AutoCloseableIterator read(final JsonNode config, final C if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) { throw new RuntimeException("Unable establish a connection: " + check.getMessage()); } - - final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config); - final KafkaConsumer consumer = kafkaSourceConfig.getConsumer(); - final List> recordsList = new ArrayList<>(); - - final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - int pollCount = 0; - final int polling_time = config.has("polling_time") ? 
config.get("polling_time").intValue() : 100; - while (true) { - final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - if (consumerRecords.count() == 0) { - pollCount++; - if (pollCount > retry) { - break; - } - } - - consumerRecords.forEach(record -> { - LOGGER.info("Consumer Record: key - {}, value - {}, partition - {}, offset - {}", - record.key(), record.value(), record.partition(), record.offset()); - recordsList.add(record); - }); - consumer.commitAsync(); - } - consumer.close(); - final Iterator> iterator = recordsList.iterator(); - - return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { - - @Override - protected AirbyteMessage computeNext() { - if (iterator.hasNext()) { - final ConsumerRecord record = iterator.next(); - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(record.topic()) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(record.value())); - } - - return endOfData(); - } - - }); + KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); + return kafkaFormat.read(); } public static void main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java deleted file mode 100644 index d92c6a31dedb..000000000000 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.kafka; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.connect.json.JsonDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaSourceConfig { - - protected static final Logger LOGGER = LoggerFactory.getLogger(KafkaSourceConfig.class); - private static KafkaSourceConfig instance; - private final JsonNode config; - private KafkaConsumer consumer; - private Set topicsToSubscribe; - - private KafkaSourceConfig(final JsonNode config) { - this.config = config; - } - - public static KafkaSourceConfig getKafkaSourceConfig(final JsonNode config) { - if (instance == null) { - instance = new KafkaSourceConfig(config); - } - return instance; - } - - private KafkaConsumer buildKafkaConsumer(final JsonNode config) { - final Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, - config.has("group_id") ? config.get("group_id").asText() : null); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - config.has("max_poll_records") ? 
config.get("max_poll_records").intValue() : null); - props.putAll(propertiesByProtocol(config)); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, - config.has("client_id") ? config.get("client_id").asText() : null); - props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue()); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null); - props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, - config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null); - props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, - config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null); - props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, - config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); - - final Map filteredProps = props.entrySet().stream() - .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - return new KafkaConsumer<>(filteredProps); - } - - private Map propertiesByProtocol(final JsonNode config) { - final JsonNode protocolConfig = config.get("protocol"); - LOGGER.info("Kafka protocol config: {}", protocolConfig.toString()); - final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase()); - final ImmutableMap.Builder builder = ImmutableMap.builder() - .put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); - - switch (protocol) { - case PLAINTEXT -> {} - case SASL_SSL, SASL_PLAINTEXT -> { - builder.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText()); - builder.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText()); - } - default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol)); - } - - return builder.build(); - } - - public KafkaConsumer getConsumer() { - if (consumer != null) { - return consumer; - } - consumer = buildKafkaConsumer(config); - - final JsonNode subscription = config.get("subscription"); - LOGGER.info("Kafka subscribe method: {}", subscription.toString()); - switch (subscription.get("subscription_type").asText()) { - case "subscribe" -> { - final String topicPattern = subscription.get("topic_pattern").asText(); - consumer.subscribe(Pattern.compile(topicPattern)); - topicsToSubscribe = consumer.listTopics().keySet().stream() - .filter(topic -> topic.matches(topicPattern)) - .collect(Collectors.toSet()); - } - case "assign" -> { - topicsToSubscribe = new HashSet<>(); - final String topicPartitions = subscription.get("topic_partitions").asText(); - final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); - final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { - final String[] pair = topicPartition.split(":"); - topicsToSubscribe.add(pair[0]); - return new TopicPartition(pair[0], Integer.parseInt(pair[1])); - 
}).collect(Collectors.toList()); - LOGGER.info("Topic-partition list: {}", topicPartitionList); - consumer.assign(topicPartitionList); - } - } - return consumer; - } - - public Set getTopicsToSubscribe() { - if (topicsToSubscribe == null) { - getConsumer(); - } - return topicsToSubscribe; - } - - public KafkaConsumer getCheckConsumer() { - return buildKafkaConsumer(config); - } - -} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java new file mode 100644 index 000000000000..30205d752666 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java @@ -0,0 +1,28 @@ +package io.airbyte.integrations.source.kafka; + +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import io.confluent.kafka.serializers.subject.TopicNameStrategy; +import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; + +/** + * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html + */ +public enum KafkaStrategy { + TopicNameStrategy(TopicNameStrategy.class.getName()), + RecordNameStrategy(RecordNameStrategy.class.getName()), + TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName()); + + String className; + KafkaStrategy(String name){ + this.className=name; + } + + public static String getStrategyName(String name){ + for (KafkaStrategy value:KafkaStrategy.values()){ + if(value.name().equalsIgnoreCase(name)){ + return value.className; + } + } + throw new IllegalArgumentException("Unexpected data to strategy setting: " + name); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java new file mode 100644 index 000000000000..7338a2307e65 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java @@ -0,0 +1,10 @@ +package io.airbyte.integrations.source.kafka; + +/** + * message format in kafka queue + * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html + */ +public enum MessageFormat { + JSON, + AVRO +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java new file mode 100644 index 000000000000..90132940dce3 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java @@ -0,0 +1,115 @@ +package io.airbyte.integrations.source.kafka.format; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.kafka.KafkaProtocol; +import io.airbyte.integrations.source.kafka.KafkaStrategy; +import io.airbyte.integrations.source.kafka.MessageFormat; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import 
io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public abstract class AbstractFormat implements KafkaFormat { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFormat.class); + + protected Set topicsToSubscribe; + protected JsonNode config; + + public AbstractFormat(JsonNode config) { + this.config=config; + + } + + protected abstract KafkaConsumer getConsumer(); + + + + protected abstract Set getTopicsToSubscribe(); + + protected Map getKafkaConfig(){ + + final Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, + config.has("group_id") ? config.get("group_id").asText() : null); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null); + props.putAll(propertiesByProtocol(config)); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, + config.has("client_id") ? config.get("client_id").asText() : null); + props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue()); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null); + props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, + config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null); + props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, + config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null); + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, + config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null); + + MessageFormat messageFormat = config.has("MessageFormat")? 
MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) + :MessageFormat.JSON; + switch (messageFormat) { + case JSON -> { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + + } + case AVRO -> { + final JsonNode avro_config=config.get("MessageFormat"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,String.format("%s:%s",avro_config.get("schema_registry_username").asText() + ,avro_config.get("schema_registry_password").asText())); + props.put( KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); + } + } + + final Map filteredProps = props.entrySet().stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return filteredProps; + + } + + private Map propertiesByProtocol(final JsonNode config) { + final JsonNode protocolConfig = config.get("protocol"); + LOGGER.info("Kafka protocol config: {}", protocolConfig.toString()); + final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase()); + final ImmutableMap.Builder builder = ImmutableMap.builder() + .put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + + switch (protocol) { + case PLAINTEXT -> {} + case SASL_SSL, SASL_PLAINTEXT -> { + builder.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText()); + builder.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText()); + } + default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol)); + } + + return builder.build(); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java new file mode 100644 index 000000000000..620858b62be5 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -0,0 +1,184 @@ +package io.airbyte.integrations.source.kafka.format; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.source.kafka.KafkaStrategy; +import io.airbyte.protocol.models.*; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import 
io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class AvroFormat extends AbstractFormat{ + private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); + + private KafkaConsumer consumer; + + public AvroFormat(JsonNode jsonConfig){ + super(jsonConfig); + } + + @Override + protected Map getKafkaConfig() { + Map props=super.getKafkaConfig(); + final JsonNode avro_config=config.get("MessageFormat"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,String.format("%s:%s",avro_config.get("schema_registry_username").asText() + ,avro_config.get("schema_registry_password").asText())); + props.put( KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); + return props; + } + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; + } + Map filteredProps=getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } + } + return consumer; + } + + @Override + protected Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); + } + return topicsToSubscribe; + } + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? 
config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + int pollCount = 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + pollCount++; + if (pollCount > retry) { + break; + } + } + + consumerRecords.forEach(record -> { + recordsList.add(record); + }); + consumer.commitAsync(); + } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + GenericRecord avro_data = record.value(); + ObjectMapper mapper = new ObjectMapper(); + String namespace=avro_data.getSchema().getNamespace(); + JsonNode output; + try { + //Todo dynamic namespace is not supported now hence, adding avro schema name in the message + String newString = String.format("{\"avro_schema\": \"%s\"}", namespace); + JsonNode newNode = mapper.readTree(newString); + output = mapper.readTree(avro_data.toString()); + ((ObjectNode) output).set("namespace", newNode); + } catch (JsonProcessingException e) { + LOGGER.error("Exception whilst reading avro data from stream", e); + throw new RuntimeException(e); + } + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(output)); + } + + return endOfData(); + } + + }); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java new file mode 100644 index 000000000000..71209ba44e1c --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -0,0 +1,157 @@ +package io.airbyte.integrations.source.kafka.format; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.protocol.models.*; +import 
org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class JsonFormat extends AbstractFormat{ + private static final Logger LOGGER = LoggerFactory.getLogger(JsonFormat.class); + + private KafkaConsumer consumer; + + public JsonFormat(JsonNode jsonConfig){ + super(jsonConfig); + } + + + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; + } + Map filteredProps=getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } + } + return consumer; + } + + @Override + protected Map getKafkaConfig() { + Map props=super.getKafkaConfig(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + return props; + } + + public Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); + } + return topicsToSubscribe; + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + int pollCount = 0; + final int polling_time = config.has("polling_time") ? 
config.get("polling_time").intValue() : 100; + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + pollCount++; + if (pollCount > retry) { + break; + } + } + + consumerRecords.forEach(record -> { + recordsList.add(record); + }); + consumer.commitAsync(); + } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(record.value())); + } + + return endOfData(); + } + + }); + } + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java new file mode 100644 index 000000000000..6f6c587614d4 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java @@ -0,0 +1,15 @@ +package io.airbyte.integrations.source.kafka.format; + +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; + +import java.util.List; + +public interface KafkaFormat { + + boolean isAccessible(); + List getStreams(); + + AutoCloseableIterator read(); +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json index 1a64203d5a34..1be07d1a7ef6 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json @@ -11,6 +11,50 @@ "required": ["bootstrap_servers", "subscription", "protocol"], "additionalProperties": false, "properties": { + "MessageFormat": { + "title": "MessageFormat", + "type": "object", + "description": "The serialization used based on this ", + "oneOf": [ + { + "title": "JSON", + "properties": { + "deserialization_type": { + "type": "string", + "enum": ["JSON"], + "default": "JSON" + } + } + }, + { + "title": "AVRO", + "properties": { + "deserialization_type": { + "type": "string", + "enum": ["AVRO"], + "default": "AVRO" + }, + "deserialization_strategy": { + "type": "string", + "enum": ["TopicNameStrategy","RecordNameStrategy","TopicRecordNameStrategy"], + "default": "TopicNameStrategy" + }, + "schema_registry_url": { + "type": "string", + "examples": ["http://localhost:8081"] + }, + "schema_registry_username": { + "type": "string", + "default": "" + 
}, + "schema_registry_password": { + "type": "string", + "default": "" + } + } + } + ] + }, "bootstrap_servers": { "title": "Bootstrap Servers", "description": "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).", diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java new file mode 100644 index 000000000000..5840bed684fc --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java @@ -0,0 +1,25 @@ +package io.airbyte.integrations.source.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.source.kafka.format.AvroFormat; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + + +public class KafkaSourceTest { + + + @Test + public void testAvroformat() throws IOException { + final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json")); + final KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(configJson); + assertInstanceOf(AvroFormat.class,kafkaFormat); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json new file mode 100644 index 000000000000..b365d73b5132 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json @@ -0,0 +1 @@ +{"group_id":"login","protocol":{"sasl_mechanism":"PLAIN","sasl_jaas_config":"org.apache.kafka.common.security.plain.PlainLoginModule ;","security_protocol":"SASL_SSL"},"client_id":"airbyte-login-consumer","test_topic":"","polling_time":100,"subscription":{"topic_pattern":"dev-accounts-lms-transaction-created","subscription_type":"subscribe"},"MessageFormat":{"schema_registry_url":"http://localhost","deserialization_type":"AVRO","deserialization_strategy":"TopicRecordNameStrategy","schema_registry_password":"password","schema_registry_username":"username"},"repeated_calls":3,"max_poll_records":500,"retry_backoff_ms":100,"auto_offset_reset":"earliest","bootstrap_servers":"localhost:9092","client_dns_lookup":"use_all_dns_ips","enable_auto_commit":true,"request_timeout_ms":30000,"receive_buffer_bytes":32768,"auto_commit_interval_ms":5000} \ No newline at end of file From a2e54da8efce3fcd6513a6a4448617d5fb392dcf Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Mon, 22 Aug 2022 13:03:12 +1000 Subject: [PATCH 02/10] Avro support Avro support --- docs/integrations/sources/kafka.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/integrations/sources/kafka.md 
b/docs/integrations/sources/kafka.md index 48815e18389f..5a3dac09d9ba 100644 --- a/docs/integrations/sources/kafka.md +++ b/docs/integrations/sources/kafka.md @@ -40,10 +40,15 @@ The Kafka source connector supports the following[sync modes](https://docs.airby | Incremental - Append Sync | Yes | | | Namespaces | No | | +## Supported Format + Json - Json value messages + Avro - deserialize Using confluent API. Please refer (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html) + ## Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :------------------------------------------------------| :---------------------------------------- | +| 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added Avro format support | | 0.1.7 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors | | 0.1.6 | 2022-05-29 | [12903](https://github.com/airbytehq/airbyte/pull/12903) | Add Polling Time to Specification (default 100 ms) | | 0.1.5 | 2022-04-19 | [12134](https://github.com/airbytehq/airbyte/pull/12134) | Add PLAIN Auth | From 7b0790fa6391ab1844917e581e041d78a1e0cb9b Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Mon, 22 Aug 2022 14:41:19 +1000 Subject: [PATCH 03/10] new format is added --- .../source/kafka/format/AbstractFormat.java | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java index 90132940dce3..2be1ce4e70c9 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java @@ -4,18 +4,10 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.kafka.KafkaProtocol; -import io.airbyte.integrations.source.kafka.KafkaStrategy; -import io.airbyte.integrations.source.kafka.MessageFormat; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.connect.json.JsonDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,26 +57,6 @@ protected Map getKafkaConfig(){ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null); - MessageFormat messageFormat = config.has("MessageFormat")? 
MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) - :MessageFormat.JSON; - switch (messageFormat) { - case JSON -> { - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); - - } - case AVRO -> { - final JsonNode avro_config=config.get("MessageFormat"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); - props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,String.format("%s:%s",avro_config.get("schema_registry_username").asText() - ,avro_config.get("schema_registry_password").asText())); - props.put( KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); - props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); - } - } - final Map filteredProps = props.entrySet().stream() .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); From be322aeb5e3f5485a4c959849451d41254820cbb Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Tue, 23 Aug 2022 14:45:03 +1000 Subject: [PATCH 04/10] schema namespace updated --- .../io/airbyte/integrations/source/kafka/format/AvroFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index 620858b62be5..4669f36f08d7 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -163,7 +163,7 @@ protected AirbyteMessage computeNext() { String newString = String.format("{\"avro_schema\": \"%s\"}", namespace); JsonNode newNode = mapper.readTree(newString); output = mapper.readTree(avro_data.toString()); - ((ObjectNode) output).set("namespace", newNode); + ((ObjectNode) output).set("_namespace_", newNode); } catch (JsonProcessingException e) { LOGGER.error("Exception whilst reading avro data from stream", e); throw new RuntimeException(e); From a1c50fac937c9bd091ba60498672aa8955c631a6 Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Thu, 25 Aug 2022 18:24:26 +1000 Subject: [PATCH 05/10] multi topic schema name is added --- .../source/kafka/format/AvroFormat.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index 4669f36f08d7..20dc1e73ad63 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ 
-15,6 +15,7 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -134,6 +135,7 @@ public AutoCloseableIterator read() { int pollCount = 0; final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; while (true) { + consumer.assignment() final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); if (consumerRecords.count() == 0) { pollCount++; @@ -143,6 +145,7 @@ public AutoCloseableIterator read() { } consumerRecords.forEach(record -> { + record. recordsList.add(record); }); consumer.commitAsync(); @@ -157,13 +160,18 @@ protected AirbyteMessage computeNext() { GenericRecord avro_data = record.value(); ObjectMapper mapper = new ObjectMapper(); String namespace=avro_data.getSchema().getNamespace(); + String name= avro_data.getSchema().getName(); JsonNode output; try { //Todo dynamic namespace is not supported now hence, adding avro schema name in the message - String newString = String.format("{\"avro_schema\": \"%s\"}", namespace); - JsonNode newNode = mapper.readTree(newString); - output = mapper.readTree(avro_data.toString()); - ((ObjectNode) output).set("_namespace_", newNode); + if(StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { + String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); + JsonNode newNode = mapper.readTree(newString); + output = mapper.readTree(avro_data.toString()); + ((ObjectNode) output).set("_namespace_", newNode); + }else{ + output = mapper.readTree(avro_data.toString()); + } } catch (JsonProcessingException e) { LOGGER.error("Exception whilst reading avro data from stream", e); throw new RuntimeException(e); From bae89ee6a65d7ae93a233b77aaed6b910992ba08 Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Fri, 26 Aug 2022 16:25:32 +1000 Subject: [PATCH 06/10] max_records_process param is added --- .../source/kafka/format/AvroFormat.java | 26 +++++++++++++++---- .../source/kafka/format/JsonFormat.java | 24 ++++++++++++++--- .../source-kafka/src/main/resources/spec.json | 6 +++++ 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index 20dc1e73ad63..bb9c4eae9ecd 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -29,6 +29,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -132,20 +133,35 @@ public AutoCloseableIterator read() { final KafkaConsumer consumer = getConsumer(); final List> recordsList = new ArrayList<>(); final int retry = config.has("repeated_calls") ? 
config.get("repeated_calls").intValue() : 0; - int pollCount = 0; final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count= new AtomicInteger(); + final Map poll_lookup=new HashMap<>(); + getTopicsToSubscribe().forEach( topic -> + poll_lookup.put(topic,0) + ); while (true) { - consumer.assignment() final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); if (consumerRecords.count() == 0) { - pollCount++; - if (pollCount > retry) { + consumer.assignment().stream().map( record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic,poll_lookup.get(topic)+1); + } + ); + boolean is_complete=poll_lookup.entrySet().stream().allMatch( + e ->e.getValue() > retry + ); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); break; } + }else if(record_count.get() > max_records){ + LOGGER.info("Max record count is reached !!"); + break; } consumerRecords.forEach(record -> { - record. + record_count.getAndIncrement(); recordsList.add(record); }); consumer.commitAsync(); diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index 71209ba44e1c..ae643074b94f 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -20,6 +20,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -100,15 +101,30 @@ public AutoCloseableIterator read() { final KafkaConsumer consumer = getConsumer(); final List> recordsList = new ArrayList<>(); final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - int pollCount = 0; final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count= new AtomicInteger(); + final Map poll_lookup=new HashMap<>(); + getTopicsToSubscribe().forEach( topic -> + poll_lookup.put(topic,0) + ); while (true) { final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - if (consumerRecords.count() == 0) { - pollCount++; - if (pollCount > retry) { + if (consumerRecords.count() == 0) { consumer.assignment().stream().map( record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic,poll_lookup.get(topic)+1); + } + ); + boolean is_complete=poll_lookup.entrySet().stream().allMatch( + e ->e.getValue() > retry + ); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); break; } + }else if(record_count.get() > max_records){ + LOGGER.info("Max record count is reached !!"); + break; } consumerRecords.forEach(record -> { diff --git a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json index 1be07d1a7ef6..c74230165960 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json @@ -269,6 +269,12 @@ "description": "The number of repeated calls to poll() if no messages were received.", "type": "integer", "default": 3 + }, + "max_records_process": { + "title": "Maximum Records", + "description": "The Maximum records needs to be processed per execution", + "type": "integer", + "default": 100000 } } } From c0e71e00ffd8348fa2b4c5c4ab8bba9c7c4d0c33 Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Fri, 9 Sep 2022 10:26:24 +1000 Subject: [PATCH 07/10] review 1 --- .../source/kafka/KafkaFormatFactory.java | 30 +- .../source/kafka/KafkaSource.java | 17 +- .../source/kafka/KafkaStrategy.java | 35 +- .../source/kafka/MessageFormat.java | 8 +- .../source/kafka/format/AbstractFormat.java | 138 +++---- .../source/kafka/format/AvroFormat.java | 341 +++++++++--------- .../source/kafka/format/JsonFormat.java | 279 +++++++------- .../source/kafka/format/KafkaFormat.java | 13 +- .../source-kafka/src/main/resources/spec.json | 8 +- .../source/kafka/KafkaSourceTest.java | 25 +- .../src/test/resources/test_config.json | 33 +- 11 files changed, 495 insertions(+), 432 deletions(-) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java index bab91ebbac4c..43c6c73cc631 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.kafka; import com.fasterxml.jackson.databind.JsonNode; @@ -7,19 +11,21 @@ public class KafkaFormatFactory { - public static KafkaFormat getFormat(final JsonNode config){ + public static KafkaFormat getFormat(final JsonNode config) { - MessageFormat messageFormat = config.has("MessageFormat")? MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) - :MessageFormat.JSON; + MessageFormat messageFormat = + config.has("MessageFormat") ? 
MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) + : MessageFormat.JSON; - switch (messageFormat) { - case JSON -> { - return new JsonFormat(config); - } - case AVRO -> { - return new AvroFormat(config); - } - } - return new JsonFormat(config); + switch (messageFormat) { + case JSON -> { + return new JsonFormat(config); + } + case AVRO -> { + return new AvroFormat(config); + } } + return new JsonFormat(config); + } + } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java index 6ef41c0fd988..103a50700b3d 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java @@ -12,11 +12,10 @@ import io.airbyte.integrations.source.kafka.format.KafkaFormat; import io.airbyte.protocol.models.*; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - public class KafkaSource extends BaseConnector implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); @@ -25,18 +24,18 @@ public KafkaSource() {} @Override public AirbyteConnectionStatus check(final JsonNode config) { - KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); - if(kafkaFormat.isAccessible()){ + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); + if (kafkaFormat.isAccessible()) { return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); } - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("Could not connect to the Kafka brokers with provided configuration. \n" ); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage("Could not connect to the Kafka brokers with provided configuration. \n"); } @Override public AirbyteCatalog discover(final JsonNode config) { - KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); final List streams = kafkaFormat.getStreams(); return new AirbyteCatalog().withStreams(streams); } @@ -48,7 +47,7 @@ public AutoCloseableIterator read(final JsonNode config, final C if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) { throw new RuntimeException("Unable establish a connection: " + check.getMessage()); } - KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(config); + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); return kafkaFormat.read(); } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java index 30205d752666..8b94a1308edf 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka; import io.confluent.kafka.serializers.subject.RecordNameStrategy; @@ -8,21 +12,24 @@ * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html */ public enum KafkaStrategy { - TopicNameStrategy(TopicNameStrategy.class.getName()), - RecordNameStrategy(RecordNameStrategy.class.getName()), - TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName()); - String className; - KafkaStrategy(String name){ - this.className=name; - } + TopicNameStrategy(TopicNameStrategy.class.getName()), + RecordNameStrategy(RecordNameStrategy.class.getName()), + TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName()); - public static String getStrategyName(String name){ - for (KafkaStrategy value:KafkaStrategy.values()){ - if(value.name().equalsIgnoreCase(name)){ - return value.className; - } - } - throw new IllegalArgumentException("Unexpected data to strategy setting: " + name); + String className; + + KafkaStrategy(String name) { + this.className = name; + } + + public static String getStrategyName(String name) { + for (KafkaStrategy value : KafkaStrategy.values()) { + if (value.name().equalsIgnoreCase(name)) { + return value.className; + } } + throw new IllegalArgumentException("Unexpected data to strategy setting: " + name); + } + } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java index 7338a2307e65..0e06fd784ddd 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.kafka; /** @@ -5,6 +9,6 @@ * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html */ public enum MessageFormat { - JSON, - AVRO + JSON, + AVRO } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java index 2be1ce4e70c9..fd475d0d0f6e 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java @@ -1,9 +1,17 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka.format; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.kafka.KafkaProtocol; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,77 +19,71 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - public abstract class AbstractFormat implements KafkaFormat { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFormat.class); - - protected Set topicsToSubscribe; - protected JsonNode config; - - public AbstractFormat(JsonNode config) { - this.config=config; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFormat.class); + + protected Set topicsToSubscribe; + protected JsonNode config; + + public AbstractFormat(JsonNode config) { + this.config = config; + + } + + protected abstract KafkaConsumer getConsumer(); + + protected abstract Set getTopicsToSubscribe(); + + protected Map getKafkaConfig() { + + final Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, + config.has("group_id") ? config.get("group_id").asText() : null); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null); + props.putAll(propertiesByProtocol(config)); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, + config.has("client_id") ? config.get("client_id").asText() : null); + props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue()); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null); + props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, + config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null); + props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, + config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null); + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, + config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + config.has("auto_offset_reset") ? 
config.get("auto_offset_reset").asText() : null); + + final Map filteredProps = props.entrySet().stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return filteredProps; + + } + + private Map propertiesByProtocol(final JsonNode config) { + final JsonNode protocolConfig = config.get("protocol"); + LOGGER.info("Kafka protocol config: {}", protocolConfig.toString()); + final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase()); + final ImmutableMap.Builder builder = ImmutableMap.builder() + .put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + + switch (protocol) { + case PLAINTEXT -> {} + case SASL_SSL, SASL_PLAINTEXT -> { + builder.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText()); + builder.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText()); + } + default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol)); } - protected abstract KafkaConsumer getConsumer(); - - - - protected abstract Set getTopicsToSubscribe(); - - protected Map getKafkaConfig(){ - - final Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, - config.has("group_id") ? config.get("group_id").asText() : null); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null); - props.putAll(propertiesByProtocol(config)); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, - config.has("client_id") ? config.get("client_id").asText() : null); - props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue()); - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null); - props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, - config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null); - props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, - config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null); - props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, - config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - config.has("auto_offset_reset") ? 
config.get("auto_offset_reset").asText() : null); - - final Map filteredProps = props.entrySet().stream() - .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - return filteredProps; - - } - - private Map propertiesByProtocol(final JsonNode config) { - final JsonNode protocolConfig = config.get("protocol"); - LOGGER.info("Kafka protocol config: {}", protocolConfig.toString()); - final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase()); - final ImmutableMap.Builder builder = ImmutableMap.builder() - .put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); - - switch (protocol) { - case PLAINTEXT -> {} - case SASL_SSL, SASL_PLAINTEXT -> { - builder.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText()); - builder.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText()); - } - default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol)); - } - - return builder.build(); - } + return builder.build(); + } } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index bb9c4eae9ecd..2b593278c848 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka.format; import com.fasterxml.jackson.core.JsonProcessingException; @@ -14,6 +18,13 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -25,184 +36,176 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class AvroFormat extends AbstractFormat{ - private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); - - private KafkaConsumer consumer; - - public AvroFormat(JsonNode jsonConfig){ - super(jsonConfig); +public class AvroFormat extends AbstractFormat { + + private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); + + private KafkaConsumer consumer; + + public AvroFormat(JsonNode jsonConfig) { + super(jsonConfig); + } + + @Override + protected Map getKafkaConfig() { + Map props = super.getKafkaConfig(); + final JsonNode avro_config = config.get("MessageFormat"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, + String.format("%s:%s", avro_config.get("schema_registry_username").asText(), avro_config.get("schema_registry_password").asText())); + props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, + KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); + return props; + } + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; } - - @Override - protected Map getKafkaConfig() { - Map props=super.getKafkaConfig(); - final JsonNode avro_config=config.get("MessageFormat"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); - props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,String.format("%s:%s",avro_config.get("schema_registry_username").asText() - ,avro_config.get("schema_registry_password").asText())); - props.put( KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); - props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); - return props; + Map filteredProps = getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = 
config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } } + return consumer; + } - @Override - protected KafkaConsumer getConsumer() { - if (consumer != null) { - return consumer; - } - Map filteredProps=getKafkaConfig(); - consumer = new KafkaConsumer<>(filteredProps); - - final JsonNode subscription = config.get("subscription"); - LOGGER.info("Kafka subscribe method: {}", subscription.toString()); - switch (subscription.get("subscription_type").asText()) { - case "subscribe" -> { - final String topicPattern = subscription.get("topic_pattern").asText(); - consumer.subscribe(Pattern.compile(topicPattern)); - topicsToSubscribe = consumer.listTopics().keySet().stream() - .filter(topic -> topic.matches(topicPattern)) - .collect(Collectors.toSet()); - LOGGER.info("Topic list: {}", topicsToSubscribe); - } - case "assign" -> { - topicsToSubscribe = new HashSet<>(); - final String topicPartitions = subscription.get("topic_partitions").asText(); - final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); - final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { - final String[] pair = topicPartition.split(":"); - topicsToSubscribe.add(pair[0]); - return new TopicPartition(pair[0], Integer.parseInt(pair[1])); - }).collect(Collectors.toList()); - LOGGER.info("Topic-partition list: {}", topicPartitionList); - consumer.assign(topicPartitionList); - } - } - return consumer; + @Override + protected Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); } - - @Override - protected Set getTopicsToSubscribe() { - if (topicsToSubscribe == null) { - getConsumer(); + return topicsToSubscribe; + } + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? 
config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count = new AtomicInteger(); + final Map poll_lookup = new HashMap<>(); + getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic, poll_lookup.get(topic) + 1); + }); + boolean is_complete = poll_lookup.entrySet().stream().allMatch( + e -> e.getValue() > retry); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); + break; } - return topicsToSubscribe; + } else if (record_count.get() > max_records) { + LOGGER.info("Max record count is reached !!"); + break; + } + + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); } - - @Override - public boolean isAccessible() { - try { - final String testTopic = config.has("test_topic") ? 
config.get("test_topic").asText() : ""; - if (!testTopic.isBlank()) { - final KafkaConsumer consumer = getConsumer(); - consumer.subscribe(Pattern.compile(testTopic)); - consumer.listTopics(); - consumer.close(); - LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + GenericRecord avro_data = record.value(); + ObjectMapper mapper = new ObjectMapper(); + String namespace = avro_data.getSchema().getNamespace(); + String name = avro_data.getSchema().getName(); + JsonNode output; + try { + // Todo dynamic namespace is not supported now hence, adding avro schema name in the message + if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { + String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); + JsonNode newNode = mapper.readTree(newString); + output = mapper.readTree(avro_data.toString()); + ((ObjectNode) output).set("_namespace_", newNode); + } else { + output = mapper.readTree(avro_data.toString()); } - return true; - } catch (final Exception e) { - LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); - return false; + } catch (JsonProcessingException e) { + LOGGER.error("Exception whilst reading avro data from stream", e); + throw new RuntimeException(e); + } + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(output)); } - } - @Override - public List getStreams() { - final Set topicsToSubscribe = getTopicsToSubscribe(); - final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers - .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .collect(Collectors.toList()); - return streams; - } - - @Override - public AutoCloseableIterator read() { - - final KafkaConsumer consumer = getConsumer(); - final List> recordsList = new ArrayList<>(); - final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; - final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; - AtomicInteger record_count= new AtomicInteger(); - final Map poll_lookup=new HashMap<>(); - getTopicsToSubscribe().forEach( topic -> - poll_lookup.put(topic,0) - ); - while (true) { - final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - if (consumerRecords.count() == 0) { - consumer.assignment().stream().map( record -> record.topic()).distinct().forEach( - topic -> { - poll_lookup.put(topic,poll_lookup.get(topic)+1); - } - ); - boolean is_complete=poll_lookup.entrySet().stream().allMatch( - e ->e.getValue() > retry - ); - if (is_complete) { - LOGGER.info("There is no new data in the queue!!"); - break; - } - }else if(record_count.get() > max_records){ - LOGGER.info("Max record count is reached !!"); - break; - } + return endOfData(); + } - consumerRecords.forEach(record -> { - record_count.getAndIncrement(); - recordsList.add(record); - }); - consumer.commitAsync(); - } - consumer.close(); - final Iterator> iterator = recordsList.iterator(); - return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { - @Override - protected AirbyteMessage computeNext() { - if (iterator.hasNext()) { - final ConsumerRecord record = iterator.next(); - GenericRecord avro_data = record.value(); - ObjectMapper mapper = new ObjectMapper(); - String namespace=avro_data.getSchema().getNamespace(); - String name= avro_data.getSchema().getName(); - JsonNode output; - try { - //Todo dynamic namespace is not supported now hence, adding avro schema name in the message - if(StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { - String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); - JsonNode newNode = mapper.readTree(newString); - output = mapper.readTree(avro_data.toString()); - ((ObjectNode) output).set("_namespace_", newNode); - }else{ - output = mapper.readTree(avro_data.toString()); - } - } catch (JsonProcessingException e) { - LOGGER.error("Exception whilst reading avro data from stream", e); - throw new RuntimeException(e); - } - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(record.topic()) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(output)); - } - - return endOfData(); - } + }); + } - }); - } } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index ae643074b94f..c34fe80dd56e 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka.format; import com.fasterxml.jackson.databind.JsonNode; @@ -6,6 +10,13 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.protocol.models.*; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -16,158 +27,148 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class JsonFormat extends AbstractFormat{ - private static final Logger LOGGER = LoggerFactory.getLogger(JsonFormat.class); - - private KafkaConsumer consumer; +public class JsonFormat extends AbstractFormat { - public JsonFormat(JsonNode jsonConfig){ - super(jsonConfig); - } + private static final Logger LOGGER = LoggerFactory.getLogger(JsonFormat.class); + private KafkaConsumer consumer; + public JsonFormat(JsonNode jsonConfig) { + super(jsonConfig); + } - @Override - protected KafkaConsumer getConsumer() { - if (consumer != null) { - return consumer; - } - Map filteredProps=getKafkaConfig(); - consumer = new KafkaConsumer<>(filteredProps); - - final JsonNode subscription = config.get("subscription"); - LOGGER.info("Kafka subscribe method: {}", subscription.toString()); - switch (subscription.get("subscription_type").asText()) { - case "subscribe" -> { - final String topicPattern = subscription.get("topic_pattern").asText(); - consumer.subscribe(Pattern.compile(topicPattern)); - topicsToSubscribe = consumer.listTopics().keySet().stream() - .filter(topic -> topic.matches(topicPattern)) - .collect(Collectors.toSet()); - LOGGER.info("Topic list: {}", topicsToSubscribe); - } - case "assign" -> { - topicsToSubscribe = new HashSet<>(); - final String topicPartitions = subscription.get("topic_partitions").asText(); - final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); - final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { - final String[] pair = topicPartition.split(":"); - topicsToSubscribe.add(pair[0]); - return new TopicPartition(pair[0], Integer.parseInt(pair[1])); - }).collect(Collectors.toList()); - LOGGER.info("Topic-partition list: {}", topicPartitionList); - consumer.assign(topicPartitionList); - } - } - return consumer; + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; } - - @Override - protected Map getKafkaConfig() { - Map props=super.getKafkaConfig(); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); - return props; + Map filteredProps = getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String 
topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } } - - public Set getTopicsToSubscribe() { - if (topicsToSubscribe == null) { - getConsumer(); + return consumer; + } + + @Override + protected Map getKafkaConfig() { + Map props = super.getKafkaConfig(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + return props; + } + + public Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); + } + return topicsToSubscribe; + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count = new AtomicInteger(); + final Map poll_lookup = new HashMap<>(); + getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic, poll_lookup.get(topic) + 1); + }); + boolean is_complete = poll_lookup.entrySet().stream().allMatch( + e -> e.getValue() > retry); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); + break; } - return topicsToSubscribe; + } else if (record_count.get() > max_records) { + LOGGER.info("Max record count is reached !!"); + break; + } + + consumerRecords.forEach(record -> { + recordsList.add(record); + }); + consumer.commitAsync(); } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(record.value())); + } - @Override - public List getStreams() { - final Set topicsToSubscribe = getTopicsToSubscribe(); - final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers - .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .collect(Collectors.toList()); - return streams; - } + return endOfData(); + } - @Override - public AutoCloseableIterator read() { + }); + } + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { final KafkaConsumer consumer = getConsumer(); - final List> recordsList = new ArrayList<>(); - final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; - final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; - AtomicInteger record_count= new AtomicInteger(); - final Map poll_lookup=new HashMap<>(); - getTopicsToSubscribe().forEach( topic -> - poll_lookup.put(topic,0) - ); - while (true) { - final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - if (consumerRecords.count() == 0) { consumer.assignment().stream().map( record -> record.topic()).distinct().forEach( - topic -> { - poll_lookup.put(topic,poll_lookup.get(topic)+1); - } - ); - boolean is_complete=poll_lookup.entrySet().stream().allMatch( - e ->e.getValue() > retry - ); - if (is_complete) { - LOGGER.info("There is no new data in the queue!!"); - break; - } - }else if(record_count.get() > max_records){ - LOGGER.info("Max record count is reached !!"); - break; - } - - consumerRecords.forEach(record -> { - recordsList.add(record); - }); - consumer.commitAsync(); - } + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); consumer.close(); - final Iterator> iterator = recordsList.iterator(); - return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { - @Override - protected AirbyteMessage computeNext() { - if (iterator.hasNext()) { - final ConsumerRecord record = iterator.next(); - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(record.topic()) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(record.value())); - } - - return endOfData(); - } - - }); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; } + } - @Override - public boolean isAccessible() { - try { - final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; - if (!testTopic.isBlank()) { - final KafkaConsumer consumer = getConsumer(); - consumer.subscribe(Pattern.compile(testTopic)); - consumer.listTopics(); - consumer.close(); - LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); - } - return true; - } catch (final Exception e) { - LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); - return false; - } - } } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java index 6f6c587614d4..e651d9814237 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java @@ -1,15 +1,20 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka.format; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStream; - import java.util.List; public interface KafkaFormat { - boolean isAccessible(); - List getStreams(); + boolean isAccessible(); + + List getStreams(); + + AutoCloseableIterator read(); - AutoCloseableIterator read(); } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json index c74230165960..4759f724440c 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json @@ -36,7 +36,11 @@ }, "deserialization_strategy": { "type": "string", - "enum": ["TopicNameStrategy","RecordNameStrategy","TopicRecordNameStrategy"], + "enum": [ + "TopicNameStrategy", + "RecordNameStrategy", + "TopicRecordNameStrategy" + ], "default": "TopicNameStrategy" }, "schema_registry_url": { @@ -272,7 +276,7 @@ }, "max_records_process": { "title": "Maximum Records", - "description": "The Maximum records needs to be processed per execution", + "description": "The Maximum to be processed per execution", "type": "integer", "default": 100000 } diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java index 5840bed684fc..454cf6334450 100644 --- a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java @@ -1,25 +1,26 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.source.kafka; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.source.kafka.format.AvroFormat; import io.airbyte.integrations.source.kafka.format.KafkaFormat; -import org.junit.jupiter.api.Test; - import java.io.IOException; - -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - +import org.junit.jupiter.api.Test; public class KafkaSourceTest { - - @Test - public void testAvroformat() throws IOException { - final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json")); - final KafkaFormat kafkaFormat =KafkaFormatFactory.getFormat(configJson); - assertInstanceOf(AvroFormat.class,kafkaFormat); - } + @Test + public void testAvroformat() throws IOException { + final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json")); + final KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(configJson); + assertInstanceOf(AvroFormat.class, kafkaFormat); + } } diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json index b365d73b5132..d53590514996 100644 --- a/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json @@ -1 +1,32 @@ -{"group_id":"login","protocol":{"sasl_mechanism":"PLAIN","sasl_jaas_config":"org.apache.kafka.common.security.plain.PlainLoginModule ;","security_protocol":"SASL_SSL"},"client_id":"airbyte-login-consumer","test_topic":"","polling_time":100,"subscription":{"topic_pattern":"dev-accounts-lms-transaction-created","subscription_type":"subscribe"},"MessageFormat":{"schema_registry_url":"http://localhost","deserialization_type":"AVRO","deserialization_strategy":"TopicRecordNameStrategy","schema_registry_password":"password","schema_registry_username":"username"},"repeated_calls":3,"max_poll_records":500,"retry_backoff_ms":100,"auto_offset_reset":"earliest","bootstrap_servers":"localhost:9092","client_dns_lookup":"use_all_dns_ips","enable_auto_commit":true,"request_timeout_ms":30000,"receive_buffer_bytes":32768,"auto_commit_interval_ms":5000} \ No newline at end of file +{ + "group_id": "login", + "protocol": { + "sasl_mechanism": "PLAIN", + "sasl_jaas_config": "org.apache.kafka.common.security.plain.PlainLoginModule ;", + "security_protocol": "SASL_SSL" + }, + "client_id": "airbyte-login-consumer", + "test_topic": "", + "polling_time": 100, + "subscription": { + "topic_pattern": "dev-accounts-lms-transaction-created", + "subscription_type": "subscribe" + }, + "MessageFormat": { + "schema_registry_url": "http://localhost", + "deserialization_type": "AVRO", + "deserialization_strategy": "TopicRecordNameStrategy", + "schema_registry_password": "password", + "schema_registry_username": "username" + }, + "repeated_calls": 3, + "max_poll_records": 500, + "retry_backoff_ms": 100, + "auto_offset_reset": "earliest", + "bootstrap_servers": "localhost:9092", + "client_dns_lookup": "use_all_dns_ips", + "enable_auto_commit": true, + "request_timeout_ms": 30000, + "receive_buffer_bytes": 32768, + "auto_commit_interval_ms": 5000 +} From 3edca7f5d4d403efe151af0a8ed8da85b4b31d79 Mon Sep 17 00:00:00 2001 From: sivankumar86 Date: Fri, 9 Sep 2022 10:41:59 +1000 Subject: 
[PATCH 08/10] Schema registry details are added

---
 docs/integrations/sources/kafka.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/docs/integrations/sources/kafka.md b/docs/integrations/sources/kafka.md
index 5a3dac09d9ba..7264cad0a1f0 100644
--- a/docs/integrations/sources/kafka.md
+++ b/docs/integrations/sources/kafka.md
@@ -22,6 +22,7 @@ You'll need the following information to configure the Kafka source:
 * **Subscription Method** - You can choose to manually assign a list of partitions, or subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * **List of topic**
 * **Bootstrap Servers** - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
+* **Schema Registry** - Host/port used to connect to the schema registry server. Note: the schema registry is supported for the AVRO format only.

 ### For Airbyte Cloud:

@@ -41,8 +42,10 @@ The Kafka source connector supports the following[sync modes](https://docs.airby
 | Namespaces | No | |

 ## Supported Format
- Json - Json value messages
- Avro - deserialize Using confluent API. Please refer (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html)
+ JSON - JSON value messages. The schema registry is not supported for this format yet.
+
+ AVRO - deserialized using the Confluent API. Please refer to https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html.
+

 ## Changelog

From b0aad57cfd0948575712e8a98a937af256caf121 Mon Sep 17 00:00:00 2001
From: sivankumar86 
Date: Fri, 9 Sep 2022 10:43:11 +1000
Subject: [PATCH 09/10] note update

---
 docs/integrations/sources/kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/integrations/sources/kafka.md b/docs/integrations/sources/kafka.md
index 7264cad0a1f0..7dff6e3cced5 100644
--- a/docs/integrations/sources/kafka.md
+++ b/docs/integrations/sources/kafka.md
@@ -51,7 +51,7 @@ The Kafka source connector supports the following[sync modes](https://docs.airby

 | Version | Date | Pull Request | Subject |
 | :------ | :-------- | :------------------------------------------------------| :---------------------------------------- |
-| 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added Avro format support |
+| 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added AVRO format support and a configurable limit on records processed per sync |
 | 0.1.7 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors |
 | 0.1.6 | 2022-05-29 | [12903](https://github.com/airbytehq/airbyte/pull/12903) | Add Polling Time to Specification (default 100 ms) |
 | 0.1.5 | 2022-04-19 | [12134](https://github.com/airbytehq/airbyte/pull/12134) | Add PLAIN Auth |

From f92f8e332120d27f5d5ba26776158abc7f2a5443 Mon Sep 17 00:00:00 2001
From: Octavia Squidington III 
Date: Tue, 13 Sep 2022 15:18:48 +0000
Subject: [PATCH 10/10] auto-bump connector version [ci skip]

---
 .../resources/seed/source_definitions.yaml    |  2 +-
 .../src/main/resources/seed/source_specs.yaml | 43 ++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 4b84ec3aa3e4..a4d29381d343 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -505,7 +505,7 @@
- 
name: Kafka sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265 dockerRepository: airbyte/source-kafka - dockerImageTag: 0.1.7 + dockerImageTag: 0.2.0 documentationUrl: https://docs.airbyte.io/integrations/sources/kafka icon: kafka.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ffe50e965ec6..86f816bf49ee 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4759,7 +4759,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-kafka:0.1.7" +- dockerImage: "airbyte/source-kafka:0.2.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka" connectionSpecification: @@ -4772,6 +4772,42 @@ - "protocol" additionalProperties: false properties: + MessageFormat: + title: "MessageFormat" + type: "object" + description: "The serialization used based on this " + oneOf: + - title: "JSON" + properties: + deserialization_type: + type: "string" + enum: + - "JSON" + default: "JSON" + - title: "AVRO" + properties: + deserialization_type: + type: "string" + enum: + - "AVRO" + default: "AVRO" + deserialization_strategy: + type: "string" + enum: + - "TopicNameStrategy" + - "RecordNameStrategy" + - "TopicRecordNameStrategy" + default: "TopicNameStrategy" + schema_registry_url: + type: "string" + examples: + - "http://localhost:8081" + schema_registry_username: + type: "string" + default: "" + schema_registry_password: + type: "string" + default: "" bootstrap_servers: title: "Bootstrap Servers" description: "A list of host/port pairs to use for establishing the initial\ @@ -5004,6 +5040,11 @@ \ received." type: "integer" default: 3 + max_records_process: + title: "Maximum Records" + description: "The Maximum to be processed per execution" + type: "integer" + default: 100000 supportsIncremental: true supportsNormalization: false supportsDBT: false
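
Usage note (illustrative only, not part of the patch series): the sketch below shows how the new MessageFormat option is expected to drive format selection, mirroring the assertion in KafkaSourceTest and the fields declared in spec.json. The connector class names come from this patch; the schema-registry values and the class FormatSelectionSketch are placeholders invented for the example.

// Illustrative sketch: builds the minimal config KafkaFormatFactory inspects
// and confirms that deserialization_type "AVRO" yields an AvroFormat instance.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.source.kafka.KafkaFormatFactory;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;

public class FormatSelectionSketch {

  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();
    // Placeholder registry settings; only deserialization_type drives the factory.
    final ObjectNode messageFormat = mapper.createObjectNode()
        .put("deserialization_type", "AVRO")
        .put("deserialization_strategy", "TopicRecordNameStrategy")
        .put("schema_registry_url", "http://localhost:8081")
        .put("schema_registry_username", "username")
        .put("schema_registry_password", "password");
    final ObjectNode config = mapper.createObjectNode();
    config.set("MessageFormat", messageFormat);

    final KafkaFormat format = KafkaFormatFactory.getFormat(config);
    // Prints "AvroFormat"; with "JSON" (or no MessageFormat block) it would print "JsonFormat".
    System.out.println(format.getClass().getSimpleName());
  }

}

Running an actual sync additionally requires the broker, subscription, and protocol fields shown in test_config.json (bootstrap_servers, subscription, protocol, and so on), which the AbstractFormat consumer configuration reads.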