Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gzip bodyFormat support #3904

Merged
merged 17 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions router/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
package router

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -124,6 +126,22 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
formValues.Set(key, fmt.Sprint(val)) // transformer ensures top level string values, still val.(string) would be restrictive
}
payload = strings.NewReader(formValues.Encode())
case "GZIP":
strValue, ok := bodyValue["payload"].(string)
if !ok {
return &utils.SendPostResponse{
StatusCode: 400,
ResponseBody: []byte("400 Unable to construct gzip payload. Unexpected transformer response"),
}
}
zippedData, err := base64.StdEncoding.DecodeString(strValue)
koladilip marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return &utils.SendPostResponse{
StatusCode: 400,
ResponseBody: []byte("400 Unable to decode gzip data. Unexpected transformer response"),
}
}
payload = bytes.NewBuffer(zippedData)
default:
panic(fmt.Errorf("bodyFormat: %s is not supported", bodyFormat))
}
Expand Down
96 changes: 96 additions & 0 deletions router/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package router

import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

"github.com/rudderlabs/rudder-go-kit/logger"
mocksSysUtils "github.com/rudderlabs/rudder-server/mocks/utils/sysUtils"
Expand All @@ -31,6 +36,96 @@ func (c *networkContext) Finish() {
c.mockCtrl.Finish()
}

func gzipAndEncodeBase64(data []byte) string {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
_, _ = zw.Write(data)
_ = zw.Close()
return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func TestSendPostWithGzipData(t *testing.T) {
t.Run("should send Gzip data when payload is valid", func(r *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Encoding") != "gzip" {
w.WriteHeader(http.StatusBadRequest)
return
}
body, err := gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer body.Close()
buf := new(bytes.Buffer)
_, _ = buf.ReadFrom(body)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(buf.Bytes())
}))
network := &netHandle{}
network.logger = logger.NewLogger().Child("network")
network.httpClient = http.DefaultClient
eventData := []byte(`[{"event":"Signed Up"}]`)
var structData integrations.PostParametersT
structData.RequestMethod = "POST"
structData.Type = "REST"
structData.URL = testServer.URL
structData.UserID = "anon_id"
structData.Body = map[string]interface{}{
"GZIP": map[string]interface{}{
"payload": gzipAndEncodeBase64(eventData),
},
}
structData.Headers = map[string]interface{}{
"Content-Encoding": "gzip",
}

resp := network.SendPost(context.Background(), structData)
assert.Equal(r, resp.StatusCode, http.StatusOK)
assert.Equal(r, resp.ResponseBody, eventData)
})

t.Run("should fail to send Gzip data when payload is missing", func(r *testing.T) {
network := &netHandle{}
network.logger = logger.NewLogger().Child("network")
network.httpClient = http.DefaultClient
eventData := []byte(`[{"event":"Signed Up"}]`)
var structData integrations.PostParametersT
structData.RequestMethod = "POST"
structData.Type = "REST"
structData.UserID = "anon_id"
structData.Body = map[string]interface{}{
"GZIP": map[string]interface{}{
"abc": gzipAndEncodeBase64(eventData),
},
}

resp := network.SendPost(context.Background(), structData)
assert.Equal(r, resp.StatusCode, http.StatusBadRequest)
assert.Equal(r, resp.ResponseBody, []byte("400 Unable to construct gzip payload. Unexpected transformer response"))
})

t.Run("should fail to send Gzip data when payload is invalid", func(r *testing.T) {
network := &netHandle{}
network.logger = logger.NewLogger().Child("network")
network.httpClient = http.DefaultClient
eventData := "Signed up"
var structData integrations.PostParametersT
structData.RequestMethod = "POST"
structData.Type = "REST"
structData.UserID = "anon_id"
structData.Body = map[string]interface{}{
"GZIP": map[string]interface{}{
"payload": eventData,
},
}

resp := network.SendPost(context.Background(), structData)
assert.Equal(r, resp.StatusCode, http.StatusBadRequest)
assert.Equal(r, resp.ResponseBody, []byte("400 Unable to decode gzip data. Unexpected transformer response"))
})
}

var _ = Describe("Network", func() {
var c *networkContext

Expand Down Expand Up @@ -149,6 +244,7 @@ var _ = Describe("Network", func() {
"FORM": map[string]interface{}{},
"JSON": map[string]interface{}{},
"XML": map[string]interface{}{},
"GZIP": map[string]interface{}{},
koladilip marked this conversation as resolved.
Show resolved Hide resolved
}
requestParams.Files = map[string]interface{}{}

Expand Down
Loading