Source: Kafka - Avro format message support (#15827)
* new format is added

* Avro support

Avro support

* new format is added

* schema namespace updated

* multi topic schema name is added

* max_records_process param is added

* review 1

* Schema registry details are added

* note update

* auto-bump connector version [ci skip]

Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored Sep 13, 2022
1 parent 3dfe362 commit ce64e96
Showing 16 changed files with 690 additions and 164 deletions.
airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -505,7 +505,7 @@
- name: Kafka
sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
dockerRepository: airbyte/source-kafka
-  dockerImageTag: 0.1.7
+  dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.io/integrations/sources/kafka
icon: kafka.svg
sourceType: database
43 changes: 42 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -4759,7 +4759,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-kafka:0.1.7"
- dockerImage: "airbyte/source-kafka:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka"
connectionSpecification:
@@ -4772,6 +4772,42 @@
- "protocol"
additionalProperties: false
properties:
MessageFormat:
title: "MessageFormat"
type: "object"
description: "The serialization used based on this "
oneOf:
- title: "JSON"
properties:
deserialization_type:
type: "string"
enum:
- "JSON"
default: "JSON"
- title: "AVRO"
properties:
deserialization_type:
type: "string"
enum:
- "AVRO"
default: "AVRO"
deserialization_strategy:
type: "string"
enum:
- "TopicNameStrategy"
- "RecordNameStrategy"
- "TopicRecordNameStrategy"
default: "TopicNameStrategy"
schema_registry_url:
type: "string"
examples:
- "http://localhost:8081"
schema_registry_username:
type: "string"
default: ""
schema_registry_password:
type: "string"
default: ""
bootstrap_servers:
title: "Bootstrap Servers"
description: "A list of host/port pairs to use for establishing the initial\
@@ -5004,6 +5040,11 @@
\ received."
type: "integer"
default: 3
max_records_process:
title: "Maximum Records"
description: "The Maximum to be processed per execution"
type: "integer"
default: 100000
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
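Assembled from the fields above, a source config that selects Avro would look roughly like this (values are illustrative; required fields outside this excerpt, such as the protocol block and the topic subscription, are elided):

    {
      "bootstrap_servers": "broker-1:9092,broker-2:9092",
      "MessageFormat": {
        "deserialization_type": "AVRO",
        "deserialization_strategy": "TopicNameStrategy",
        "schema_registry_url": "http://localhost:8081",
        "schema_registry_username": "",
        "schema_registry_password": ""
      },
      "max_records_process": 100000
    }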
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-kafka

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.1.7
+LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-kafka
15 changes: 13 additions & 2 deletions airbyte-integrations/connectors/source-kafka/build.gradle
@@ -9,13 +9,24 @@ application {
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

repositories {
mavenLocal()
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}

}

dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
implementation libs.connectors.testcontainers.kafka

-  implementation 'org.apache.kafka:kafka-clients:2.8.0'
-  implementation 'org.apache.kafka:connect-json:2.8.0'
+  implementation 'org.apache.kafka:kafka-clients:3.2.1'
+  implementation 'org.apache.kafka:connect-json:3.2.1'
+  implementation 'io.confluent:kafka-avro-serializer:7.2.1'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-kafka')
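Note that io.confluent:kafka-avro-serializer is published to Confluent's own Maven repository rather than Maven Central, which is why the commit also adds the repositories block pointing at https://packages.confluent.io/maven.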
airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.source.kafka.format.AvroFormat;
import io.airbyte.integrations.source.kafka.format.JsonFormat;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;

public class KafkaFormatFactory {

public static KafkaFormat getFormat(final JsonNode config) {

MessageFormat messageFormat =
config.has("MessageFormat") ? MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase())
: MessageFormat.JSON;

switch (messageFormat) {
case JSON -> {
return new JsonFormat(config);
}
case AVRO -> {
return new AvroFormat(config);
}
}
return new JsonFormat(config);
}

}
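For reference, a caller would resolve a format like this (a minimal sketch; Jsons is Airbyte's Jackson helper, and the config literal is illustrative):

    JsonNode config = Jsons.deserialize(
        "{\"MessageFormat\":{\"deserialization_type\":\"AVRO\"}}");
    KafkaFormat format = KafkaFormatFactory.getFormat(config); // -> AvroFormat

    // With no MessageFormat block at all, getFormat falls back to JsonFormat,
    // so configs written for source-kafka 0.1.x keep working unchanged.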
airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java
@@ -5,36 +5,14 @@
package io.airbyte.integrations.source.kafka;

 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Lists;
 import io.airbyte.commons.util.AutoCloseableIterator;
-import io.airbyte.commons.util.AutoCloseableIterators;
 import io.airbyte.integrations.BaseConnector;
 import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.base.Source;
-import io.airbyte.protocol.models.AirbyteCatalog;
-import io.airbyte.protocol.models.AirbyteConnectionStatus;
+import io.airbyte.integrations.source.kafka.format.KafkaFormat;
+import io.airbyte.protocol.models.*;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
-import io.airbyte.protocol.models.AirbyteMessage;
-import io.airbyte.protocol.models.AirbyteRecordMessage;
-import io.airbyte.protocol.models.AirbyteStream;
-import io.airbyte.protocol.models.CatalogHelpers;
-import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.Field;
-import io.airbyte.protocol.models.JsonSchemaType;
-import io.airbyte.protocol.models.SyncMode;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -46,33 +24,19 @@ public KafkaSource() {}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
-    try {
-      final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : "";
-      if (!testTopic.isBlank()) {
-        final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
-        final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getCheckConsumer();
-        consumer.subscribe(Pattern.compile(testTopic));
-        consumer.listTopics();
-        consumer.close();
-        LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText());
-      }
+    KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
+    if (kafkaFormat.isAccessible()) {
       return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
-    } catch (final Exception e) {
-      LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e);
-      return new AirbyteConnectionStatus()
-          .withStatus(Status.FAILED)
-          .withMessage("Could not connect to the Kafka brokers with provided configuration. \n" + e.getMessage());
     }
+    return new AirbyteConnectionStatus()
+        .withStatus(Status.FAILED)
+        .withMessage("Could not connect to the Kafka brokers with provided configuration. \n");
}

@Override
-  public AirbyteCatalog discover(final JsonNode config) throws Exception {
-
-    final Set<String> topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe();
-    final List<AirbyteStream> streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers
-        .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING))
-        .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
-        .collect(Collectors.toList());
+  public AirbyteCatalog discover(final JsonNode config) {
+    KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
+    final List<AirbyteStream> streams = kafkaFormat.getStreams();
return new AirbyteCatalog().withStreams(streams);
}

@@ -83,51 +47,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
}

-    final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
-    final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getConsumer();
-    final List<ConsumerRecord<String, JsonNode>> recordsList = new ArrayList<>();
-
-    final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
-    int pollCount = 0;
-    final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100;
-    while (true) {
-      final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
-      if (consumerRecords.count() == 0) {
-        pollCount++;
-        if (pollCount > retry) {
-          break;
-        }
-      }
-
-      consumerRecords.forEach(record -> {
-        LOGGER.info("Consumer Record: key - {}, value - {}, partition - {}, offset - {}",
-            record.key(), record.value(), record.partition(), record.offset());
-        recordsList.add(record);
-      });
-      consumer.commitAsync();
-    }
-    consumer.close();
-    final Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();
-
-    return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {
-
-      @Override
-      protected AirbyteMessage computeNext() {
-        if (iterator.hasNext()) {
-          final ConsumerRecord<String, JsonNode> record = iterator.next();
-          return new AirbyteMessage()
-              .withType(AirbyteMessage.Type.RECORD)
-              .withRecord(new AirbyteRecordMessage()
-                  .withStream(record.topic())
-                  .withEmittedAt(Instant.now().toEpochMilli())
-                  .withData(record.value()));
-        }
-
-        return endOfData();
-      }
-
-    });
+    KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
+    return kafkaFormat.read();
}

public static void main(final String[] args) throws Exception {
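The KafkaFormat abstraction that check, discover, and read now delegate to is among the changed files not expanded on this page. From the call sites above, its contract must look roughly like this (a sketch inferred from usage, not the committed source):

    package io.airbyte.integrations.source.kafka.format;

    import io.airbyte.commons.util.AutoCloseableIterator;
    import io.airbyte.protocol.models.AirbyteMessage;
    import io.airbyte.protocol.models.AirbyteStream;
    import java.util.List;

    public interface KafkaFormat {

      // check(): can the configured brokers (and, for Avro, the schema registry) be reached?
      boolean isAccessible();

      // discover(): one AirbyteStream per subscribed topic
      List<AirbyteStream> getStreams();

      // read(): consumed records already wrapped as Airbyte RECORD messages
      AutoCloseableIterator<AirbyteMessage> read();
    }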
airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

/**
* https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
*/
public enum KafkaStrategy {

TopicNameStrategy(TopicNameStrategy.class.getName()),
RecordNameStrategy(RecordNameStrategy.class.getName()),
TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName());

String className;

KafkaStrategy(String name) {
this.className = name;
}

public static String getStrategyName(String name) {
for (KafkaStrategy value : KafkaStrategy.values()) {
if (value.name().equalsIgnoreCase(name)) {
return value.className;
}
}
throw new IllegalArgumentException("Unexpected strategy setting: " + name);
}

}
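A sketch of how these strategy names are typically handed to the Confluent deserializer (the AvroFormat class in this commit does something along these lines; the property wiring here is illustrative, not the committed code):

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import java.util.Properties;

    // Map the spec's deserialization_strategy to the subject-name strategy
    // class that KafkaAvroDeserializer uses to look up schemas.
    Properties props = new Properties();
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        config.get("MessageFormat").get("schema_registry_url").asText());
    props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
        KafkaStrategy.getStrategyName(
            config.get("MessageFormat").get("deserialization_strategy").asText()));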
airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

/**
* Message formats supported for records read from Kafka topics.
* https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
*/
public enum MessageFormat {
JSON,
AVRO
}
