Basic support for Confluent's KafkaAvroSerializer / KafkaAvroDeserializer (#9)

* Experimental support for Confluent Cloud
* initial work to support Confluent's KafkaAvroSerializer/KafkaAvroDeserializer format
* added schemaRegistry.go to encapsulate logic that deals with Confluent's proprietary 5-byte wire format prefix
* added caching for previous schema lookups
* reduce diff noise
* PULL-9: review follow-ups
fmck3516 authored Jun 29, 2021
1 parent d8c8d2d commit c2b6227
Showing 6 changed files with 336 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@
 # Dependency directories (remove the comment below to include it)
 vendor/
 k6
+.idea
81 changes: 81 additions & 0 deletions configuration.go
@@ -0,0 +1,81 @@
package kafka

import (
	"encoding/json"
	"errors"
)

type ConsumerConfiguration struct {
	KeyDeserializer   string `json:"keyDeserializer"`
	ValueDeserializer string `json:"valueDeserializer"`
}

type ProducerConfiguration struct {
	KeySerializer   string `json:"keySerializer"`
	ValueSerializer string `json:"valueSerializer"`
}

type BasicAuth struct {
	CredentialsSource string `json:"credentialsSource"`
	UserInfo          string `json:"userInfo"`
}

type SchemaRegistryConfiguration struct {
	Url       string    `json:"url"`
	BasicAuth BasicAuth `json:"basicAuth"`
}

type Configuration struct {
	Consumer       ConsumerConfiguration       `json:"consumer"`
	Producer       ProducerConfiguration       `json:"producer"`
	SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

func unmarshalConfiguration(jsonConfiguration string) (Configuration, error) {
	var configuration Configuration
	err := json.Unmarshal([]byte(jsonConfiguration), &configuration)
	return configuration, err
}

func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bool {
	if (Configuration{}) == configuration ||
		(ConsumerConfiguration{}) == configuration.Consumer {
		return false
	}
	if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" ||
		keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" {
		return true
	}
	return false
}

func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool {
	if (Configuration{}) == configuration ||
		(ProducerConfiguration{}) == configuration.Producer {
		return false
	}
	if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" ||
		keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" {
		return true
	}
	return false
}

func useBasicAuthWithCredentialSourceUserInfo(configuration Configuration) bool {
	if (Configuration{}) == configuration ||
		(SchemaRegistryConfiguration{}) == configuration.SchemaRegistry ||
		(BasicAuth{}) == configuration.SchemaRegistry.BasicAuth {
		return false
	}
	return configuration.SchemaRegistry.BasicAuth.CredentialsSource == "USER_INFO"
}

func validateConfiguration(configuration Configuration) error {
	if useKafkaAvroSerializer(configuration, "key") || useKafkaAvroSerializer(configuration, "value") {
		if (SchemaRegistryConfiguration{}) == configuration.SchemaRegistry {
			return errors.New("you must provide a value for the \"SchemaRegistry\" configuration property to use a serializer " +
				"of type \"io.confluent.kafka.serializers.KafkaAvroSerializer\"")
		}
	}
	return nil
}
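
For orientation, a minimal sketch of how these helpers combine, assuming it sits in the same package as configuration.go (the JSON literal and registry URL are illustrative, not taken from the commit):

	configJson := `{
		"producer": {
			"keySerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
			"valueSerializer": "io.confluent.kafka.serializers.KafkaAvroSerializer"
		},
		"schemaRegistry": {
			"url": "https://registry.example.com",
			"basicAuth": {"credentialsSource": "USER_INFO", "userInfo": "KEY:SECRET"}
		}
	}`
	configuration, err := unmarshalConfiguration(configJson)
	if err != nil {
		panic(err)
	}
	useKafkaAvroSerializer(configuration, "key")            // true: the producer key serializer matches
	useKafkaAvroDeserializer(configuration, "value")        // false: no consumer section configured
	useBasicAuthWithCredentialSourceUserInfo(configuration) // true: credentialsSource is USER_INFO
	validateConfiguration(configuration)                    // nil: schemaRegistry is present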
28 changes: 24 additions & 4 deletions consumer.go
@@ -56,6 +56,24 @@ func (*Kafka) Reader(
 func (*Kafka) Consume(
 	ctx context.Context, reader *kafkago.Reader, limit int64,
 	keySchema string, valueSchema string) []map[string]interface{} {
+	return ConsumeInternal(ctx, reader, limit, Configuration{}, keySchema, valueSchema)
+}
+
+func (*Kafka) ConsumeWithConfiguration(
+	ctx context.Context, reader *kafkago.Reader, limit int64, configurationJson string,
+	keySchema string, valueSchema string) []map[string]interface{} {
+	configuration, err := unmarshalConfiguration(configurationJson)
+	if err != nil {
+		ReportError(err, "Cannot unmarshal configuration "+configurationJson)
+		ReportReaderStats(ctx, reader.Stats())
+		return nil
+	}
+	return ConsumeInternal(ctx, reader, limit, configuration, keySchema, valueSchema)
+}
+
+func ConsumeInternal(
+	ctx context.Context, reader *kafkago.Reader, limit int64,
+	configuration Configuration, keySchema string, valueSchema string) []map[string]interface{} {
 	state := lib.GetState(ctx)
 
 	if state == nil {
@@ -88,16 +106,18 @@

 		message := make(map[string]interface{})
 		if len(msg.Key) > 0 {
-			message["key"] = string(msg.Key)
+			keyWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Key, "key")
+			message["key"] = string(keyWithoutPrefix)
 			if keySchema != "" {
-				message["key"] = FromAvro(msg.Key, keySchema)
+				message["key"] = FromAvro(keyWithoutPrefix, keySchema)
 			}
 		}
 
 		if len(msg.Value) > 0 {
-			message["value"] = string(msg.Value)
+			valueWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Value, "value")
+			message["value"] = string(valueWithoutPrefix)
 			if valueSchema != "" {
-				message["value"] = FromAvro(msg.Value, valueSchema)
+				message["value"] = FromAvro(valueWithoutPrefix, valueSchema)
 			}
 		}

40 changes: 37 additions & 3 deletions producer.go
@@ -40,9 +40,33 @@ func (*Kafka) Writer(brokers []string, topic string, auth string) *kafkago.Writer
 func (*Kafka) Produce(
 	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
 	keySchema string, valueSchema string) error {
+	return ProduceInternal(ctx, writer, messages, Configuration{}, keySchema, valueSchema)
+}
+
+func (*Kafka) ProduceWithConfiguration(
+	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
+	configurationJson string, keySchema string, valueSchema string) error {
+	configuration, err := unmarshalConfiguration(configurationJson)
+	if err != nil {
+		ReportError(err, "Cannot unmarshal configuration "+configurationJson)
+		return err
+	}
+
+	return ProduceInternal(ctx, writer, messages, configuration, keySchema, valueSchema)
+}
+
+func ProduceInternal(
+	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
+	configuration Configuration, keySchema string, valueSchema string) error {
 	state := lib.GetState(ctx)
-	err := errors.New("state is nil")
+
+	err := validateConfiguration(configuration)
+	if err != nil {
+		ReportError(err, "Validation of properties failed.")
+		return err
+	}
 
 	if state == nil {
+		err = errors.New("state is nil")
 		ReportError(err, "Cannot determine state")
 		return err
@@ -60,15 +84,25 @@ func (*Kafka) Produce(
value = ToAvro(message["value"], valueSchema)
}

keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
}
valueData, err := addMagicByteAndSchemaIdPrefix(configuration, value, writer.Stats().Topic, "value", valueSchema)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
}
kafkaMessages[i] = kafkago.Message{
Key: key,
Value: value,
Key: keyData,
Value: valueData,
}
}

err = writer.WriteMessages(ctx, kafkaMessages...)
if err == ctx.Err() {
// context is cancellled, so stop
// context is cancelled, so stop
ReportWriterStats(ctx, writer.Stats())
return nil
}
91 changes: 91 additions & 0 deletions schemaRegistry.go
@@ -0,0 +1,91 @@
package kafka

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"

	"github.com/linkedin/goavro/v2"
)

func i32tob(val uint32) []byte {
	r := make([]byte, 4)
	for i := uint32(0); i < 4; i++ {
		r[3-i] = byte((val >> (8 * i)) & 0xff)
	}
	return r
}
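
i32tob writes the schema id in big-endian order, which is what the wire format's four-byte id field requires. A behavior-identical sketch using the standard library, shown for comparison only (the name i32tobStd is hypothetical, not part of the commit):

	import "encoding/binary"

	func i32tobStd(val uint32) []byte {
		r := make([]byte, 4)
		binary.BigEndian.PutUint32(r, val) // same byte order as the manual loop above
		return r
	}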

// Account for proprietary 5-byte prefix before the Avro payload:
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func removeMagicByteAndSchemaIdPrefix(configuration Configuration, messageData []byte, keyOrValue string) []byte {
	if useKafkaAvroDeserializer(configuration, keyOrValue) {
		return messageData[5:]
	}
	return messageData
}

// Add proprietary 5-byte prefix before the Avro payload:
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
func addMagicByteAndSchemaIdPrefix(configuration Configuration, avroData []byte, topic string, keyOrValue string, schema string) ([]byte, error) {
	schemaId, err := getSchemaId(configuration, topic, keyOrValue, schema)
	if err != nil {
		ReportError(err, "Retrieval of schema id failed.")
		return nil, err
	}
	if schemaId != 0 {
		return append(append([]byte{0}, i32tob(schemaId)...), avroData...), nil
	}
	return avroData, nil
}
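
To make the wire format concrete: for schema id 42, the producer emits the magic byte 0x00, the big-endian id bytes 0x00 0x00 0x00 0x2A, and then the raw Avro payload. A tiny illustration (payload bytes invented for the example):

	avroData := []byte{0xDE, 0xAD} // stand-in for a real Avro-encoded record
	prefixed := append(append([]byte{0}, i32tob(42)...), avroData...)
	// prefixed == []byte{0x00, 0x00, 0x00, 0x00, 0x2A, 0xDE, 0xAD};
	// removeMagicByteAndSchemaIdPrefix drops prefixed[:5] to recover avroData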

var schemaIdCache = make(map[string]uint32)

func getSchemaId(configuration Configuration, topic string, keyOrValue string, schema string) (uint32, error) {
	if schemaIdCache[schema] > 0 {
		return schemaIdCache[schema], nil
	}
	if useKafkaAvroSerializer(configuration, keyOrValue) {
		url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions"
		codec, err := goavro.NewCodec(schema)
		if err != nil {
			return 0, err
		}

		body := "{\"schema\":\"" + strings.Replace(codec.CanonicalSchema(), "\"", "\\\"", -1) + "\"}"

		client := &http.Client{}
		req, err := http.NewRequest("POST", url, bytes.NewReader([]byte(body)))
		if err != nil {
			return 0, err
		}
		req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
		if useBasicAuthWithCredentialSourceUserInfo(configuration) {
			username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0]
			password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1]
			req.SetBasicAuth(username, password)
		}
		resp, err := client.Do(req)
		if err != nil {
			return 0, err
		}
		if resp.StatusCode >= 400 {
			return 0, fmt.Errorf("retrieval of schema id failed: url=%v, body=%v, response=%v", url, body, resp)
		}
		defer resp.Body.Close()
		bodyBytes, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return 0, err
		}

		var result map[string]int32
		err = json.Unmarshal(bodyBytes, &result)
		if err != nil {
			return 0, err
		}
		schemaId := uint32(result["id"])
		schemaIdCache[schema] = schemaId
		return schemaId, nil
	}
	return 0, nil
}
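
For orientation, the subject name above follows the topic-name strategy ("<topic>-key" / "<topic>-value"), and on success the registry answers with a small JSON document such as {"id": 23}, which getSchemaId extracts and memoizes in schemaIdCache. A hypothetical call for the test topic used later in this commit:

	// Resolves (and caches) the id for the value schema of com.example.person;
	// subsequent produces for the same schema skip the HTTP round trip.
	schemaId, err := getSchemaId(configuration, "com.example.person", "value", valueSchema)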
102 changes: 102 additions & 0 deletions test_avro_with_schema_registry.js
@@ -0,0 +1,102 @@
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 100 Avro messages per iteration.
*/

import {
    check
} from 'k6';
import {
    writer,
    reader,
    consumeWithConfiguration,
    produceWithConfiguration
} from 'k6/x/kafka'; // import kafka extension

const bootstrapServers = ["subdomain.us-east-1.aws.confluent.cloud:9092"];
const topic = "com.example.person";

const auth = JSON.stringify({
    username: "username",
    password: "password",
    algorithm: "plain"
});

const producer = writer(bootstrapServers, topic, auth);
const consumer = reader(bootstrapServers, topic, null, null, auth);

const keySchema = `{
  "name": "KeySchema",
  "type": "record",
  "namespace": "com.example",
  "fields": [
    {
      "name": "ssn",
      "type": "string"
    }
  ]
}`;

const valueSchema = `{
  "name": "ValueSchema",
  "type": "record",
  "namespace": "com.example",
  "fields": [
    {
      "name": "firstname",
      "type": "string"
    },
    {
      "name": "lastname",
      "type": "string"
    }
  ]
}`;


const configuration = JSON.stringify({
    consumer: {
        keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
        valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    },
    producer: {
        keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
        valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
    },
    schemaRegistry: {
        url: "https://subdomain.us-east-2.aws.confluent.cloud",
        basicAuth: {
            credentialsSource: "USER_INFO",
            userInfo: "KEY:SECRET"
        },
    },
});

export default function () {
    for (let index = 0; index < 100; index++) {
        let messages = [{
            key: JSON.stringify({
                "ssn": "ssn-" + index,
            }),
            value: JSON.stringify({
                "firstname": "firstname-" + index,
                "lastname": "lastname-" + index,
            }),
        }];
        let error = produceWithConfiguration(producer, messages, configuration, keySchema, valueSchema);
        check(error, {
            "is sent": err => err == undefined
        });
    }

    let messages = consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema);
    check(messages, {
        "20 messages returned": msgs => msgs.length == 20
    });
}

export function teardown(data) {
    producer.close();
    consumer.close();
}
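
The script assumes a k6 binary built with this extension. With xk6 installed, a build-and-run sketch (module path and flags assumed from the project's README, not from this commit):

    xk6 build --with github.com/mostafa/xk6-kafka@latest
    ./k6 run test_avro_with_schema_registry.js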
