Skip to content

Commit

Permalink
fix: webhook source transformation failed with error: cannot find module
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 13, 2023
1 parent f95fa51 commit 4750bb4
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 1 deletion.
190 changes: 190 additions & 0 deletions gateway/gateway_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package gateway_test

import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"net/http"
"os"
"path"
"strconv"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/runner"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/testhelper/health"
)

func TestWebhook(t *testing.T) {
bcServer := backendconfigtest.NewBuilder().
WithWorkspaceConfig(
backendconfigtest.NewConfigBuilder().
WithSource(
backendconfigtest.NewSourceBuilder().
WithID("source-1").
WithWriteKey("writekey-1").
WithSourceCategory("webhook").
WithSourceType("SeGment").
Build()).
Build()).
Build()
defer bcServer.Close()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

postgresContainer, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
transformerContainer, err := destination.SetupTransformer(pool, t)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

gwPort, err := kithelper.GetFreePort()
require.NoError(t, err)

wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := runGateway(ctx, gwPort, postgresContainer, bcServer.URL, transformerContainer.TransformURL, t.TempDir())
if err != nil {
t.Logf("rudder-server exited with error: %v", err)
}
return err
})

url := fmt.Sprintf("http://localhost:%d", gwPort)
health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())

// send an event
err = func() error {
payload := []byte(fmt.Sprintf(`
{
"userId": %[1]q,
"type": "identity",
"context": {
"traits": {
"trait1": "new-val"
},
"ip": "14.5.67.21",
"library": {
"name": "http"
}
},
"timestamp": "2020-02-02T00:23:09.544Z"
}`,
rand.String(10),
))
req, err := http.NewRequest(http.MethodPost, url+"/v1/webhook", bytes.NewReader(payload))
if err != nil {
return err
}
req.SetBasicAuth("writekey-1", "password")

resp, err := (&http.Client{}).Do(req)
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b))
}
func() { kithttputil.CloseResponse(resp) }()
return nil
}()
require.NoError(t, err, "it should be able to send a webhook event to gateway")

requireJobsCount(t, postgresContainer.DB, "gw", jobsdb.Unprocessed.State, 1)
}

func runGateway(
ctx context.Context,
port int,
postgresContainer *resource.PostgresResource,
cbURL, transformerURL, tmpDir string,
) (err error) {
// first run node migrations
mg := &migrator.Migrator{
Handle: postgresContainer.DB,
MigrationsTable: "node_migrations",
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}

err = mg.Migrate("node")
if err != nil {
return fmt.Errorf("Unable to run the migrations for the node, err: %w", err)
}

// then start the server
config.Set("CONFIG_BACKEND_URL", cbURL)
config.Set("WORKSPACE_TOKEN", "token")
config.Set("DB.port", postgresContainer.Port)
config.Set("DB.user", postgresContainer.User)
config.Set("DB.name", postgresContainer.Database)
config.Set("DB.password", postgresContainer.Password)
config.Set("DEST_TRANSFORM_URL", transformerURL)

config.Set("APP_TYPE", "gateway")

config.Set("Gateway.webPort", strconv.Itoa(port))
config.Set("JobsDB.backup.enabled", false)
config.Set("JobsDB.migrateDSLoopSleepDuration", "60m")
config.Set("RUDDER_TMPDIR", os.TempDir())
config.Set("recovery.storagePath", path.Join(tmpDir, "/recovery_data.json"))
config.Set("recovery.enabled", false)
config.Set("Profiler.Enabled", false)
config.Set("Gateway.enableSuppressUserFeature", false)

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panicked: %v", r)
}
}()
r := runner.New(runner.ReleaseInfo{EnterpriseToken: "DUMMY"})
c := r.Run(ctx, []string{"rudder-gw"})
if c != 0 {
err = fmt.Errorf("rudder-server exited with a non-0 exit code: %d", c)
}
return
}

// nolint: unparam
func requireJobsCount(
t *testing.T,
db *sql.DB,
queue, state string,
expectedCount int,
) {
t.Helper()

query := fmt.Sprintf(`SELECT count(*) FROM unionjobsdbmetadata('%s',1) WHERE job_state = '%s';`, queue, state)
if state == jobsdb.Unprocessed.State {
query = fmt.Sprintf(`SELECT count(*) FROM unionjobsdbmetadata('%s',1) WHERE job_state IS NULL;`, queue)
}
require.Eventually(t, func() bool {
var jobsCount int
require.NoError(t, db.QueryRow(query).Scan(&jobsCount))
t.Logf("%s %sJobCount: %d", queue, state, jobsCount)
return jobsCount == expectedCount
},
20*time.Second,
1*time.Second,
fmt.Sprintf("%d %s events should be in %s state", expectedCount, queue, state),
)
}
3 changes: 2 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -77,7 +78,7 @@ func newSourceTransformAdapter(version string) sourceTransformAdapter {

func getTransformerURL(version, sourceType string) (string, error) {
baseURL := config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
return url.JoinPath(baseURL, version, "sources", sourceType)
return url.JoinPath(baseURL, version, "sources", strings.ToLower(sourceType))
}

type outputToSource struct {
Expand Down
12 changes: 12 additions & 0 deletions testhelper/backendconfigtest/source_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@ func (b *SourceBuilder) WithGeoenrichmentEnabled(enabled bool) *SourceBuilder {
b.v.GeoEnrichment.Enabled = enabled
return b
}

// WithSourceCategory sets the source definition category
func (b *SourceBuilder) WithSourceCategory(category string) *SourceBuilder {
b.v.SourceDefinition.Category = category
return b
}

// WithSourceType sets the source type
func (b *SourceBuilder) WithSourceType(sourceType string) *SourceBuilder {
b.v.SourceDefinition.Name = sourceType
return b
}

0 comments on commit 4750bb4

Please sign in to comment.