Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gzip data sent to InfluxDB by default. #2456

Merged
merged 2 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#2055](https://github.com/influxdata/kapacitor/pull/2055): Add support for correlate in the Alerta AlertNode, thanks @nermolaev!
- [#2409](https://github.com/influxdata/kapacitor/pull/2409): Optionally use kapacitor alert details as opsgenie description text, thanks @JamesClonk!
- [#2441](https://github.com/influxdata/kapacitor/pull/2441): Preallocate GroupIDs for increased performance by reducing allocations.
- [#2456](https://github.com/influxdata/kapacitor/pull/2456): Gzip data sent to InfluxDB by default.

## v1.5.7 [2020-10-27]

Expand Down
61 changes: 46 additions & 15 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package influxdb

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -78,6 +81,9 @@ type Config struct {
// Transport is the HTTP transport to use for requests
// If nil, a default transport will be used.
Transport *http.Transport

// Compression specifies the content encoding used when writing to InfluxDB; valid values are "gzip" (the default) and "none".
Compression string
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -106,11 +112,12 @@ type Credentials struct {

// HTTPClient is safe for concurrent use.
type HTTPClient struct {
mu sync.RWMutex
config Config
urls []url.URL
client *http.Client
index int32
mu sync.RWMutex
config Config
urls []url.URL
client *http.Client
index int32
compression string
}

// NewHTTPClient returns a new Client from the provided config.
Expand All @@ -134,6 +141,14 @@ func NewHTTPClient(conf Config) (*HTTPClient, error) {
Transport: conf.Transport,
},
}
switch compression := strings.ToLower(strings.TrimSpace(conf.Compression)); compression {
case "none":
return c, nil
case "gzip", "": // treat gzip as default
c.compression = "gzip"
default:
return nil, fmt.Errorf("%s is not a supported compression type", compression)
}
return c, nil
}

Expand Down Expand Up @@ -310,8 +325,29 @@ func (c *HTTPClient) Ping(ctx context.Context) (time.Duration, string, error) {
}

func (c *HTTPClient) Write(bp BatchPoints) error {
var b bytes.Buffer
b := bytes.Buffer{}
precision := bp.Precision()

u := c.url()
u.Path = "write"
v := url.Values{}
v.Set("db", bp.Database())
v.Set("rp", bp.RetentionPolicy())
v.Set("precision", bp.Precision())
v.Set("consistency", bp.WriteConsistency())
u.RawQuery = v.Encode()

reqBody := io.Reader(&b)

if c.compression == "gzip" {
var err error
reqBody, err = CompressWithGzip(reqBody, gzip.DefaultCompression)
if err != nil {
return err
}

}

for _, p := range bp.Points() {
if _, err := b.Write(p.Bytes(precision)); err != nil {
return err
Expand All @@ -322,19 +358,14 @@ func (c *HTTPClient) Write(bp BatchPoints) error {
}
}

u := c.url()
u.Path = "write"
v := url.Values{}
v.Set("db", bp.Database())
v.Set("rp", bp.RetentionPolicy())
v.Set("precision", bp.Precision())
v.Set("consistency", bp.WriteConsistency())
u.RawQuery = v.Encode()
req, err := http.NewRequest("POST", u.String(), &b)
req, err := http.NewRequest("POST", u.String(), reqBody)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
if c.compression == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

_, err = c.do(req, nil, http.StatusNoContent, http.StatusOK)
return err
Expand Down
63 changes: 63 additions & 0 deletions influxdb/client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package influxdb

import (
"compress/gzip"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)

func TestClient_Query(t *testing.T) {
Expand Down Expand Up @@ -154,6 +157,21 @@ func TestClient_Concurrent_Use(t *testing.T) {
func TestClient_Write(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
uncompressedBody, err := gzip.NewReader(r.Body)
if err != nil {
t.Error(err)
}
bod, err := ioutil.ReadAll(uncompressedBody)
if err != nil {
t.Error(err)
}
if r.Header.Get("Content-Encoding") != "gzip" {
t.Errorf("expected gzip Content-Encoding but got %s", r.Header.Get("Content-Encoding"))
}
expected := "testpt,tag1=tag1 value=1i 942105600000000003\n"
if string(bod) != expected {
t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod))
}
w.WriteHeader(http.StatusNoContent)
_ = json.NewEncoder(w).Encode(data)
}))
Expand All @@ -162,10 +180,55 @@ func TestClient_Write(t *testing.T) {
config := Config{URLs: []string{ts.URL}}
c, _ := NewHTTPClient(config)

bp, err := NewBatchPoints(BatchPointsConfig{})
bp.AddPoint(Point{
Name: "testpt",
Tags: map[string]string{"tag1": "tag1"},
Fields: map[string]interface{}{"value": 1},
Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC),
})
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
err = c.Write(bp)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
}

func TestClient_Write_noCompression(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
bod, err := ioutil.ReadAll(r.Body)
expected := "testpt,tag1=tag1 value=1i 942105600000000003\n"
if string(bod) != expected {
t.Errorf("unexpected send, expected '%s', got '%s'", expected, string(bod))
}
if err != nil {
t.Error(err)
}
w.WriteHeader(http.StatusNoContent)
_ = json.NewEncoder(w).Encode(data)
}))
defer ts.Close()

config := Config{
URLs: []string{ts.URL},
Compression: "none",
}
c, _ := NewHTTPClient(config)

bp, err := NewBatchPoints(BatchPointsConfig{})
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}

bp.AddPoint(Point{
Name: "testpt",
Tags: map[string]string{"tag1": "tag1"},
Fields: map[string]interface{}{"value": 1},
Time: time.Date(1999, 11, 9, 0, 0, 0, 3, time.UTC),
})
err = c.Write(bp)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
Expand Down
29 changes: 29 additions & 0 deletions influxdb/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package influxdb

import (
"compress/gzip"
"io"
)

// CompressWithGzip takes an io.Reader as input and pipes
// it through a gzip.Writer returning an io.Reader containing
// the gzipped data.
// An error is returned if passing data to the gzip.Writer fails
// this is shamelessly stolen from https://github.com/influxdata/telegraf
func CompressWithGzip(data io.Reader, level int) (io.Reader, error) {
pipeReader, pipeWriter := io.Pipe()
gzipWriter, err := gzip.NewWriterLevel(pipeWriter, level)
if err != nil {
return nil, err
}

go func() {
_, err := io.Copy(gzipWriter, data)
gzipWriter.Close()
// subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil.
pipeWriter.CloseWithError(err)
}()

return pipeReader, err
}
11 changes: 11 additions & 0 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Config struct {
StartUpTimeout toml.Duration `toml:"startup-timeout" override:"startup-timeout"`
SubscriptionSyncInterval toml.Duration `toml:"subscriptions-sync-interval" override:"subscriptions-sync-interval"`
SubscriptionPath string `toml:"subscription-path" override:"subscription-path"`
Compression string `toml:"compression" override:"compression"`
}

func NewConfig() Config {
Expand All @@ -80,6 +81,7 @@ func (c *Config) Init() {
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
c.SubscriptionMode = ClusterMode
c.SubscriptionPath = ""
c.Compression = "gzip"
}

func (c *Config) ApplyConditionalDefaults() {
Expand All @@ -95,6 +97,9 @@ func (c *Config) ApplyConditionalDefaults() {
if c.SubscriptionSyncInterval == toml.Duration(0) {
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
}
if c.Compression == "" {
c.Compression = "gzip"
}
}

var validNamePattern = regexp.MustCompile(`^[-\._\p{L}0-9]+$`)
Expand Down Expand Up @@ -124,6 +129,12 @@ func (c Config) Validate() error {
default:
return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %q: %v", c.SubscriptionProtocol, c)
}
switch c.Compression {
case "gzip", "none":
default:
return fmt.Errorf("Invalid compression, must be one of 'gzip' or 'none', got %s", c.Compression)
}

return nil
}

Expand Down