Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: webhook source transformation failed with error: cannot find module #4120

Merged
merged 2 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions gateway/gateway_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package gateway_test

import (
"bytes"
"context"
"database/sql"
"fmt"
"net/http"
"os"
"path"
"strconv"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/runner"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/testhelper/health"
)

func TestWebhook(t *testing.T) {
bcServer := backendconfigtest.NewBuilder().
WithWorkspaceConfig(
backendconfigtest.NewConfigBuilder().
WithSource(
backendconfigtest.NewSourceBuilder().
WithID("source-1").
WithWriteKey("writekey-1").
WithSourceCategory("webhook").
WithSourceType("SeGment").
Build()).
Build()).
Build()
defer bcServer.Close()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

postgresContainer, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
transformerContainer, err := destination.SetupTransformer(pool, t)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runGateway(ctx, gwPort, postgresContainer, bcServer.URL, transformerContainer.TransformURL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})

url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())

// send an event
req, err := http.NewRequest(http.MethodPost, url+"/v1/webhook", bytes.NewReader([]byte(`{"userId": "user-1", "type": "identity"}`)))
require.NoError(t, err)
req.SetBasicAuth("writekey-1", "password")
resp, err := (&http.Client{}).Do(req)
require.NoError(t, err, "it should be able to send a webhook event to gateway")
require.Equal(t, http.StatusOK, resp.StatusCode)
func() { kithttputil.CloseResponse(resp) }()
require.NoError(t, err, "it should be able to send a webhook event to gateway")

// check that the event is stored in jobsdb
requireJobsCount(t, postgresContainer.DB, "gw", jobsdb.Unprocessed.State, 1)
}

func runGateway(
ctx context.Context,
port int,
postgresContainer *resource.PostgresResource,
cbURL, transformerURL, tmpDir string,
) (err error) {
// first run node migrations
mg := &migrator.Migrator{
Handle: postgresContainer.DB,
MigrationsTable: "node_migrations",
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}

err = mg.Migrate("node")
if err != nil {
return fmt.Errorf("Unable to run the migrations for the node, err: %w", err)
}

// then start the server
config.Set("CONFIG_BACKEND_URL", cbURL)
config.Set("WORKSPACE_TOKEN", "token")
config.Set("DB.port", postgresContainer.Port)
config.Set("DB.user", postgresContainer.User)
config.Set("DB.name", postgresContainer.Database)
config.Set("DB.password", postgresContainer.Password)
config.Set("DEST_TRANSFORM_URL", transformerURL)

config.Set("APP_TYPE", "gateway")

config.Set("Gateway.webPort", strconv.Itoa(port))
config.Set("JobsDB.backup.enabled", false)
config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
config.Set("RUDDER_TMPDIR", os.TempDir())
config.Set("recovery.storagePath", path.Join(tmpDir, "/recovery_data.json"))
config.Set("recovery.enabled", false)
config.Set("Profiler.Enabled", false)
config.Set("Gateway.enableSuppressUserFeature", false)

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panicked: %v", r)
}
}()
r := runner.New(runner.ReleaseInfo{EnterpriseToken: "DUMMY"})
c := r.Run(ctx, []string{"rudder-gw"})
if c != 0 {
err = fmt.Errorf("rudder-server exited with a non-0 exit code: %d", c)
}
return
}

// nolint: unparam
func requireJobsCount(
t *testing.T,
db *sql.DB,
queue, state string,
expectedCount int,
) {
t.Helper()

query := fmt.Sprintf(`SELECT count(*) FROM unionjobsdbmetadata('%s',1) WHERE job_state = '%s';`, queue, state)
if state == jobsdb.Unprocessed.State {
query = fmt.Sprintf(`SELECT count(*) FROM unionjobsdbmetadata('%s',1) WHERE job_state IS NULL;`, queue)
}
require.Eventually(t, func() bool {
var jobsCount int
require.NoError(t, db.QueryRow(query).Scan(&jobsCount))
t.Logf("%s %sJobCount: %d", queue, state, jobsCount)
return jobsCount == expectedCount
},
20*time.Second,
1*time.Second,
fmt.Sprintf("%d %s events should be in %s state", expectedCount, queue, state),
)
}
3 changes: 2 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -77,7 +78,7 @@ func newSourceTransformAdapter(version string) sourceTransformAdapter {

func getTransformerURL(version, sourceType string) (string, error) {
baseURL := config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
return url.JoinPath(baseURL, version, "sources", sourceType)
return url.JoinPath(baseURL, version, "sources", strings.ToLower(sourceType))
}

type outputToSource struct {
Expand Down
7 changes: 5 additions & 2 deletions gateway/webhook/webhookTransformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ func TestV0Adapter(t *testing.T) {

t.Run("should return the right url", func(t *testing.T) {
testSrcType := "testSrcType"
testSrcTypeLower := "testsrctype"
url, err := v0Adapter.getTransformerURL(testSrcType)
require.Nil(t, err)
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V0, testSrcType)))
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V0, testSrcTypeLower)))
})

t.Run("should return the body as is", func(t *testing.T) {
Expand All @@ -35,9 +36,11 @@ func TestV1Adapter(t *testing.T) {
t.Run("should return the right url", func(t *testing.T) {
v1Adapter := newSourceTransformAdapter(transformer.V1)
testSrcType := "testSrcType"
testSrcTypeLower := "testsrctype"

url, err := v1Adapter.getTransformerURL(testSrcType)
require.Nil(t, err)
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V1, testSrcType)))
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V1, testSrcTypeLower)))
})

t.Run("should return the body in v1 format", func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions testhelper/backendconfigtest/source_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@ func (b *SourceBuilder) WithGeoenrichmentEnabled(enabled bool) *SourceBuilder {
b.v.GeoEnrichment.Enabled = enabled
return b
}

// WithSourceCategory sets the source definition category
func (b *SourceBuilder) WithSourceCategory(category string) *SourceBuilder {
b.v.SourceDefinition.Category = category
return b
}

// WithSourceType sets the source type
func (b *SourceBuilder) WithSourceType(sourceType string) *SourceBuilder {
b.v.SourceDefinition.Name = sourceType
return b
}
Loading