Skip to content

Commit

Permalink
feature: gzip data that is sent to influxdb
Browse files Browse the repository at this point in the history
  • Loading branch information
docmerlin committed Dec 18, 2020
1 parent f1ff9ab commit 336e51b
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 15 deletions.
61 changes: 46 additions & 15 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package influxdb

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -78,6 +81,9 @@ type Config struct {
// Transport is the HTTP transport to use for requests
// If nil, a default transport will be used.
Transport *http.Transport

// Which compression should we use for writing to influxdb, defaults to "gzip".
Compression string
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -106,11 +112,12 @@ type Credentials struct {

// HTTPClient is safe for concurrent use.
type HTTPClient struct {
mu sync.RWMutex
config Config
urls []url.URL
client *http.Client
index int32
mu sync.RWMutex
config Config
urls []url.URL
client *http.Client
index int32
compression string
}

// NewHTTPClient returns a new Client from the provided config.
Expand All @@ -134,6 +141,14 @@ func NewHTTPClient(conf Config) (*HTTPClient, error) {
Transport: conf.Transport,
},
}
switch compression := strings.ToLower(strings.TrimSpace(conf.Compression)); compression {
case "none":
return c, nil
case "gzip", "": // treat gzip as default
c.compression = "gzip"
default:
return nil, fmt.Errorf("%s is not a supported compression type", compression)
}
return c, nil
}

Expand Down Expand Up @@ -310,8 +325,29 @@ func (c *HTTPClient) Ping(ctx context.Context) (time.Duration, string, error) {
}

func (c *HTTPClient) Write(bp BatchPoints) error {
var b bytes.Buffer
b := bytes.Buffer{}
precision := bp.Precision()

u := c.url()
u.Path = "write"
v := url.Values{}
v.Set("db", bp.Database())
v.Set("rp", bp.RetentionPolicy())
v.Set("precision", bp.Precision())
v.Set("consistency", bp.WriteConsistency())
u.RawQuery = v.Encode()

reqBody := io.Reader(&b)

if c.compression == "gzip" {
var err error
reqBody, err = CompressWithGzip(reqBody, gzip.DefaultCompression)
if err != nil {
return err
}

}

for _, p := range bp.Points() {
if _, err := b.Write(p.Bytes(precision)); err != nil {
return err
Expand All @@ -322,19 +358,14 @@ func (c *HTTPClient) Write(bp BatchPoints) error {
}
}

u := c.url()
u.Path = "write"
v := url.Values{}
v.Set("db", bp.Database())
v.Set("rp", bp.RetentionPolicy())
v.Set("precision", bp.Precision())
v.Set("consistency", bp.WriteConsistency())
u.RawQuery = v.Encode()
req, err := http.NewRequest("POST", u.String(), &b)
req, err := http.NewRequest("POST", u.String(), reqBody)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
if c.compression == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

_, err = c.do(req, nil, http.StatusNoContent, http.StatusOK)
return err
Expand Down
63 changes: 63 additions & 0 deletions influxdb/client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package influxdb

import (
"compress/gzip"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)

func TestClient_Query(t *testing.T) {
Expand Down Expand Up @@ -154,6 +157,21 @@ func TestClient_Concurrent_Use(t *testing.T) {
func TestClient_Write(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
uncompressedBody, err := gzip.NewReader(r.Body)
if err != nil {
t.Error(err)
}
bod, err := ioutil.ReadAll(uncompressedBody)
if err != nil {
t.Error(err)
}
if r.Header.Get("Content-Encoding") != "gzip" {
t.Errorf("expected gzip Content-Encoding but got %s", r.Header.Get("Content-Encoding"))
}
expected := "testpt,tag1=tag1 value=1i 942105600000000003\n"
if string(bod) != expected {
t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod))
}
w.WriteHeader(http.StatusNoContent)
_ = json.NewEncoder(w).Encode(data)
}))
Expand All @@ -162,10 +180,55 @@ func TestClient_Write(t *testing.T) {
config := Config{URLs: []string{ts.URL}}
c, _ := NewHTTPClient(config)

bp, err := NewBatchPoints(BatchPointsConfig{})
bp.AddPoint(Point{
Name: "testpt",
Tags: map[string]string{"tag1": "tag1"},
Fields: map[string]interface{}{"value": 1},
Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC),
})
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
err = c.Write(bp)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_Write_noCompression(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
bod, err := ioutil.ReadAll(r.Body)
expected := "testpt,tag1=tag1 value=1i 942105600000000003\n"
if string(bod) != expected {
t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod))
}
if err != nil {
t.Error(err)
}
w.WriteHeader(http.StatusNoContent)
_ = json.NewEncoder(w).Encode(data)
}))
defer ts.Close()

config := Config{
URLs: []string{ts.URL},
Compression: "none",
}
c, _ := NewHTTPClient(config)

bp, err := NewBatchPoints(BatchPointsConfig{})
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}

bp.AddPoint(Point{
Name: "testpt",
Tags: map[string]string{"tag1": "tag1"},
Fields: map[string]interface{}{"value": 1},
Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC),
})
err = c.Write(bp)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
Expand Down
29 changes: 29 additions & 0 deletions influxdb/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package influxdb

import (
"compress/gzip"
"io"
)

// CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing
// the gzipped data.
// An error is returned if passing data to the gzip.Writer fails
// this is shamelessly stolen from https://github.com/influxdata/telegraf
func CompressWithGzip(data io.Reader, level int) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter, err := gzip.NewWriterLevel(pipeWriter, level)
if err != nil {
return nil, err
}

go func() {
_, err := io.Copy(gzipWriter, data)
gzipWriter.Close()
// subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil.
pipeWriter.CloseWithError(err)
}()

return pipeReader, err
}
11 changes: 11 additions & 0 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
StartUpTimeout toml.Duration `toml:"startup-timeout" override:"startup-timeout"`
SubscriptionSyncInterval toml.Duration `toml:"subscriptions-sync-interval" override:"subscriptions-sync-interval"`
SubscriptionPath string `toml:"subscription-path" override:"subscription-path"`
Compression string `toml:"compression" override:"compression"`
}

func NewConfig() Config {
Expand All @@ -80,6 +81,7 @@ func (c *Config) Init() {
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
c.SubscriptionMode = ClusterMode
c.SubscriptionPath = ""
c.Compression = "gzip"
}

func (c *Config) ApplyConditionalDefaults() {
Expand All @@ -95,6 +97,9 @@ func (c *Config) ApplyConditionalDefaults() {
if c.SubscriptionSyncInterval == toml.Duration(0) {
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
}
if c.Compression == "" {
c.Compression = "gzip"
}
}

var validNamePattern = regexp.MustCompile(`^[-\._\p{L}0-9]+$`)
Expand Down Expand Up @@ -124,6 +129,12 @@ func (c Config) Validate() error {
default:
return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %q: %v", c.SubscriptionProtocol, c)
}
switch c.Compression {
case "gzip", "none":
default:
return fmt.Errorf("Invalid compression, must be one of 'gzip' or 'none', got %s", c.Compression)
}

return nil
}

Expand Down

0 comments on commit 336e51b

Please sign in to comment.