Skip to content

Commit

Permalink
Added external json encoding library to improve encoding speed and al…
Browse files Browse the repository at this point in the history
…location rate when transforming envelopes to json documents
  • Loading branch information
noctarius committed Jul 26, 2023
1 parent bb81f97 commit ef14086
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 133 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/docker/docker v24.0.4+incompatible
github.com/go-errors/errors v1.4.2
github.com/go-redis/redis v6.15.9+incompatible
github.com/goccy/go-json v0.10.2
github.com/gookit/color v1.5.3
github.com/gookit/slog v0.5.2
github.com/hashicorp/go-uuid v1.0.3
Expand All @@ -27,6 +28,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/localstack v0.21.0
github.com/testcontainers/testcontainers-go/modules/redis v0.21.0
github.com/urfave/cli v1.22.14
golang.org/x/net v0.12.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -87,7 +89,6 @@ require (
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGK
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
30 changes: 16 additions & 14 deletions internal/eventing/sinks/awskinesis/awskinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,47 @@
package awskinesis

import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/go-errors/errors"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
config "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/encoding"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"log"
"time"
)

func init() {
sink.RegisterSink(spiconfig.AwsKinesis, newAwsKinesisSink)
sink.RegisterSink(config.AwsKinesis, newAwsKinesisSink)
}

type awsKinesisSink struct {
streamName *string
awsKinesis *kinesis.Kinesis
encoder *encoding.JsonEncoder
}

func newAwsKinesisSink(
config *spiconfig.Config,
c *config.Config,
) (sink.Sink, error) {

streamName := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisStreamName, nil)
streamName := config.GetOrDefault[*string](c, config.PropertyKinesisStreamName, nil)
if streamName == nil {
return nil, errors.Errorf("AWS Kinesis sink needs the stream name to be configured")
}

shardCount := spiconfig.GetOrDefault[*int64](config, spiconfig.PropertyKinesisStreamShardCount, nil)
streamMode := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisStreamMode, nil)
streamCreate := spiconfig.GetOrDefault(config, spiconfig.PropertyKinesisStreamCreate, true)
shardCount := config.GetOrDefault[*int64](c, config.PropertyKinesisStreamShardCount, nil)
streamMode := config.GetOrDefault[*string](c, config.PropertyKinesisStreamMode, nil)
streamCreate := config.GetOrDefault(c, config.PropertyKinesisStreamCreate, true)

awsRegion := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisRegion, nil)
endpoint := spiconfig.GetOrDefault(config, spiconfig.PropertyKinesisAwsEndpoint, "")
accessKeyId := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisAwsAccessKeyId, nil)
secretAccessKey := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisAwsSecretAccessKey, nil)
sessionToken := spiconfig.GetOrDefault[*string](config, spiconfig.PropertyKinesisAwsSessionToken, nil)
awsRegion := config.GetOrDefault[*string](c, config.PropertyKinesisRegion, nil)
endpoint := config.GetOrDefault(c, config.PropertyKinesisAwsEndpoint, "")
accessKeyId := config.GetOrDefault[*string](c, config.PropertyKinesisAwsAccessKeyId, nil)
secretAccessKey := config.GetOrDefault[*string](c, config.PropertyKinesisAwsSecretAccessKey, nil)
sessionToken := config.GetOrDefault[*string](c, config.PropertyKinesisAwsSessionToken, nil)

awsConfig := aws.NewConfig().WithEndpoint(endpoint)
if accessKeyId != nil && secretAccessKey != nil && sessionToken != nil {
Expand Down Expand Up @@ -115,6 +116,7 @@ func newAwsKinesisSink(
return &awsKinesisSink{
streamName: streamName,
awsKinesis: awsKinesis,
encoder: encoding.NewJsonEncoderWithConfig(c),
}, nil
}

Expand All @@ -130,7 +132,7 @@ func (a *awsKinesisSink) Emit(
_ sink.Context, _ time.Time, topicName string, _, envelope schema.Struct,
) error {

envelopeData, err := json.Marshal(envelope)
envelopeData, err := a.encoder.Marshal(envelope)
if err != nil {
return err
}
Expand Down
24 changes: 13 additions & 11 deletions internal/eventing/sinks/awssqs/awssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,43 @@ package awssqs

import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/go-errors/errors"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
config "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/encoding"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)

func init() {
sink.RegisterSink(spiconfig.AwsSQS, newAwsSqsSink)
sink.RegisterSink(config.AwsSQS, newAwsSqsSink)
}

type awsSqsSink struct {
queueUrl *string
awsSqs *sqs.SQS
encoder *encoding.JsonEncoder
}

func newAwsSqsSink(
config *spiconfig.Config,
c *config.Config,
) (sink.Sink, error) {

queueUrl := spiconfig.GetOrDefault[*string](config, spiconfig.PropertySqsQueueUrl, nil)
queueUrl := config.GetOrDefault[*string](c, config.PropertySqsQueueUrl, nil)
if queueUrl == nil {
return nil, errors.Errorf("AWS SQS sink needs the queue url to be configured")
}

awsRegion := spiconfig.GetOrDefault[*string](config, spiconfig.PropertySqsAwsRegion, nil)
endpoint := spiconfig.GetOrDefault(config, spiconfig.PropertySqsAwsEndpoint, "")
accessKeyId := spiconfig.GetOrDefault[*string](config, spiconfig.PropertySqsAwsAccessKeyId, nil)
secretAccessKey := spiconfig.GetOrDefault[*string](config, spiconfig.PropertySqsAwsSecretAccessKey, nil)
sessionToken := spiconfig.GetOrDefault[*string](config, spiconfig.PropertySqsAwsSessionToken, nil)
awsRegion := config.GetOrDefault[*string](c, config.PropertySqsAwsRegion, nil)
endpoint := config.GetOrDefault(c, config.PropertySqsAwsEndpoint, "")
accessKeyId := config.GetOrDefault[*string](c, config.PropertySqsAwsAccessKeyId, nil)
secretAccessKey := config.GetOrDefault[*string](c, config.PropertySqsAwsSecretAccessKey, nil)
sessionToken := config.GetOrDefault[*string](c, config.PropertySqsAwsSessionToken, nil)

awsConfig := aws.NewConfig().WithEndpoint(endpoint)
if accessKeyId != nil && secretAccessKey != nil && sessionToken != nil {
Expand All @@ -75,6 +76,7 @@ func newAwsSqsSink(
return &awsSqsSink{
queueUrl: queueUrl,
awsSqs: sqs.New(awsSession),
encoder: encoding.NewJsonEncoderWithConfig(c),
}, nil
}

Expand All @@ -90,7 +92,7 @@ func (a *awsSqsSink) Emit(
_ sink.Context, _ time.Time, topicName string, _, envelope schema.Struct,
) error {

envelopeData, err := json.Marshal(envelope)
envelopeData, err := a.encoder.Marshal(envelope)
if err != nil {
return err
}
Expand Down
62 changes: 32 additions & 30 deletions internal/eventing/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,71 @@ package kafka

import (
"crypto/tls"
"encoding/json"
"github.com/Shopify/sarama"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
config "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/encoding"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)

func init() {
sink.RegisterSink(spiconfig.Kafka, newKafkaSink)
sink.RegisterSink(config.Kafka, newKafkaSink)
}

type kafkaSink struct {
producer sarama.SyncProducer
encoder *encoding.JsonEncoder
}

func newKafkaSink(
config *spiconfig.Config,
c *config.Config,
) (sink.Sink, error) {

c := sarama.NewConfig()
c.ClientID = "event-stream-prototype"
c.Producer.Idempotent = spiconfig.GetOrDefault(
config, "sink.kafka.idempotent", false,
kafkaConfig := sarama.NewConfig()
kafkaConfig.ClientID = "event-stream-prototype"
kafkaConfig.Producer.Idempotent = config.GetOrDefault(
c, "sink.kafka.idempotent", false,
)
c.Producer.Return.Successes = true
c.Producer.RequiredAcks = sarama.WaitForLocal
c.Producer.Retry.Max = 10

if spiconfig.GetOrDefault(config, spiconfig.PropertyKafkaSaslEnabled, false) {
c.Net.SASL.Enable = true
c.Net.SASL.User = spiconfig.GetOrDefault(
config, spiconfig.PropertyKafkaSaslUser, "",
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Producer.RequiredAcks = sarama.WaitForLocal
kafkaConfig.Producer.Retry.Max = 10

if config.GetOrDefault(c, config.PropertyKafkaSaslEnabled, false) {
kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.User = config.GetOrDefault(
c, config.PropertyKafkaSaslUser, "",
)
c.Net.SASL.Password = spiconfig.GetOrDefault(
config, spiconfig.PropertyKafkaSaslPassword, "",
kafkaConfig.Net.SASL.Password = config.GetOrDefault(
c, config.PropertyKafkaSaslPassword, "",
)
c.Net.SASL.Mechanism = spiconfig.GetOrDefault[sarama.SASLMechanism](
config, spiconfig.PropertyKafkaSaslMechanism, sarama.SASLTypePlaintext,
kafkaConfig.Net.SASL.Mechanism = config.GetOrDefault[sarama.SASLMechanism](
c, config.PropertyKafkaSaslMechanism, sarama.SASLTypePlaintext,
)
}

if spiconfig.GetOrDefault(config, spiconfig.PropertyKafkaTlsEnabled, false) {
c.Net.TLS.Enable = true
c.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: spiconfig.GetOrDefault(
config, spiconfig.PropertyKafkaTlsSkipVerify, false,
if config.GetOrDefault(c, config.PropertyKafkaTlsEnabled, false) {
kafkaConfig.Net.TLS.Enable = true
kafkaConfig.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: config.GetOrDefault(
c, config.PropertyKafkaTlsSkipVerify, false,
),
ClientAuth: spiconfig.GetOrDefault(
config, spiconfig.PropertyKafkaTlsClientAuth, tls.NoClientCert,
ClientAuth: config.GetOrDefault(
c, config.PropertyKafkaTlsClientAuth, tls.NoClientCert,
),
}
}

producer, err := sarama.NewSyncProducer(
spiconfig.GetOrDefault(config, spiconfig.PropertyKafkaBrokers, []string{"localhost:9092"}), c,
config.GetOrDefault(c, config.PropertyKafkaBrokers, []string{"localhost:9092"}), kafkaConfig,
)
if err != nil {
return nil, err
}

return &kafkaSink{
producer: producer,
encoder: encoding.NewJsonEncoderWithConfig(c),
}, nil
}

Expand All @@ -97,11 +99,11 @@ func (k *kafkaSink) Emit(
_ sink.Context, timestamp time.Time, topicName string, key, envelope schema.Struct,
) error {

keyData, err := json.Marshal(key)
keyData, err := k.encoder.Marshal(key)
if err != nil {
return err
}
envelopeData, err := json.Marshal(envelope)
envelopeData, err := k.encoder.Marshal(envelope)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit ef14086

Please sign in to comment.