Skip to content

Commit

Permalink
feat(dm): restructured the module to have better separation of concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed Oct 12, 2023
1 parent a202965 commit a0ebb30
Show file tree
Hide file tree
Showing 23 changed files with 405 additions and 344 deletions.
10 changes: 5 additions & 5 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway"
gwThrottler "github.com/rudderlabs/rudder-server/gateway/throttler"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/processor"
proc "github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routerManager "github.com/rudderlabs/rudder-server/router/manager"
Expand Down Expand Up @@ -237,21 +237,21 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

var geoEnricher processor.PipelineEnricher
var geoEnricher enricher.PipelineEnricher
if config.GetBool("GeoEnrichment.enabled", false) {

dbProvider, err := processor.NewGeoDBProvider(config, a.log)
dbProvider, err := enricher.NewGeoDBProvider(config, a.log)
if err != nil {
return fmt.Errorf("creating new instance of db provider: %w", err)
}

geoEnricher, err = processor.NewGeoEnricher(dbProvider, config, a.log, stats.Default)
geoEnricher, err = enricher.NewGeoEnricher(dbProvider, config, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
}

} else {
geoEnricher = proc.NewNoOpGeoEnricher()
geoEnricher = enricher.NoOpGeoEnricher{}
}

proc := processor.New(
Expand Down
30 changes: 14 additions & 16 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,36 @@ import (

"github.com/go-chi/chi/v5"

"github.com/bugsnag/bugsnag-go/v2"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/archiver"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/router/throttler"
schema_forwarder "github.com/rudderlabs/rudder-server/schema-forwarder"
"github.com/rudderlabs/rudder-server/utils/payload"
"github.com/rudderlabs/rudder-server/utils/types/deployment"

"golang.org/x/sync/errgroup"

"github.com/bugsnag/bugsnag-go/v2"

kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/archiver"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
proc "github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routerManager "github.com/rudderlabs/rudder-server/router/manager"
"github.com/rudderlabs/rudder-server/router/throttler"
schema_forwarder "github.com/rudderlabs/rudder-server/schema-forwarder"
"github.com/rudderlabs/rudder-server/services/db"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/payload"
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"golang.org/x/sync/errgroup"
)

// processorApp is the type for Processor type implementation
Expand Down Expand Up @@ -230,21 +228,21 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

var geoEnricher proc.PipelineEnricher
var geoEnricher enricher.PipelineEnricher
if config.GetBool("GeoEnrichment.enabled", false) {

dbProvider, err := proc.NewGeoDBProvider(config.Default, a.log)
dbProvider, err := enricher.NewGeoDBProvider(config.Default, a.log)
if err != nil {
return fmt.Errorf("creating new instance of db provider: %w", err)
}

geoEnricher, err = proc.NewGeoEnricher(dbProvider, config.Default, a.log, stats.Default)
geoEnricher, err = enricher.NewGeoEnricher(dbProvider, config.Default, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
}

} else {
geoEnricher = proc.NewNoOpGeoEnricher()
geoEnricher = enricher.NoOpGeoEnricher{}
}

p := proc.New(
Expand Down
25 changes: 11 additions & 14 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,35 @@ import (
"testing"
"time"

arc "github.com/rudderlabs/rudder-server/archiver"
mock_jobs_forwarder "github.com/rudderlabs/rudder-server/mocks/jobs-forwarder"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"

destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
arc "github.com/rudderlabs/rudder-server/archiver"
backendConfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
mock_jobs_forwarder "github.com/rudderlabs/rudder-server/mocks/jobs-forwarder"
mocksTransformer "github.com/rudderlabs/rudder-server/mocks/processor/transformer"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/processor/stash"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routermanager "github.com/rudderlabs/rudder-server/router/manager"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -214,7 +211,7 @@ func TestDynamicClusterManager(t *testing.T) {
rsources.NewNoOpService(),
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
processor.NewNoOpGeoEnricher(),
enricher.NoOpGeoEnricher{},
processor.WithFeaturesRetryMaxAttempts(0),
)
processor.BackendConfig = mockBackendConfig
Expand Down
4 changes: 3 additions & 1 deletion backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ type SourceT struct {
DgSourceTrackingPlanConfig DgSourceTrackingPlanConfigT
Transient bool
EventSchemasEnabled bool
GeoEnrichmentEnabled bool
GeoEnrichment struct {
Enabled bool
}
}

func (s *SourceT) IsReplaySource() bool {
Expand Down
9 changes: 9 additions & 0 deletions internal/enricher/enricher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package enricher

import (
"github.com/rudderlabs/rudder-server/utils/types"
)

type PipelineEnricher interface {
Enrich(sourceId string, request *types.GatewayBatchRequest) error
}
117 changes: 49 additions & 68 deletions processor/geolocation.go → internal/enricher/geolocation.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
package processor
package enricher

import (
"context"
"errors"
"fmt"
"os"
"strconv"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/processor/geolocation"
"github.com/rudderlabs/rudder-server/services/geolocation"
"github.com/rudderlabs/rudder-server/utils/types"
)

type PipelineEnricher interface {
Enrich(sourceId string, request *types.GatewayBatchRequest) error
}

type FileManager interface {
Download(context.Context, *os.File, string) error
type GeoDBProvider interface {
GetDB(string) (string, error)
}

type Geolocation struct {
Expand All @@ -32,36 +29,7 @@ type Geolocation struct {
Timezone string `json:"timezone"`
}

func extractMinimalGeoInfo(geocity *geolocation.GeoCity) *Geolocation {
if geocity == nil {
return nil
}

toReturn := &Geolocation{
City: geocity.City.Names["en"],
Country: geocity.Country.IsoCode,
Postal: geocity.Postal.Code,
Timezone: geocity.Location.TimeZone,
}

if len(geocity.Subdivisions) > 0 {
toReturn.Region = geocity.Subdivisions[0].Names["en"]
}

// default values of latitude and longitude can give
// incorrect result, so we have casted them in pointers so we know
// when the value is missing.
if geocity.Location.Latitude != nil && geocity.Location.Longitude != nil {
toReturn.Location = fmt.Sprintf("%f,%f",
*geocity.Location.Latitude,
*geocity.Location.Longitude,
)
}

return toReturn
}

type geolocationEnricher struct {
type geoEnricher struct {
fetcher geolocation.GeoFetcher
logger logger.Logger
stats stats.Stats
Expand All @@ -74,17 +42,18 @@ func NewGeoEnricher(
statClient stats.Stats,
) (PipelineEnricher, error) {
var upstreamDBKey = config.GetString("Geolocation.db.key.path", "geolite2City.mmdb")

localPath, err := dbProvider.GetDB(upstreamDBKey)
if err != nil {
return nil, fmt.Errorf("getting db from upstream: %w", err)
}

fetcher, err := geolocation.NewMaxmindGeoFetcher(localPath)
fetcher, err := geolocation.NewMaxmindDBReader(localPath)
if err != nil {
return nil, fmt.Errorf("creating new instance of maxmind's geolocation fetcher: %w", err)
}

return &geolocationEnricher{
return &geoEnricher{
fetcher: fetcher,
stats: statClient,
logger: log.Child("geolocation"),
Expand All @@ -94,7 +63,7 @@ func NewGeoEnricher(
// Enrich function runs on a request of GatewayBatchRequest which contains
// multiple singular events from a source. The enrich function augments the
// geolocation information per event based on IP address.
func (e *geolocationEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
func (e *geoEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
e.logger.Debugw("received a call to enrich gateway events for source", "sourceId", sourceId)

if request.RequestIP == "" {
Expand All @@ -114,57 +83,40 @@ func (e *geolocationEnricher) Enrich(sourceId string, request *types.GatewayBatc

geoip, err := e.fetcher.GeoIP(request.RequestIP)
if err != nil {
switch {
// InvalidIP address being sent for lookup, log it for further analysis.
case errors.Is(err, geolocation.ErrInvalidIP):
e.logger.Errorw("unable to enrich the request with geolocation", "error", err, "ip", request.RequestIP)
e.stats.NewTaggedStat(
"proc_geo_enricher_invalid_ip",
stats.CountType,
stats.Tags{"sourceId": sourceId}).Increment()
default:
e.logger.Errorw("unable to enrich the request with geolocation", "error", err)
e.stats.NewTaggedStat(
"proc_geo_enricher_geoip_lookup_failed",
stats.CountType,
stats.Tags{"sourceId": sourceId}).Increment()
}
e.stats.NewTaggedStat(
"proc_geo_enricher_geoip_lookup_failed",
stats.CountType,
stats.Tags{
"sourceId": sourceId,
"validIP": strconv.FormatBool(errors.Is(err, geolocation.ErrInvalidIP)),
}).Increment()

return fmt.Errorf("unable to enrich the request with geolocation: %w", err)
}

// for every event if we have context object set
// only then we add to geo section
for _, event := range request.Batch {

if context, ok := event["context"].(map[string]interface{}); ok {
context["geo"] = extractMinimalGeoInfo(geoip)
context["geo"] = extractGeolocationData(geoip)
}
}

return nil
}

func NewNoOpGeoEnricher() PipelineEnricher {
return NoOpGeoEnricher{}
}

type NoOpGeoEnricher struct {
}

func (NoOpGeoEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
func (e NoOpGeoEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
return nil
}

type GeoDBProvider interface {
GetDB(string) (string, error)
}

type geoDBProviderImpl struct {
bucket string
region string
downloadPath string
s3Manager FileManager
s3Manager filemanager.FileManager
}

// GetDB simply fetches the database from the upstream located at the key defined
Expand Down Expand Up @@ -209,3 +161,32 @@ func NewGeoDBProvider(conf *config.Config, log logger.Logger) (GeoDBProvider, er
s3Manager: manager,
}, nil
}

func extractGeolocationData(geocity *geolocation.GeoCity) *Geolocation {
if geocity == nil {
return nil
}

toReturn := &Geolocation{
City: geocity.City.Names["en"],
Country: geocity.Country.IsoCode,
Postal: geocity.Postal.Code,
Timezone: geocity.Location.TimeZone,
}

if len(geocity.Subdivisions) > 0 {
toReturn.Region = geocity.Subdivisions[0].Names["en"]
}

// default values of latitude and longitude can give
// incorrect result, so we have casted them in pointers so we know
// when the value is missing.
if geocity.Location.Latitude != nil && geocity.Location.Longitude != nil {
toReturn.Location = fmt.Sprintf("%f,%f",
*geocity.Location.Latitude,
*geocity.Location.Longitude,
)
}

return toReturn
}
Loading

0 comments on commit a0ebb30

Please sign in to comment.