From 79898287db138f03c92bd9edf12c2875de3f4b12 Mon Sep 17 00:00:00 2001 From: Taki <76651967+taki-mekhalfa@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:25:14 +0200 Subject: [PATCH] feat: allow Request.GetBody to be set when gzipping (#108) --- CHANGELOG.md | 4 ++++ influxdb3/write.go | 47 +++++++++++++++++++++++++++++++---------- influxdb3/write_test.go | 42 ++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b18925b..685d330 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.13.0 [unreleased] +### Features + +1. [#108](https://github.com/InfluxCommunity/influxdb3-go/pull/108): Allow Request.GetBody to be set when writing gzipped data to make calls more resilient. + ## 0.12.0 [2024-10-02] ### Features diff --git a/influxdb3/write.go b/influxdb3/write.go index 399103e..743d90b 100644 --- a/influxdb3/write.go +++ b/influxdb3/write.go @@ -133,11 +133,7 @@ func (c *Client) WriteWithOptions(ctx context.Context, options *WriteOptions, bu return c.write(ctx, buff, options) } -func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) error { - // Skip zero size batch - if len(buff) == 0 { - return nil - } +func (c *Client) makeHTTPParams(buff []byte, options *WriteOptions) (*httpParams, error) { var database string if options.Database != "" { database = options.Database @@ -145,7 +141,7 @@ func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) database = c.config.Database } if database == "" { - return errors.New("database not specified") + return nil, errors.New("database not specified") } var precision = options.Precision @@ -153,7 +149,6 @@ func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) var gzipThreshold = options.GzipThreshold var body io.Reader - var err error u, _ := c.apiURL.Parse("write") params := u.Query() params.Set("org", c.config.Organization) @@ -163,19 +158,49 @@ func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) body = bytes.NewReader(buff) headers := http.Header{"Content-Type": {"application/json"}} if gzipThreshold > 0 && len(buff) >= gzipThreshold { - body, err = gzip.CompressWithGzip(body) + r, err := gzip.CompressWithGzip(body) if err != nil { - return fmt.Errorf("unable to compress write body: %w", err) + return nil, fmt.Errorf("unable to compress body: %w", err) } + + // This is necessary for Request.GetBody to be set by NewRequest, ensuring that + // the Transport can retry the request when a network error occurs. + // See: https://github.com/golang/go/blob/726d898c92ed0159f283f324478d00f15419f476/src/net/http/request.go#L884 + // See: https://github.com/golang/go/blob/726d898c92ed0159f283f324478d00f15419f476/src/net/http/transport.go#L89-L92 + // + // It is particularly useful for handling transient errors in HTTP/2 and persistent + // connections in standard HTTP. + // Additionally, it helps manage graceful HTTP/2 shutdowns (e.g. GOAWAY frames). + b, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("unable to read compressed body: %w", err) + } + body = bytes.NewReader(b) + headers["Content-Encoding"] = []string{"gzip"} } - resp, err := c.makeAPICall(ctx, httpParams{ + + return &httpParams{ endpointURL: u, httpMethod: "POST", headers: headers, queryParams: u.Query(), body: body, - }) + }, nil +} + +func (c *Client) write(ctx context.Context, buff []byte, options *WriteOptions) error { + // Skip zero size batch + if len(buff) == 0 { + return nil + } + + params, err := c.makeHTTPParams(buff, options) + if err != nil { + return err + } + + resp, err := c.makeAPICall(ctx, *params) if err != nil { return err } diff --git a/influxdb3/write_test.go b/influxdb3/write_test.go index 5466c3d..63b67fe 100644 --- a/influxdb3/write_test.go +++ b/influxdb3/write_test.go @@ -830,3 +830,45 @@ func TestWriteWithOptionsNotSet(t *testing.T) { assert.Error(t, err) assert.EqualError(t, err, "options not set") } + +func TestMakeHTTPParamsBody(t *testing.T) { + points := genPoints(100) + byts := points2bytes(t, points) + + c, err := New(ClientConfig{ + Host: "http://localhost", + Token: "my-token", + Database: "my-database", + }) + require.NoError(t, err) + + for _, gzipThreshold := range []int{ + 0, // gzipping disabled + 1, // gzipping enabled + } { + c.config.WriteOptions.GzipThreshold = gzipThreshold + + params, err := c.makeHTTPParams(byts, c.config.WriteOptions) + assert.NoError(t, err) + + // copy URL + urlObj := *params.endpointURL + urlObj.RawQuery = params.queryParams.Encode() + + fullURL := urlObj.String() + + req, err := http.NewRequestWithContext(context.Background(), params.httpMethod, fullURL, params.body) + assert.NoError(t, err) + + slurp1, err := io.ReadAll(req.Body) + assert.NoError(t, err) + + newBody, err := req.GetBody() + assert.NoError(t, err) + + slurp2, err := io.ReadAll(newBody) + assert.NoError(t, err) + + assert.Equal(t, string(slurp1), string(slurp2)) + } +}