Skip to content

Commit

Permalink
feat: warehouse transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 18, 2024
1 parent 9683021 commit 277bc71
Show file tree
Hide file tree
Showing 54 changed files with 15,504 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
Expand Down
20 changes: 16 additions & 4 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,25 @@ func WithClient(client *http.Client) Opt {
}
}

// Transformer provides methods to transform events
type Transformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
type UserTransformer interface {
UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type DestinationTransformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type TrackingPlanValidator interface {
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// Transformer provides methods to transform events
type Transformer interface {
UserTransformer
DestinationTransformer
TrackingPlanValidator
}

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
Expand Down Expand Up @@ -526,7 +538,7 @@ func (trans *handle) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%t", trans.conf.GetBool("Warehouse.enableIDResolution", false))
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
Expand Down
62 changes: 62 additions & 0 deletions warehouse/transformer/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package transformer

import (
"fmt"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/rules"
)

func (t *transformer) handleAliasEvent(pi *processingInfo) ([]map[string]any, error) {
event := make(map[string]any)
columnTypes := make(map[string]string)

if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["traits"], event, columnTypes, &prefixInfo{
completePrefix: "alias_traits_",
completeLevel: 2,
}); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
}

Check warning on line 20 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L19-L20

Added lines #L19 - L20 were not covered by tests
if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["context"], event, columnTypes, &prefixInfo{
completePrefix: "alias_context_",
completeLevel: 2,
prefix: "context_",
}); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
}

Check warning on line 27 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L26-L27

Added lines #L26 - L27 were not covered by tests
if err := t.setDataAndColumnTypeFromRules(pi, event, columnTypes,
lo.Assign(rules.DefaultRules, rules.AliasRules), rules.DefaultFunctionalRules,
); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from rules: %w", err)
}

Check warning on line 32 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L31-L32

Added lines #L31 - L32 were not covered by tests

if err := storeRudderEvent(pi, event, columnTypes); err != nil {
return nil, fmt.Errorf("alias: storing rudder event: %w", err)
}

Check warning on line 36 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L35-L36

Added lines #L35 - L36 were not covered by tests

tableName, err := SafeTableName(pi.event.Metadata.DestinationType, pi.itrOpts, "aliases")
if err != nil {
return nil, fmt.Errorf("alias: safe table name: %w", err)
}

Check warning on line 41 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L40-L41

Added lines #L40 - L41 were not covered by tests
columns, err := t.getColumns(pi.event.Metadata.DestinationType, event, columnTypes)
if err != nil {
return nil, fmt.Errorf("alias: getting columns: %w", err)
}

Check warning on line 45 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L44-L45

Added lines #L44 - L45 were not covered by tests

mergeEvents, err := t.handleMergeEvent(pi)
if err != nil {
return nil, fmt.Errorf("merge event: %w", err)
}

Check warning on line 50 in warehouse/transformer/alias.go

View check run for this annotation

Codecov / codecov/patch

warehouse/transformer/alias.go#L49-L50

Added lines #L49 - L50 were not covered by tests

aliasOutput := map[string]any{
"data": event,
"metadata": map[string]any{
"table": tableName,
"columns": columns,
"receivedAt": pi.event.Metadata.ReceivedAt,
},
"userId": "",
}
return append([]map[string]any{aliasOutput}, mergeEvents...), nil
}
Loading

0 comments on commit 277bc71

Please sign in to comment.