Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for gzip compression to amqp input and output #5830

Merged
merged 4 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions internal/content_coding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package internal

import (
"bytes"
"compress/gzip"
"errors"
"io"
)

// NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()

case "identity", "":
return NewIdentityEncoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}

// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
}

// ContentEncoder applies a wrapper encoding to byte buffers.
type ContentEncoder interface {
Encode([]byte) ([]byte, error)
}

// GzipEncoder compresses the buffer using gzip at the default level.
type GzipEncoder struct {
writer *gzip.Writer
buf *bytes.Buffer
}

func NewGzipEncoder() (*GzipEncoder, error) {
var buf bytes.Buffer
return &GzipEncoder{
writer: gzip.NewWriter(&buf),
buf: &buf,
}, nil
}

func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
e.buf.Reset()
e.writer.Reset(e.buf)

_, err := e.writer.Write(data)
if err != nil {
return nil, err
}
err = e.writer.Close()
if err != nil {
return nil, err
}
return e.buf.Bytes(), nil
}

// IdentityEncoder is a null encoder that applies no transformation.
type IdentityEncoder struct{}

func NewIdentityEncoder() *IdentityEncoder {
return &IdentityEncoder{}
}

func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
return data, nil
}

// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
Decode([]byte) ([]byte, error)
}

// GzipDecoder decompresses buffers with gzip compression.
type GzipDecoder struct {
reader *gzip.Reader
buf *bytes.Buffer
}

func NewGzipDecoder() (*GzipDecoder, error) {
return &GzipDecoder{
reader: new(gzip.Reader),
buf: new(bytes.Buffer),
}, nil
}

func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()

_, err := d.buf.ReadFrom(d.reader)
if err != nil && err != io.EOF {
return nil, err
}
err = d.reader.Close()
if err != nil {
return nil, err
}
return d.buf.Bytes(), nil
}

// IdentityDecoder is a null decoder that returns the input.
type IdentityDecoder struct{}

func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}

func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}
58 changes: 58 additions & 0 deletions internal/content_coding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package internal

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGzipEncodeDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))
}

func TestGzipReuse(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))

payload, err = enc.Encode([]byte("doody"))
require.NoError(t, err)

actual, err = dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "doody", string(actual))
}

func TestIdentityEncodeDecode(t *testing.T) {
enc := NewIdentityEncoder()
dec := NewIdentityDecoder()

payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

actual, err := dec.Decode(payload)
require.NoError(t, err)

require.Equal(t, "howdy", string(actual))
}
4 changes: 4 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ The following defaults are known to work with RabbitMQ:
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
35 changes: 29 additions & 6 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
Expand Down Expand Up @@ -52,12 +53,15 @@ type AMQPConsumer struct {
AuthMethod string
tls.ClientConfig

ContentEncoding string `toml:"content_encoding"`

deliveries map[telegraf.TrackingID]amqp.Delivery

parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
cancel context.CancelFunc
parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
cancel context.CancelFunc
decoder internal.ContentDecoder
}

type externalAuth struct{}
Expand Down Expand Up @@ -147,6 +151,10 @@ func (a *AMQPConsumer) SampleConfig() string {
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
# content_encoding = "identity"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -201,6 +209,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return err
}

a.decoder, err = internal.NewContentDecoder(a.ContentEncoding)
if err != nil {
return err
}

msgs, err := a.connect(amqpConf)
if err != nil {
return err
Expand Down Expand Up @@ -428,8 +441,7 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
}

func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
onError := func() {
// Discard the message from the queue; will never be able to process
// this message.
rejErr := d.Ack(false)
Expand All @@ -438,6 +450,17 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
d.DeliveryTag, rejErr)
a.conn.Close()
}
}

body, err := a.decoder.Decode(d.Body)
if err != nil {
onError()
return err
}

metrics, err := a.parser.Parse(body)
if err != nil {
onError()
return err
}

Expand Down
8 changes: 8 additions & 0 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ For an introduction to AMQP see:
## Recommended to set to true.
# use_batch_format = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down
26 changes: 23 additions & 3 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"

"github.com/streadway/amqp"
)

Expand Down Expand Up @@ -55,13 +54,15 @@ type AMQP struct {
Headers map[string]string `toml:"headers"`
Timeout internal.Duration `toml:"timeout"`
UseBatchFormat bool `toml:"use_batch_format"`
ContentEncoding string `toml:"content_encoding"`
tls.ClientConfig

serializer serializers.Serializer
connect func(*ClientConfig) (Client, error)
client Client
config *ClientConfig
sentMessages int
encoder internal.ContentEncoder
}

type Client interface {
Expand Down Expand Up @@ -150,6 +151,14 @@ var sampleConfig = `
## Recommended to set to true.
# use_batch_format = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
##
## Please note that when use_batch_format = false each amqp message contains only
## a single metric, it is recommended to use compression with batch format
## for best results.
# content_encoding = "identity"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
Expand Down Expand Up @@ -178,11 +187,16 @@ func (q *AMQP) Connect() error {
q.config = config
}

client, err := q.connect(q.config)
var err error
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
if err != nil {
return err
}

q.client, err = q.connect(q.config)
if err != nil {
return err
}
q.client = client

return nil
}
Expand Down Expand Up @@ -228,6 +242,11 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
return err
}

body, err = q.encoder.Encode(body)
if err != nil {
return err
}

err = q.publish(key, body)
if err != nil {
// If this is the first attempt to publish and the connection is
Expand Down Expand Up @@ -299,6 +318,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
exchange: q.Exchange,
exchangeType: q.ExchangeType,
exchangePassive: q.ExchangePassive,
encoding: q.ContentEncoding,
timeout: q.Timeout.Duration,
}

Expand Down
10 changes: 6 additions & 4 deletions plugins/outputs/amqp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ClientConfig struct {
exchangePassive bool
exchangeDurable bool
exchangeArguments amqp.Table
encoding string
headers amqp.Table
deliveryMode uint8
tlsConfig *tls.Config
Expand Down Expand Up @@ -114,10 +115,11 @@ func (c *client) Publish(key string, body []byte) error {
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: c.config.headers,
ContentType: "text/plain",
Body: body,
DeliveryMode: c.config.deliveryMode,
Headers: c.config.headers,
ContentType: "text/plain",
ContentEncoding: c.config.encoding,
Body: body,
DeliveryMode: c.config.deliveryMode,
})
}

Expand Down