From 20973ae682f4116895e83923cb26c36046ff9a16 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Thu, 5 Sep 2024 22:45:40 +0530 Subject: [PATCH 1/2] fix: revert webhook headers support --- gateway/webhook/webhook.go | 34 ++++++++++++++++----------------- gateway/webhook/webhook_test.go | 12 ++++++------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/gateway/webhook/webhook.go b/gateway/webhook/webhook.go index ccd6fc9bbf..ebcc633ccc 100644 --- a/gateway/webhook/webhook.go +++ b/gateway/webhook/webhook.go @@ -269,16 +269,16 @@ func (webhook *HandleT) batchRequests(sourceDef string, requestQ chan *webhookT) } } -func getXHeaders(req *http.Request) map[string]string { - xHeaders := make(map[string]string) - for key, values := range req.Header { - lowerCaseKey := strings.ToLower(key) - if !strings.HasPrefix(lowerCaseKey, "x-forwarded-") && strings.HasPrefix(lowerCaseKey, "x-") { - xHeaders[key] = strings.Join(values, ",") - } - } - return xHeaders -} +// func getXHeaders(req *http.Request) map[string]string { +// xHeaders := make(map[string]string) +// for key, values := range req.Header { +// lowerCaseKey := strings.ToLower(key) +// if !strings.HasPrefix(lowerCaseKey, "x-forwarded-") && strings.HasPrefix(lowerCaseKey, "x-") { +// xHeaders[key] = strings.Join(values, ",") +// } +// } +// return xHeaders +// } func prepareRequestBody(req *http.Request, sourceType string, sourceListForParsingParams []string) ([]byte, error) { defer func() { @@ -303,14 +303,14 @@ func prepareRequestBody(req *http.Request, sourceType string, sourceListForParsi } } - xHeaders := getXHeaders(req) - if len(xHeaders) > 0 { - body, err = sjson.SetBytes(body, "headers", xHeaders) - if err != nil { - return nil, errors.New(response.InvalidJSON) - } + // xHeaders := getXHeaders(req) + // if len(xHeaders) > 0 { + // body, err = sjson.SetBytes(body, "headers", xHeaders) + // if err != nil { + // return nil, errors.New(response.InvalidJSON) + // } - } + // } return body, nil } diff --git a/gateway/webhook/webhook_test.go b/gateway/webhook/webhook_test.go index 7774f651f2..e4af321485 100644 --- a/gateway/webhook/webhook_test.go +++ b/gateway/webhook/webhook_test.go @@ -549,12 +549,12 @@ func TestPrepareRequestBody(t *testing.T) { sourceType: "shopify", expectedResponse: `{"key":"value","query_parameters":{}}`, }, - { - name: "Some payload with headers for shopify", - req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key":"value"}`), headers: map[string]string{"X-Key": "header-value"}}), - sourceType: "shopify", - expectedResponse: `{"key":"value","query_parameters":{},"headers":{"X-Key":"header-value"}}`, - }, + // { + // name: "Some payload with headers for shopify", + // req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key":"value"}`), headers: map[string]string{"X-Key": "header-value"}}), + // sourceType: "shopify", + // expectedResponse: `{"key":"value","query_parameters":{},"headers":{"X-Key":"header-value"}}`, + // }, { name: "Some payload with query parameters for Adjust", req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key1":"value1"}`), params: map[string]string{"key2": "value2"}}), From e12d8b4977ab47fdbb666cdce6b8073320de5061 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Fri, 6 Sep 2024 07:41:01 +0530 Subject: [PATCH 2/2] fix: set headers only if we can set without any error --- gateway/webhook/integration_test.go | 20 +++++++------ gateway/webhook/webhook.go | 44 +++++++++++++++-------------- gateway/webhook/webhook_test.go | 28 +++++++++++++----- go.mod | 2 +- go.sum | 4 +-- 5 files changed, 59 insertions(+), 39 deletions(-) diff --git a/gateway/webhook/integration_test.go b/gateway/webhook/integration_test.go index 6a3ae97c4c..3897d0d810 100644 --- a/gateway/webhook/integration_test.go +++ b/gateway/webhook/integration_test.go @@ -284,14 +284,18 @@ func TestIntegrationWebhook(t *testing.T) { assert.JSONEq(t, string(p), string(batch.Batch[0])) } - r, err = errDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{ - WorkspaceID: workspaceID, - // ParameterFilters: []jobsdb.ParameterFilterT{{ - // Name: "source_id", - // Value: sourceID, - // }}, - JobsLimit: 1, - }) + require.Eventually(t, func() bool { + r, err = errDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{ + WorkspaceID: workspaceID, + ParameterFilters: []jobsdb.ParameterFilterT{{ + Name: "source_id", + Value: sourceID, + }}, + JobsLimit: 10, + }) + return err == nil && len(r.Jobs) == len(tc.Output.ErrQueue) + }, time.Second, time.Millisecond*10) + require.NoError(t, err) assert.Len(t, r.Jobs, len(tc.Output.ErrQueue)) for i, p := range tc.Output.ErrQueue { diff --git a/gateway/webhook/webhook.go b/gateway/webhook/webhook.go index ebcc633ccc..bfe8a92078 100644 --- a/gateway/webhook/webhook.go +++ b/gateway/webhook/webhook.go @@ -269,18 +269,18 @@ func (webhook *HandleT) batchRequests(sourceDef string, requestQ chan *webhookT) } } -// func getXHeaders(req *http.Request) map[string]string { -// xHeaders := make(map[string]string) -// for key, values := range req.Header { -// lowerCaseKey := strings.ToLower(key) -// if !strings.HasPrefix(lowerCaseKey, "x-forwarded-") && strings.HasPrefix(lowerCaseKey, "x-") { -// xHeaders[key] = strings.Join(values, ",") -// } -// } -// return xHeaders -// } - -func prepareRequestBody(req *http.Request, sourceType string, sourceListForParsingParams []string) ([]byte, error) { +func getXHeaders(req *http.Request) map[string]string { + xHeaders := make(map[string]string) + for key, values := range req.Header { + lowerCaseKey := strings.ToLower(key) + if !strings.HasPrefix(lowerCaseKey, "x-forwarded-") && strings.HasPrefix(lowerCaseKey, "x-") { + xHeaders[key] = strings.Join(values, ",") + } + } + return xHeaders +} + +func (webhook *HandleT) prepareRequestBody(req *http.Request, sourceType string, sourceListForParsingParams []string) ([]byte, error) { defer func() { _ = req.Body.Close() }() @@ -303,14 +303,16 @@ func prepareRequestBody(req *http.Request, sourceType string, sourceListForParsi } } - // xHeaders := getXHeaders(req) - // if len(xHeaders) > 0 { - // body, err = sjson.SetBytes(body, "headers", xHeaders) - // if err != nil { - // return nil, errors.New(response.InvalidJSON) - // } - - // } + xHeaders := getXHeaders(req) + if len(xHeaders) > 0 { + bodyWithHeaders, err := sjson.SetBytes(body, "headers", xHeaders) + if err != nil { + // When headers are not set in body, log the error and continue without setting headers + webhook.logger.Infof("Error while setting headers for source type=%s in body: %s", sourceType, err.Error()) + } else { + body = bodyWithHeaders + } + } return body, nil } @@ -343,7 +345,7 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() { var payloadArr [][]byte var webRequests []*webhookT for _, req := range breq.batchRequest { - body, err := prepareRequestBody(req.request, breq.sourceType, bt.webhook.config.sourceListForParsingParams) + body, err := bt.webhook.prepareRequestBody(req.request, breq.sourceType, bt.webhook.config.sourceListForParsingParams) if err != nil { req.done <- transformerResponse{Err: response.GetStatus(err.Error())} continue diff --git a/gateway/webhook/webhook_test.go b/gateway/webhook/webhook_test.go index e4af321485..c074b80bdc 100644 --- a/gateway/webhook/webhook_test.go +++ b/gateway/webhook/webhook_test.go @@ -482,6 +482,12 @@ func TestRecordWebhookErrors(t *testing.T) { } func TestPrepareRequestBody(t *testing.T) { + initWebhook() + ctrl := gomock.NewController(t) + mockGW := mockWebhook.NewMockGateway(ctrl) + statsStore, err := memstats.New() + require.NoError(t, err) + webhookHandler := Setup(mockGW, transformer.NewNoOpService(), statsStore) type requestOpts struct { method string target string @@ -508,6 +514,7 @@ func TestPrepareRequestBody(t *testing.T) { name string req *http.Request sourceType string + description string includeQueryParams bool wantError bool expectedResponse string @@ -549,12 +556,19 @@ func TestPrepareRequestBody(t *testing.T) { sourceType: "shopify", expectedResponse: `{"key":"value","query_parameters":{}}`, }, - // { - // name: "Some payload with headers for shopify", - // req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key":"value"}`), headers: map[string]string{"X-Key": "header-value"}}), - // sourceType: "shopify", - // expectedResponse: `{"key":"value","query_parameters":{},"headers":{"X-Key":"header-value"}}`, - // }, + { + name: "Some payload with headers for shopify", + req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key":"value"}`), headers: map[string]string{"X-Key": "header-value"}}), + sourceType: "shopify", + expectedResponse: `{"key":"value","query_parameters":{},"headers":{"X-Key":"header-value"}}`, + }, + { + name: "Some payload with headers for sendgrid", + req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`[{"key":"value"}]`), headers: map[string]string{"X-Key": "header-value"}}), + sourceType: "webhook", + description: "Request Body is array so headers won't be included", + expectedResponse: `[{"key":"value"}]`, + }, { name: "Some payload with query parameters for Adjust", req: createRequest(requestOpts{method: http.MethodPost, target: "http://example.com", body: strings.NewReader(`{"key1":"value1"}`), params: map[string]string{"key2": "value2"}}), @@ -571,7 +585,7 @@ func TestPrepareRequestBody(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result, err := prepareRequestBody(tc.req, tc.sourceType, []string{"adjust", "shopify"}) + result, err := webhookHandler.prepareRequestBody(tc.req, tc.sourceType, []string{"adjust", "shopify"}) if tc.wantError { require.Error(t, err) return diff --git a/go.mod b/go.mod index 6322503170..d67ec881f3 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( github.com/rudderlabs/rudder-go-kit v0.38.0 github.com/rudderlabs/rudder-observability-kit v0.0.3 github.com/rudderlabs/rudder-schemas v0.5.1 - github.com/rudderlabs/rudder-transformer/go v0.0.0-20240812044419-23196ec42acf + github.com/rudderlabs/rudder-transformer/go v0.0.0-20240904053719-1777c6cb23f9 github.com/rudderlabs/sql-tunnels v0.1.7 github.com/rudderlabs/sqlconnect-go v1.9.0 github.com/samber/lo v1.47.0 diff --git a/go.sum b/go.sum index b1b5631f28..72c29f81a3 100644 --- a/go.sum +++ b/go.sum @@ -1134,8 +1134,8 @@ github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2Y github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8= github.com/rudderlabs/rudder-schemas v0.5.1 h1:g4I5wp2yA6ZWQZ1MjSNn4zby3XctG/TOgbYUW3dS4z4= github.com/rudderlabs/rudder-schemas v0.5.1/go.mod h1:JoDTB9nCDXwRz+G+aYwP3Fj42HLssKARxsFFm+qqgb4= -github.com/rudderlabs/rudder-transformer/go v0.0.0-20240812044419-23196ec42acf h1:nsU2tKjPV/sbmOoIk39ncFT8D5HBDVppmrCWO0v9HsU= -github.com/rudderlabs/rudder-transformer/go v0.0.0-20240812044419-23196ec42acf/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE= +github.com/rudderlabs/rudder-transformer/go v0.0.0-20240904053719-1777c6cb23f9 h1:ecY7YIVRITu04y8fcRUdcTASlFUtzOtjhsyu8P6b4eY= +github.com/rudderlabs/rudder-transformer/go v0.0.0-20240904053719-1777c6cb23f9/go.mod h1:3NGitPz4pYRRZ6Xt09S+8hb0tHK/9pZcKJ3OgOTaSmE= github.com/rudderlabs/sql-tunnels v0.1.7 h1:wDCRl6zY4M5gfWazf7XkSTGQS3yjBzUiUgEMBIfHNDA= github.com/rudderlabs/sql-tunnels v0.1.7/go.mod h1:5f7+YL49JHYgteP4rAgqKnr4K2OadB0oIpUS+Tt3sPM= github.com/rudderlabs/sqlconnect-go v1.9.0 h1:icLgqvVQ15Vh+oP7epA0b0yK6sIzxRVwPlRzOoDNVRA=