diff --git a/influxdb/client.go b/influxdb/client.go index c35a5f968..85013e044 100644 --- a/influxdb/client.go +++ b/influxdb/client.go @@ -2,13 +2,16 @@ package influxdb import ( "bytes" + "compress/gzip" "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -78,6 +81,9 @@ type Config struct { // Transport is the HTTP transport to use for requests // If nil, a default transport will be used. Transport *http.Transport + + // Which compression should we use for writing to influxdb, defaults to "gzip". + Compression string } // AuthenticationMethod defines the type of authentication used. @@ -106,11 +112,12 @@ type Credentials struct { // HTTPClient is safe for concurrent use. type HTTPClient struct { - mu sync.RWMutex - config Config - urls []url.URL - client *http.Client - index int32 + mu sync.RWMutex + config Config + urls []url.URL + client *http.Client + index int32 + compression string } // NewHTTPClient returns a new Client from the provided config. @@ -134,6 +141,14 @@ func NewHTTPClient(conf Config) (*HTTPClient, error) { Transport: conf.Transport, }, } + switch compression := strings.ToLower(strings.TrimSpace(conf.Compression)); compression { + case "none": + return c, nil + case "gzip", "": // treat gzip as default + c.compression = "gzip" + default: + return nil, fmt.Errorf("%s is not a supported compression type", compression) + } return c, nil } @@ -310,8 +325,29 @@ func (c *HTTPClient) Ping(ctx context.Context) (time.Duration, string, error) { } func (c *HTTPClient) Write(bp BatchPoints) error { - var b bytes.Buffer + b := bytes.Buffer{} precision := bp.Precision() + + u := c.url() + u.Path = "write" + v := url.Values{} + v.Set("db", bp.Database()) + v.Set("rp", bp.RetentionPolicy()) + v.Set("precision", bp.Precision()) + v.Set("consistency", bp.WriteConsistency()) + u.RawQuery = v.Encode() + + reqBody := io.Reader(&b) + + if c.compression == "gzip" { + var err error + reqBody, err = CompressWithGzip(reqBody, gzip.DefaultCompression) + if err != nil { + return err + } + + } + for _, p := range bp.Points() { if _, err := b.Write(p.Bytes(precision)); err != nil { return err @@ -322,19 +358,14 @@ func (c *HTTPClient) Write(bp BatchPoints) error { } } - u := c.url() - u.Path = "write" - v := url.Values{} - v.Set("db", bp.Database()) - v.Set("rp", bp.RetentionPolicy()) - v.Set("precision", bp.Precision()) - v.Set("consistency", bp.WriteConsistency()) - u.RawQuery = v.Encode() - req, err := http.NewRequest("POST", u.String(), &b) + req, err := http.NewRequest("POST", u.String(), reqBody) if err != nil { return err } req.Header.Set("Content-Type", "application/octet-stream") + if c.compression == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } _, err = c.do(req, nil, http.StatusNoContent, http.StatusOK) return err diff --git a/influxdb/client_test.go b/influxdb/client_test.go index aac711d82..d4d420a7d 100644 --- a/influxdb/client_test.go +++ b/influxdb/client_test.go @@ -1,12 +1,15 @@ package influxdb import ( + "compress/gzip" "encoding/json" + "io/ioutil" "net/http" "net/http/httptest" "strings" "sync" "testing" + "time" ) func TestClient_Query(t *testing.T) { @@ -154,6 +157,21 @@ func TestClient_Concurrent_Use(t *testing.T) { func TestClient_Write(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response + uncompressedBody, err := gzip.NewReader(r.Body) + if err != nil { + t.Error(err) + } + bod, err := ioutil.ReadAll(uncompressedBody) + if err != nil { + t.Error(err) + } + if r.Header.Get("Content-Encoding") != "gzip" { + t.Errorf("expected gzip Content-Encoding but got %s", r.Header.Get("Content-Encoding")) + } + expected := "testpt,tag1=tag1 value=1i 942105600000000003\n" + if string(bod) != expected { + t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod)) + } w.WriteHeader(http.StatusNoContent) _ = json.NewEncoder(w).Encode(data) })) @@ -162,10 +180,55 @@ func TestClient_Write(t *testing.T) { config := Config{URLs: []string{ts.URL}} c, _ := NewHTTPClient(config) + bp, err := NewBatchPoints(BatchPointsConfig{}) + bp.AddPoint(Point{ + Name: "testpt", + Tags: map[string]string{"tag1": "tag1"}, + Fields: map[string]interface{}{"value": 1}, + Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC), + }) + if err != nil { + t.Errorf("unexpected error. expected %v, actual %v", nil, err) + } + err = c.Write(bp) + if err != nil { + t.Errorf("unexpected error. expected %v, actual %v", nil, err) + } +} + +func TestClient_Write_noCompression(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var data Response + bod, err := ioutil.ReadAll(r.Body) + expected := "testpt,tag1=tag1 value=1i 942105600000000003\n" + if string(bod) != expected { + t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod)) + } + if err != nil { + t.Error(err) + } + w.WriteHeader(http.StatusNoContent) + _ = json.NewEncoder(w).Encode(data) + })) + defer ts.Close() + + config := Config{ + URLs: []string{ts.URL}, + Compression: "none", + } + c, _ := NewHTTPClient(config) + bp, err := NewBatchPoints(BatchPointsConfig{}) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } + + bp.AddPoint(Point{ + Name: "testpt", + Tags: map[string]string{"tag1": "tag1"}, + Fields: map[string]interface{}{"value": 1}, + Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC), + }) err = c.Write(bp) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) diff --git a/influxdb/gzip.go b/influxdb/gzip.go new file mode 100644 index 000000000..966673296 --- /dev/null +++ b/influxdb/gzip.go @@ -0,0 +1,29 @@ +package influxdb + +import ( + "compress/gzip" + "io" +) + +// CompressWithGzip takes an io.Reader as input and pipes +// it through a gzip.Writer returning an io.Reader containing +// the gzipped data. +// An error is returned if passing data to the gzip.Writer fails +// this is shamelessly stolen from https://github.com/influxdata/telegraf +func CompressWithGzip(data io.Reader, level int) (io.Reader, error) { + pipeReader, pipeWriter := io.Pipe() + gzipWriter, err := gzip.NewWriterLevel(pipeWriter, level) + if err != nil { + return nil, err + } + + go func() { + _, err := io.Copy(gzipWriter, data) + gzipWriter.Close() + // subsequent reads from the read half of the pipe will + // return no bytes and the error err, or EOF if err is nil. + pipeWriter.CloseWithError(err) + }() + + return pipeReader, err +} diff --git a/services/influxdb/config.go b/services/influxdb/config.go index d0b358ec4..cdafdfc9e 100644 --- a/services/influxdb/config.go +++ b/services/influxdb/config.go @@ -59,6 +59,7 @@ type Config struct { StartUpTimeout toml.Duration `toml:"startup-timeout" override:"startup-timeout"` SubscriptionSyncInterval toml.Duration `toml:"subscriptions-sync-interval" override:"subscriptions-sync-interval"` SubscriptionPath string `toml:"subscription-path" override:"subscription-path"` + Compression string `toml:"compression" override:"compression"` } func NewConfig() Config { @@ -80,6 +81,7 @@ func (c *Config) Init() { c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval) c.SubscriptionMode = ClusterMode c.SubscriptionPath = "" + c.Compression = "gzip" } func (c *Config) ApplyConditionalDefaults() { @@ -95,6 +97,9 @@ func (c *Config) ApplyConditionalDefaults() { if c.SubscriptionSyncInterval == toml.Duration(0) { c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval) } + if c.Compression == "" { + c.Compression = "gzip" + } } var validNamePattern = regexp.MustCompile(`^[-\._\p{L}0-9]+$`) @@ -124,6 +129,12 @@ func (c Config) Validate() error { default: return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %q: %v", c.SubscriptionProtocol, c) } + switch c.Compression { + case "gzip", "none": + default: + return fmt.Errorf("Invalid compression, must be one of 'gzip' or 'none', got %s", c.Compression) + } + return nil }