Skip to content

Commit

Permalink
feat: add gzip bodyFormat support (#3904)
Browse files Browse the repository at this point in the history
* feat: add gzip bodyFormat support

* feat: add tests for gzip compression

* chore: remove unused function

* refactor: use native tests for gzip body format

* test: add testcases

* refactor: fix golangci linting errors

* chore: update go mod and go sum

* refactor: remove redundant header

* feat: remove base64 encoding and add gzip compression

* refactor: apply review suggestions

---------

Co-authored-by: Dilip Kola <kdilipkola@gmail.com>
  • Loading branch information
Gauravudia and koladilip authored Oct 4, 2023
1 parent 7ead8a9 commit b050450
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 1 deletion.
35 changes: 34 additions & 1 deletion router/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package router

import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
Expand Down Expand Up @@ -90,6 +92,7 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
}

var payload io.Reader
headers := map[string]string{"User-Agent": "RudderLabs"}
// support for JSON and FORM body type
if len(bodyValue) > 0 {
switch bodyFormat {
Expand Down Expand Up @@ -124,6 +127,34 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
formValues.Set(key, fmt.Sprint(val)) // transformer ensures top level string values, still val.(string) would be restrictive
}
payload = strings.NewReader(formValues.Encode())
case "GZIP":
strValue, ok := bodyValue["payload"].(string)
if !ok {
return &utils.SendPostResponse{
StatusCode: 400,
ResponseBody: []byte("400 Unable to parse json list. Unexpected transformer response"),
}
}
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
defer zw.Close()

if _, err := zw.Write([]byte(strValue)); err != nil {
return &utils.SendPostResponse{
StatusCode: 400,
ResponseBody: []byte("400 Unable to compress data. Unexpected response"),
}
}

if err := zw.Close(); err != nil {
return &utils.SendPostResponse{
StatusCode: 400,
ResponseBody: []byte("400 Unable to flush compressed data. Unexpected response"),
}
}

headers["Content-Encoding"] = "gzip"
payload = &buf
default:
panic(fmt.Errorf("bodyFormat: %s is not supported", bodyFormat))
}
Expand Down Expand Up @@ -156,7 +187,9 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
req.Header.Add(key, val.(string))
}

req.Header.Add("User-Agent", "RudderLabs")
for key, val := range headers {
req.Header.Add(key, val)
}

resp, err := client.Do(req)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions router/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package router

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/logger"
mocksSysUtils "github.com/rudderlabs/rudder-server/mocks/utils/sysUtils"
Expand All @@ -31,6 +35,65 @@ func (c *networkContext) Finish() {
c.mockCtrl.Finish()
}

func TestSendPostWithGzipData(t *testing.T) {
t.Run("should send Gzip data when payload is valid", func(r *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Encoding") != "gzip" {
w.WriteHeader(http.StatusBadRequest)
return
}
body, err := gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer body.Close()
buf := new(bytes.Buffer)
_, _ = buf.ReadFrom(body)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf.Bytes())
}))
network := &netHandle{}
network.logger = logger.NewLogger().Child("network")
network.httpClient = http.DefaultClient
eventData := `[{"event":"Signed Up"}]`
var structData integrations.PostParametersT
structData.RequestMethod = "POST"
structData.Type = "REST"
structData.URL = testServer.URL
structData.UserID = "anon_id"
structData.Body = map[string]interface{}{
"GZIP": map[string]interface{}{
"payload": eventData,
},
}

resp := network.SendPost(context.Background(), structData)
require.Equal(r, resp.StatusCode, http.StatusOK)
require.Equal(r, string(resp.ResponseBody), eventData)
})

t.Run("should fail to send Gzip data when payload is missing", func(r *testing.T) {
network := &netHandle{}
network.logger = logger.NewLogger().Child("network")
network.httpClient = http.DefaultClient
eventData := `[{"event":"Signed Up"}]`
var structData integrations.PostParametersT
structData.RequestMethod = "POST"
structData.Type = "REST"
structData.UserID = "anon_id"
structData.Body = map[string]interface{}{
"GZIP": map[string]interface{}{
"abc": eventData,
},
}

resp := network.SendPost(context.Background(), structData)
require.Equal(r, resp.StatusCode, http.StatusBadRequest)
require.Equal(r, resp.ResponseBody, []byte("400 Unable to parse json list. Unexpected transformer response"))
})
}

var _ = Describe("Network", func() {
var c *networkContext

Expand Down Expand Up @@ -149,6 +212,7 @@ var _ = Describe("Network", func() {
"FORM": map[string]interface{}{},
"JSON": map[string]interface{}{},
"XML": map[string]interface{}{},
"GZIP": map[string]interface{}{},
}
requestParams.Files = map[string]interface{}{}

Expand Down

0 comments on commit b050450

Please sign in to comment.