Basic support for Confluent's KafkaAvroSerializer / KafkaAvroDeSerializer #9

Merged · 10 commits · Jun 29, 2021
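This PR adds basic Confluent wire-format support to the extension: two new JavaScript functions, produceWithConfiguration and consumeWithConfiguration, accept a JSON configuration string. When that configuration selects the KafkaAvroSerializer / KafkaAvroDeserializer classes, schema ids are registered with and fetched from the configured Schema Registry, and each message is prefixed with the 5-byte magic-byte/schema-id header. A minimal sketch of the new calls (broker address is a placeholder and auth is omitted; the complete script is in test_avro_with_schema_registry.js at the end of this diff):

import { writer, reader, produceWithConfiguration, consumeWithConfiguration } from 'k6/x/kafka';

const producer = writer(["localhost:9092"], "my-topic", null);
const consumer = reader(["localhost:9092"], "my-topic", null, null, null);

// configuration, keySchema, and valueSchema are JSON/Avro strings,
// built exactly as in test_avro_with_schema_registry.js below.
let error = produceWithConfiguration(producer, messages, configuration, keySchema, valueSchema);
let consumed = consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema);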
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@
# Dependency directories (remove the comment below to include it)
vendor/
k6
.idea
81 changes: 81 additions & 0 deletions configuration.go
@@ -0,0 +1,81 @@
package kafka

import (
"encoding/json"
"errors"
)

type ConsumerConfiguration struct {
KeyDeserializer string `json:"keyDeserializer"`
ValueDeserializer string `json:"valueDeserializer"`
}

type ProducerConfiguration struct {
KeySerializer string `json:"keySerializer"`
ValueSerializer string `json:"valueSerializer"`
}

type BasicAuth struct {
CredentialsSource string `json:"credentialsSource"`
UserInfo string `json:"userInfo"`
}

type SchemaRegistryConfiguration struct {
Url string `json:"url"`
BasicAuth BasicAuth `json:"basicAuth"`
}

type Configuration struct {
Consumer ConsumerConfiguration `json:"consumer"`
Producer ProducerConfiguration `json:"producer"`
SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

func unmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
var configuration Configuration
err := json.Unmarshal([]byte(jsonConfiguration), &configuration)
return configuration, err
}

func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ConsumerConfiguration{}) == configuration.Consumer {
return false
}
if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" ||
keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" {
return true
}
return false
}

func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool {
if (Configuration{}) == configuration ||
(ProducerConfiguration{}) == configuration.Producer {
return false
}
if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" ||
keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" {
return true
}
return false
}

func useBasicAuthWithCredentialSourceUserInfo(configuration Configuration) bool {
if (Configuration{}) == configuration ||
(SchemaRegistryConfiguration{}) == configuration.SchemaRegistry ||
(BasicAuth{}) == configuration.SchemaRegistry.BasicAuth {
return false
}
return configuration.SchemaRegistry.BasicAuth.CredentialsSource == "USER_INFO"
}

func validateConfiguration(configuration Configuration) error {
if useKafkaAvroSerializer(configuration, "key") || useKafkaAvroSerializer(configuration, "value") {
if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
"of type \"io.confluent.kafka.serializers.KafkaAvroSerializer\"")
}
}
return nil
}
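For reference, a configuration document of the shape these structs unmarshal, with every recognized field (the registry URL and credentials are placeholders):

{
  "consumer": {
    "keyDeserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "valueDeserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  },
  "producer": {
    "keySerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "valueSerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
  },
  "schemaRegistry": {
    "url": "https://registry.example.com",
    "basicAuth": {
      "credentialsSource": "USER_INFO",
      "userInfo": "KEY:SECRET"
    }
  }
}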
28 changes: 24 additions & 4 deletions consumer.go
@@ -56,6 +56,24 @@ func (*Kafka) Reader(
func (*Kafka) Consume(
ctx context.Context, reader *kafkago.Reader, limit int64,
keySchema string, valueSchema string) []map[string]interface{} {
return ConsumeInternal(ctx, reader, limit, Configuration{}, keySchema, valueSchema)
}

func (*Kafka) ConsumeWithConfiguration(
ctx context.Context, reader *kafkago.Reader, limit int64, configurationJson string,
keySchema string, valueSchema string) []map[string]interface{} {
configuration, err := unmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration " + configurationJson)
ReportReaderStats(ctx, reader.Stats())
return nil
}
return ConsumeInternal(ctx, reader, limit, configuration, keySchema, valueSchema)
}

func ConsumeInternal(
ctx context.Context, reader *kafkago.Reader, limit int64,
configuration Configuration, keySchema string, valueSchema string) []map[string]interface{} {
state := lib.GetState(ctx)

if state == nil {
@@ -88,16 +106,18 @@ func (*Kafka) Consume(

message := make(map[string]interface{})
if len(msg.Key) > 0 {
message["key"] = string(msg.Key)
keyWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Key, "key")
message["key"] = string(keyWithoutPrefix)
if keySchema != "" {
message["key"] = FromAvro(msg.Key, keySchema)
message["key"] = FromAvro(keyWithoutPrefix, keySchema)
}
}

if len(msg.Value) > 0 {
message["value"] = string(msg.Value)
valueWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Value, "value")
message["value"] = string(valueWithoutPrefix)
if valueSchema != "" {
message["value"] = FromAvro(msg.Value, valueSchema)
message["value"] = FromAvro(valueWithoutPrefix, valueSchema)
}
}

40 changes: 37 additions & 3 deletions producer.go
@@ -40,9 +40,33 @@ func (*Kafka) Writer(brokers []string, topic string, auth string) *kafkago.Writer
func (*Kafka) Produce(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
keySchema string, valueSchema string) error {
return ProduceInternal(ctx, writer, messages, Configuration{}, keySchema, valueSchema)
}

func (*Kafka) ProduceWithConfiguration(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
configurationJson string, keySchema string, valueSchema string) error {
configuration, err := unmarshalConfiguration(configurationJson)
if err != nil {
ReportError(err, "Cannot unmarshal configuration " + configurationJson)
return err
}

return ProduceInternal(ctx, writer, messages, configuration, keySchema, valueSchema)
}

func ProduceInternal(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
configuration Configuration, keySchema string, valueSchema string) error {
state := lib.GetState(ctx)

err := validateConfiguration(configuration)
if err != nil {
ReportError(err, "Validation of properties failed.")
return err
}

if state == nil {
err = errors.New("state is nil")
ReportError(err, "Cannot determine state")
return err
@@ -60,15 +84,25 @@ func (*Kafka) Produce(
value = ToAvro(message["value"], valueSchema)
}

keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
}
valueData, err := addMagicByteAndSchemaIdPrefix(configuration, value, writer.Stats().Topic, "value", valueSchema)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
}
kafkaMessages[i] = kafkago.Message{
Key: key,
Value: value,
Key: keyData,
Value: valueData,
}
}

err = writer.WriteMessages(ctx, kafkaMessages...)
if err == ctx.Err() {
// context is cancellled, so stop
// context is cancelled, so stop
ReportWriterStats(ctx, writer.Stats())
return nil
}
91 changes: 91 additions & 0 deletions schemaRegistry.go
@@ -0,0 +1,91 @@
package kafka

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/linkedin/goavro/v2"
"io/ioutil"
"net/http"
"strings"
)

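// i32tob encodes val as four big-endian bytes (network byte order), the encoding the wire format uses for the schema id.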
func i32tob(val uint32) []byte {
r := make([]byte, 4)
for i := uint32(0); i < 4; i++ {
r[3 - i] = byte((val >> (8 * i)) & 0xff)
}
return r
}

// Account for proprietary 5-byte prefix before the Avro payload:
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
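// Layout: byte 0 is the magic byte (0x0), bytes 1-4 are the schema id as a big-endian uint32, and the Avro payload follows.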
func removeMagicByteAndSchemaIdPrefix(configuration Configuration, messageData []byte, keyOrValue string) []byte {
if useKafkaAvroDeserializer(configuration, keyOrValue) && len(messageData) >= 5 {
return messageData[5:]
}
return messageData
}

// Add proprietary 5-byte prefix before the Avro payload:
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func addMagicByteAndSchemaIdPrefix(configuration Configuration, avroData []byte, topic string, keyOrValue string, schema string) ([]byte, error) {
var schemaId, err = getSchemaId(configuration, topic, keyOrValue, schema)
if err != nil {
ReportError(err, "Retrieval of schema id failed.")
return nil, err
}
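// A schema id of 0 means no Avro serializer/registry is configured for this key or value, so the payload passes through unchanged.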
if schemaId != 0 {
return append(append([]byte{ 0 }, i32tob(schemaId)...), avroData...), nil
}
return avroData, nil
}

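// schemaIdCache memoizes ids by schema text, so each schema is registered with the registry only once per test run.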
var schemaIdCache = make(map[string]uint32)

func getSchemaId(configuration Configuration, topic string, keyOrValue string, schema string) (uint32, error) {
if schemaIdCache[schema] > 0 {
return schemaIdCache[schema], nil
}
if useKafkaAvroSerializer(configuration, keyOrValue) {
url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions"
codec, err := goavro.NewCodec(schema)
if err != nil {
return 0, err
}

// Let encoding/json handle the quoting instead of escaping by hand.
jsonBody, err := json.Marshal(map[string]string{"schema": codec.CanonicalSchema()})
if err != nil {
return 0, err
}
body := string(jsonBody)

client := &http.Client{}
req, err := http.NewRequest("POST", url, bytes.NewReader([]byte(body)))
if err != nil {
return 0, err
}
req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
if useBasicAuthWithCredentialSourceUserInfo(configuration) {
username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0]
password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1]
req.SetBasicAuth(username, password)
}
resp, err := client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return 0, fmt.Errorf("retrieval of schema id failed: url=%v, body=%v, response=%v", url, body, resp)
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}

var result map[string]int32
err = json.Unmarshal(bodyBytes, &result)
if err != nil {
return 0, err
}
schemaId := uint32(result["id"])
schemaIdCache[schema] = schemaId
return schemaId, nil
}
return 0, nil
}
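For context: on success, the registry's POST /subjects/<subject>/versions endpoint answers with a small JSON document, e.g.

{"id": 23}

which is why unmarshalling into a map[string]int32 is enough to extract the schema id.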
102 changes: 102 additions & 0 deletions test_avro_with_schema_registry.js
@@ -0,0 +1,102 @@
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 100 Avro messages per iteration.
*/

import {
check
} from 'k6';
import {
writer,
reader,
consumeWithConfiguration,
produceWithConfiguration
} from 'k6/x/kafka'; // import kafka extension

const bootstrapServers = ["subdomain.us-east-1.aws.confluent.cloud:9092"];
const topic = "com.example.person";

const auth = JSON.stringify({
username: "username",
password: "password",
algorithm: "plain"
})
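// SASL/PLAIN credentials for the brokers; the values above are placeholders.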

const producer = writer(bootstrapServers, topic, auth);
const consumer = reader(bootstrapServers, topic, null, null, auth);

const keySchema = `{
"name": "KeySchema",
"type": "record",
"namespace": "com.example",
"fields": [
{
"name": "ssn",
"type": "string"
}
]
}
`
const valueSchema = `{
"name": "ValueSchema",
"type": "record",
"namespace": "com.example",
"fields": [
{
"name": "firstname",
"type": "string"
},
{
"name": "lastname",
"type": "string"
}
]
}`


var configuration = JSON.stringify({
consumer: {
keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
},
producer: {
keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
},
schemaRegistry: {
url: "https://subdomain.us-east-2.aws.confluent.cloud",
basicAuth: {
credentialsSource: "USER_INFO",
userInfo: "KEY:SECRET"
},
},
})

export default function () {
for (let index = 0; index < 100; index++) {
let messages = [{
key: JSON.stringify({
"ssn": "ssn-" + index,
}),
value: JSON.stringify({
"firstname": "firstname-" + index,
"lastname": "lastname-" + index,
}),
}]
let error = produceWithConfiguration(producer, messages, configuration, keySchema, valueSchema);
check(error, {
"is sent": err => err == undefined
});
}

let messages = consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema);
check(messages, {
"20 message returned": msgs => msgs.length == 20
})

}

export function teardown(data) {
producer.close();
consumer.close();
}