Basic support for Confluent's KafkaAvroSerializer / KafkaAvroDeSerializer #9

Merged: 10 commits into mostafa:master on Jun 29, 2021

fmck3516 (Contributor) commented Jun 21, 2021

Adds basic support for serializing and deserializing messages with Confluent's proprietary 5-byte prefix.

The prefix will be added if the key or value serializer is set to io.confluent.kafka.serializers.KafkaAvroSerializer.
The prefix will be removed if the key or value deserializer is set to io.confluent.kafka.serializers.KafkaAvroDeserializer.
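
For readers unfamiliar with the prefix, here is a minimal sketch of the framing, assuming the usual Confluent wire format layout: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro-encoded payload. This is not the Go code from this PR; the names `addPrefix` and `stripPrefix` are hypothetical and exist only for illustration.

```javascript
// Assumed layout: [magic byte 0x00][4-byte big-endian schema ID][Avro payload]
const MAGIC_BYTE = 0;
const PREFIX_LENGTH = 5;

// Hypothetical helper: prepend the 5-byte prefix to an Avro-encoded payload (Uint8Array).
function addPrefix(schemaId, avroPayload) {
  const out = new Uint8Array(PREFIX_LENGTH + avroPayload.length);
  const view = new DataView(out.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false = big-endian
  out.set(avroPayload, PREFIX_LENGTH);
  return out;
}

// Hypothetical helper: split a framed message into schema ID and Avro payload.
function stripPrefix(message) {
  if (message.length < PREFIX_LENGTH) {
    throw new Error("message is shorter than the 5-byte prefix");
  }
  if (message[0] !== MAGIC_BYTE) {
    throw new Error("unexpected magic byte: " + message[0]);
  }
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  return {
    schemaId: view.getUint32(1, false),
    payload: message.subarray(PREFIX_LENGTH),
  };
}
```

A deserializer built this way would read the schema ID, then hand the remaining bytes to an ordinary Avro decoder.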

Configuration example:

var configuration = JSON.stringify({
    consumer: {
        keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
        valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    },
    producer: {
        keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
        valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
    },
    schemaRegistry: {
        url: "https://subdomain.us-east-2.aws.confluent.cloud",
        basicAuth: {
            credentialsSource: "USER_INFO",
            userInfo: "KEY:SECRET"
        },
    },
})

The schemaRegistry configuration is required when using a serializer of type io.confluent.kafka.serializers.KafkaAvroSerializer.
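
The requirement above could be checked early in a test script, before any message is produced. The following is a rough sketch under assumptions: `usesAvroSerializer` and `parseConfiguration` are hypothetical helpers, not functions provided by this extension, and the check covers only the serializer case because that is the only requirement stated here.

```javascript
const AVRO_SERIALIZER = "io.confluent.kafka.serializers.KafkaAvroSerializer";

// Hypothetical helper: true if the key or value serializer is Confluent's Avro serializer.
function usesAvroSerializer(config) {
  const producer = config.producer || {};
  return [producer.keySerializer, producer.valueSerializer].includes(AVRO_SERIALIZER);
}

// Hypothetical helper: parse the JSON configuration string and fail fast
// if the Avro serializer is selected without a schemaRegistry.url.
function parseConfiguration(json) {
  const config = JSON.parse(json);
  const hasRegistryUrl = Boolean(config.schemaRegistry && config.schemaRegistry.url);
  if (usesAvroSerializer(config) && !hasRegistryUrl) {
    throw new Error("schemaRegistry.url is required when using " + AVRO_SERIALIZER);
  }
  return config;
}
```

Calling `parseConfiguration(configuration)` on the string from the example above would return the parsed object; dropping the schemaRegistry block from that example would make it throw.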

New produce and consume functions have been added; each accepts the new configuration argument:

produceWithConfiguration(producer, messages, configuration, keySchema, valueSchema);
consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema);

Note: I ported a subset of the existing functionality of the Confluent Java client. This code will become obsolete once Confluent adds Schema Registry support to their Go client. At the time of writing, the Go client does not support Schema Registry: https://docs.confluent.io/platform/current/clients/index.html

@fmck3516 fmck3516 marked this pull request as ready for review June 21, 2021 17:25
fmck3516 commented

I'm not familiar with Go - I still have to figure out how to unit test the stuff.

mostafa (Owner) left a comment

Hey @fmck3516,

Thanks for the awesome PR. 🙏 I left a few comments and I think it can be merged after the changes.

I tried to create GH Actions for testing the feature, but I couldn't find an action for running Apache Kafka and co. There's an awesome GH Action by @szkiba called xk6bundler that can bundle and distribute k6 with xk6 extensions, but mere bundling without actual testing against Kafka is not what we want.

Resolved review threads (outdated): consumer.go, go.mod, producer.go (2), schemaRegistry.go, test/README.md
test/test.js Outdated
@@ -0,0 +1,63 @@
/*
mostafa commented

I wonder whether you could move all the scripts (test_*.js) to the scripts directory and rename this one to something like test_avro_with_magic_bytes.js, or something shorter.

fmck3516 commented

I renamed the file to test_avro_with_schema_registry.js, and yes, I think it would be good to move the tests out of the top-level directory.

mostafa commented Jun 29, 2021

But the file is still called test/test.js. Am I missing something? 🤔

fmck3516 commented

@mostafa: Thanks for the feedback - I will update the PR in the upcoming week.

fmck3516 commented

I incorporated your feedback. I also switched to a JSON-based configuration model to be in sync with the approach for the auth configuration.

What do you think about having one big configuration JSON for auth and everything else?

@fmck3516 fmck3516 requested a review from mostafa June 29, 2021 02:50

mostafa commented Jun 29, 2021

@fmck3516 I was actually thinking about the same thing, but then thought that it's out of scope of this PR or mine. So maybe the next PR would focus on refactoring, cleanups and better configuration management. I also have the idea to refactor metric collection and error handling. Sorry for too much information! 🤣

mostafa left a comment

LGTM! I can rearrange test scripts (files) after merging.

}`


var configuration = JSON.stringify({
mostafa commented

I love this. 👏


@mostafa mostafa merged commit c2b6227 into mostafa:master Jun 29, 2021

fmck3516 commented Jun 29, 2021

> @fmck3516 I was actually thinking about the same thing, but then thought that it's out of scope of this PR or mine.

That makes sense - one step at a time.

> So maybe the next PR would focus on refactoring, cleanups and better configuration management. I also have the idea to refactor metric collection and error handling. Sorry for too much information! 🤣

One thing I would like to add to the wishlist: A suite of regression tests that is executed automatically.

mostafa pushed a commit that referenced this pull request May 31, 2022
…izer (#9)

* Experimental support for Confluent Cloud
* initial work to support Confluents KafkaAvroSerializer/KafkaAvroDeserializer format
* added schemaRegistry.go to encapsulate logic that deals with Confluents proprietary 5-byte wire format prefix
* added caching for previous schema lookups
* reduce diff noise
* PULL-9: review follow-ups