
Commit

Add support for reading from multiple partitions with a group ID (#12)
Update example scripts
Fix linting errors
mostafa committed Jul 4, 2021
1 parent c4fcac2 commit b78344d
Showing 5 changed files with 27 additions and 18 deletions.
consumer.go (14 changes: 10 additions & 4 deletions)

@@ -19,7 +19,8 @@ func init() {
 type Kafka struct{}

 func (*Kafka) Reader(
-	brokers []string, topic string, partition int, offset int64, auth string) *kafkago.Reader {
+	brokers []string, topic string, partition int,
+	groupID string, offset int64, auth string) *kafkago.Reader {
 	var dialer *kafkago.Dialer

 	if auth != "" {
@@ -36,10 +37,15 @@ func (*Kafka) Reader(
 		}
 	}

+	if groupID != "" {
+		partition = 0
+	}
+
 	reader := kafkago.NewReader(kafkago.ReaderConfig{
 		Brokers:          brokers,
 		Topic:            topic,
 		Partition:        partition,
+		GroupID:          groupID,
 		MaxWait:          time.Millisecond * 200,
 		RebalanceTimeout: time.Second * 5,
 		QueueCapacity:    1,
@@ -56,19 +62,19 @@ func (*Kafka) Reader(
 func (*Kafka) Consume(
 	ctx context.Context, reader *kafkago.Reader, limit int64,
 	keySchema string, valueSchema string) []map[string]interface{} {
-	return ConsumeInternal(ctx, reader, limit, Configuration{}, keySchema, valueSchema);
+	return ConsumeInternal(ctx, reader, limit, Configuration{}, keySchema, valueSchema)
 }

 func (*Kafka) ConsumeWithConfiguration(
 	ctx context.Context, reader *kafkago.Reader, limit int64, configurationJson string,
 	keySchema string, valueSchema string) []map[string]interface{} {
 	configuration, err := unmarshalConfiguration(configurationJson)
 	if err != nil {
-		ReportError(err, "Cannot unmarshal configuration " + configurationJson)
+		ReportError(err, "Cannot unmarshal configuration "+configurationJson)
 		ReportReaderStats(ctx, reader.Stats())
 		return nil
 	}
-	return ConsumeInternal(ctx, reader, limit, configuration, keySchema, valueSchema);
+	return ConsumeInternal(ctx, reader, limit, configuration, keySchema, valueSchema)
 }

 func ConsumeInternal(
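The partition = 0 reset above reflects kafka-go's rule that Partition and GroupID are mutually exclusive: when a group ID is set, partitions are assigned by the consumer group coordinator, and NewReader rejects a configuration that specifies both. Below is a minimal sketch of the two consumption modes behind the new Reader signature (brokers, topic, partition, groupID, offset, auth); it assumes kafkago is github.com/segmentio/kafka-go, and the helper name newReader is hypothetical, not part of this commit.

package kafka

import (
	kafkago "github.com/segmentio/kafka-go"
)

// newReader illustrates the two modes: with a group ID, the group coordinator
// assigns partitions across members; without one, the reader pins a single
// explicitly chosen partition.
func newReader(brokers []string, topic string, partition int, groupID string) *kafkago.Reader {
	cfg := kafkago.ReaderConfig{
		Brokers: brokers,
		Topic:   topic,
	}
	if groupID != "" {
		cfg.GroupID = groupID // consume all partitions as a group member
	} else {
		cfg.Partition = partition // read one fixed partition
	}
	return kafkago.NewReader(cfg)
}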
producer.go (8 changes: 4 additions & 4 deletions)

@@ -40,19 +40,19 @@ func (*Kafka) Writer(brokers []string, topic string, auth string) *kafkago.Writer {
 func (*Kafka) Produce(
 	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
 	keySchema string, valueSchema string) error {
-	return ProduceInternal(ctx, writer, messages, Configuration{}, keySchema, valueSchema);
+	return ProduceInternal(ctx, writer, messages, Configuration{}, keySchema, valueSchema)
 }

 func (*Kafka) ProduceWithConfiguration(
 	ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
 	configurationJson string, keySchema string, valueSchema string) error {
 	configuration, err := unmarshalConfiguration(configurationJson)
 	if err != nil {
-		ReportError(err, "Cannot unmarshal configuration " + configurationJson)
+		ReportError(err, "Cannot unmarshal configuration "+configurationJson)
 		return nil
 	}

-	return ProduceInternal(ctx, writer, messages, configuration, keySchema, valueSchema);
+	return ProduceInternal(ctx, writer, messages, configuration, keySchema, valueSchema)
 }

 func ProduceInternal(
@@ -61,7 +61,7 @@ func ProduceInternal(
 	state := lib.GetState(ctx)
 	err := errors.New("state is nil")

-	err = validateConfiguration(configuration);
+	err = validateConfiguration(configuration)
 	if err != nil {
 		ReportError(err, "Validation of properties failed.")
 		return err
schemaRegistry.go (13 changes: 7 additions & 6 deletions)

@@ -5,16 +5,17 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/linkedin/goavro/v2"
 	"io/ioutil"
 	"net/http"
 	"strings"
+
+	"github.com/linkedin/goavro/v2"
 )

 func i32tob(val uint32) []byte {
 	r := make([]byte, 4)
 	for i := uint32(0); i < 4; i++ {
-		r[3 - i] = byte((val >> (8 * i)) & 0xff)
+		r[3-i] = byte((val >> (8 * i)) & 0xff)
 	}
 	return r
 }
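As an aside (not part of the diff), i32tob emits the value most significant byte first, which is exactly what the standard library's encoding/binary produces; a quick runnable check:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, 42) // big-endian: most significant byte first
	fmt.Println(buf)                    // [0 0 0 42], identical to i32tob(42)
}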
@@ -37,7 +38,7 @@ func addMagicByteAndSchemaIdPrefix(configuration Configuration, avroData []byte,
 		return nil, err
 	}
 	if schemaId != 0 {
-		return append(append([]byte{ 0 }, i32tob(schemaId)...), avroData...), nil
+		return append(append([]byte{0}, i32tob(schemaId)...), avroData...), nil
 	}
 	return avroData, nil
 }
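The prefix assembled here is the Confluent Schema Registry wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro payload. A sketch (not part of this commit) of splitting such a message back apart:

package kafka

import (
	"encoding/binary"
	"errors"
)

// splitWireFormat is a hypothetical inverse of addMagicByteAndSchemaIdPrefix:
// byte 0 is the magic byte (0x0), bytes 1-4 the big-endian schema ID, and the
// remainder is the Avro-encoded payload.
func splitWireFormat(msg []byte) (schemaID uint32, avroData []byte, err error) {
	if len(msg) < 5 || msg[0] != 0 {
		return 0, nil, errors.New("message is not in Confluent wire format")
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}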
@@ -50,7 +51,7 @@ func getSchemaId(configuration Configuration, topic string, keyOrValue string, s
 	}
 	if useKafkaAvroSerializer(configuration, keyOrValue) {
 		url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions"
-		codec, _ := goavro.NewCodec(schema);
+		codec, _ := goavro.NewCodec(schema)

 		body := "{\"schema\":\"" + strings.Replace(codec.CanonicalSchema(), "\"", "\\\"", -1) + "\"}"

@@ -65,7 +66,7 @@ func getSchemaId(configuration Configuration, topic string, keyOrValue string, s
 			password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1]
 			req.SetBasicAuth(username, password)
 		}
-		resp, err := client.Do(req);
+		resp, err := client.Do(req)
 		if err != nil {
 			return 0, err
 		}
@@ -81,7 +82,7 @@ func getSchemaId(configuration Configuration, topic string, keyOrValue string, s
 		var result map[string]int32
 		err = json.Unmarshal(bodyBytes, &result)
 		if err != nil {
-			return 0, err;
+			return 0, err
 		}
 		schemaId := uint32(result["id"])
 		schemaIdCache[schema] = schemaId
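getSchemaId escapes quotes in the canonical schema by hand to build the {"schema": "..."} registration body. For comparison, encoding/json can build the same payload without manual escaping; a small sketch, with a placeholder standing in for codec.CanonicalSchema():

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	canonical := `{"type":"string"}` // stand-in for codec.CanonicalSchema()
	// json.Marshal escapes the embedded quotes for us.
	body, err := json.Marshal(map[string]string{"schema": canonical})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"schema":"{\"type\":\"string\"}"}
}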
scripts/test_avro_with_schema_registry.js (6 changes: 3 additions & 3 deletions)

@@ -23,7 +23,7 @@ const auth = JSON.stringify({
 })

 const producer = writer(bootstrapServers, topic, auth);
-const consumer = reader(bootstrapServers, topic, null, null, auth);
+const consumer = reader(bootstrapServers, topic, null, "", null, auth);

 const keySchema = `{
     "name": "KeySchema",
@@ -79,8 +79,8 @@ export default function () {
             "ssn": "ssn-" + index,
         }),
         value: JSON.stringify({
-            "firstname": "firstname-" + index,
-            "lastname": "lastname-" + index,
+            "firstname": "firstname-" + index,
+            "lastname": "lastname-" + index,
         }),
     }]
     let error = produceWithConfiguration(producer, messages, configuration, keySchema, valueSchema);
scripts/test_sasl_auth.js (4 changes: 3 additions & 1 deletion)

@@ -27,10 +27,12 @@ const auth = JSON.stringify({
     algorithm: "sha256"
 })
 const offset = 0;
+// partition and groupID are mutually exclusive
 const partition = 1;
+const groupID = "";

 const producer = writer(bootstrapServers, kafkaTopic, auth);
-const consumer = reader(bootstrapServers, kafkaTopic, offset, partition, auth);
+const consumer = reader(bootstrapServers, kafkaTopic, partition, groupID, offset, auth);

 export default function () {
     for (let index = 0; index < 100; index++) {
