-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 New Destination: Kafka #3746
🎉 New Destination: Kafka #3746
Conversation
that's amazing @mmolimar |
Thank you @mmolimar !! |
@mmolimar incredible, thank you for sharing your connector! Will review very soon -- need to bootstrap my Kafka knowledge. Are you blocked on merging this to Airbyte to be able to use it in your instance? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mmolimar This looks great. I am just curious about the setup options that end user will have. Chances are that the end user might not know the ideal value for all the options for their use case. For instance what should be the right Batch size
, Buffer memory
, Max request size
and in that case they might go ahead with the default option. Is the default option ideal for all the use cases? Also can we add more information in the docs about these setup options to help people figure out the right values for these based on their use case
airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Show resolved
Hide resolved
Thanks @sherifnada |
Thanks for your comments @subodh1810! |
Thanks @mmolimar, do you think this is ready to review, or do you need any assistance? |
I think it's fine. Just to resolve this little conflict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mmolimar Thanks again for creating the PR. This is going to be huge. Left a review!
...rc/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java
Show resolved
Hide resolved
airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Show resolved
Hide resolved
...tegration/java/io/airbyte/integrations/destination/kafka/KafkaDestinationAcceptanceTest.java
Outdated
Show resolved
Hide resolved
protected void setup(TestDestinationEnv testEnv) { | ||
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")); | ||
kafka.start(); | ||
try (var ignored = AdminClient.create(Map.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this block? Could you add a source code comment to help clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially I was creating the topics via the AdminClient
but I just removed it.
airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Show resolved
Hide resolved
|
||
private void sendRecordInTransaction(ProducerRecord<String, JsonNode> record) throws Exception { | ||
try { | ||
producer.beginTransaction(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the value of creating a transaction on a single record? isn't that equivalent to having no transactionality?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just removed this. Having acks=all
and sync=true
it's enough
|
||
@Override | ||
protected void startTracked() { | ||
Map<AirbyteStreamNameNamespacePair, String> mapped = catalog.getStreams().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we refactor this logic into a method which returns topicMap
and then write some unit tests for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
final JsonNode value = Jsons.jsonNode(ImmutableMap.of( | ||
JavaBaseConstants.COLUMN_NAME_AB_ID, key, | ||
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt(), | ||
JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if a user is writing to a hardcoded stream, they have no way of knowing which stream this came from. Should we include the stream name in the output record?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I just added it
...ation-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java
Show resolved
Hide resolved
...ation-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java
Show resolved
Hide resolved
Hey @sherifnada ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mmolimar thanks for the great PR! LGTM - just a few suggestions for docs/wording but I think we can release this pretty soon!
airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
Outdated
Show resolved
Hide resolved
airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Show resolved
Hide resolved
...ation-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java
Show resolved
Hide resolved
...ation-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java
Show resolved
Hide resolved
airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Show resolved
Hide resolved
...ation-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java
Show resolved
Hide resolved
…tions.yaml Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
…a/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
…te into feature/kafka-destination
Thank you @mmolimar ! Amazing contribution 🎉 🎉 🎉 🎉 🎉 🎉 |
Destination for Apache Kafka.
Related with #1855
Checklist
airbyte_secret
in the connector's spec./gradlew :airbyte-integrations:connectors:<name>:integrationTest
./test connector=connectors/<name>
command as documented here is passing.README.md
docs/SUMMARY.md
if it's a new connectordocs/integrations/<source or destination>/<name>
.docs/integrations/...
. See changelog exampledocs/integrations/README.md
contains a reference to the new connector/publish
command described here