Skip to content

Commit

Permalink
feat(dm): added tests for the geolocation and maxmind db reader
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed Oct 10, 2023
1 parent ce1fc7f commit a202965
Show file tree
Hide file tree
Showing 17 changed files with 26,459 additions and 150 deletions.
19 changes: 16 additions & 3 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
"github.com/rudderlabs/rudder-server/processor"
proc "github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
routerManager "github.com/rudderlabs/rudder-server/router/manager"
Expand Down Expand Up @@ -236,9 +237,21 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

geoEnricher, err := processor.NewGeoEnricher(config, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
var geoEnricher processor.PipelineEnricher
if config.GetBool("GeoEnrichment.enabled", false) {

dbProvider, err := processor.NewGeoDBProvider(config, a.log)
if err != nil {
return fmt.Errorf("creating new instance of db provider: %w", err)
}

geoEnricher, err = processor.NewGeoEnricher(dbProvider, config, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
}

} else {
geoEnricher = proc.NewNoOpGeoEnricher()
}

proc := processor.New(
Expand Down
45 changes: 41 additions & 4 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/go-chi/chi/v5"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/archiver"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router/throttler"
schema_forwarder "github.com/rudderlabs/rudder-server/schema-forwarder"
"github.com/rudderlabs/rudder-server/utils/payload"
Expand Down Expand Up @@ -229,9 +230,21 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

geoEnricher, err := processor.NewGeoEnricher(config.Default, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
var geoEnricher proc.PipelineEnricher
if config.GetBool("GeoEnrichment.enabled", false) {

dbProvider, err := proc.NewGeoDBProvider(config.Default, a.log)
if err != nil {
return fmt.Errorf("creating new instance of db provider: %w", err)
}

geoEnricher, err = proc.NewGeoEnricher(dbProvider, config.Default, a.log, stats.Default)
if err != nil {
return fmt.Errorf("starting geo enrichment process for pipeline: %w", err)
}

} else {
geoEnricher = proc.NewNoOpGeoEnricher()
}

p := proc.New(
Expand Down Expand Up @@ -327,6 +340,30 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return g.Wait()
}

func s3FileDownloader(bucket, region, key, downloadPath string) error {
s3Manager, err := filemanager.NewS3Manager(map[string]interface{}{
"Bucket": bucket,
"Region": region,
}, logger.NewLogger(), func() time.Duration { return 1000 * time.Millisecond })
if err != nil {
return fmt.Errorf("creating a new instance of s3 file manager: %w", err)
}

f, err := os.Create(downloadPath)
if err != nil {
return fmt.Errorf("creating local file for storing database: %w", err)
}

defer f.Close()

err = s3Manager.Download(context.Background(), f, key)
if err != nil {
return fmt.Errorf("downloading instance of database from upstream: %w", err)
}

return nil
}

func (a *processorApp) startHealthWebHandler(ctx context.Context, db *jobsdb.Handle) error {
// Port where Processor health handler is running
a.log.Infof("Starting in %d", a.config.http.webPort)
Expand Down
4 changes: 3 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ func TestDynamicClusterManager(t *testing.T) {
rsources.NewNoOpService(),
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
processor.WithFeaturesRetryMaxAttempts(0))
processor.NewNoOpGeoEnricher(),
processor.WithFeaturesRetryMaxAttempts(0),
)
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
Expand Down
191 changes: 139 additions & 52 deletions processor/geolocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package processor

import (
"context"
"errors"
"fmt"
"net"
"os"
"time"

Expand All @@ -16,109 +16,196 @@ import (
)

type PipelineEnricher interface {
Enrich(sourceId string, request *types.GatewayBatchRequest)
Enrich(sourceId string, request *types.GatewayBatchRequest) error
}

type FileManager interface {
Download(context.Context, *os.File, string) error
}

type geolocationEnricher struct {
fetcher geolocation.GeoFetcher
logger logger.Logger
fileManager FileManager
stats stats.Stats
type Geolocation struct {
City string `json:"city"`
Country string `json:"country"`
Region string `json:"region"`
Postal string `json:"postal"`
Location string `json:"location"`
Timezone string `json:"timezone"`
}

func NewGeoEnricher(config *config.Config, log logger.Logger, stat stats.Stats) (PipelineEnricher, error) {
func extractMinimalGeoInfo(geocity *geolocation.GeoCity) *Geolocation {
if geocity == nil {
return nil
}

var (
bucket = config.GetString("Geolocation.db.bucket", "rudderstack-geolocation")
region = config.GetString("Geolocation.db.bucket.region", "us-east-1")
key = config.GetString("Geolocation.db.key.path", "geolite2City.mmdb")
downloadPath = config.GetString("Geolocation.db.downloadPath", "geolite2City.mmdb")
)
toReturn := &Geolocation{
City: geocity.City.Names["en"],
Country: geocity.Country.IsoCode,
Postal: geocity.Postal.Code,
Timezone: geocity.Location.TimeZone,
}

s3Manager, err := filemanager.NewS3Manager(map[string]interface{}{
"Bucket": bucket,
"Region": region,
}, log, func() time.Duration { return 1000 * time.Millisecond })
if err != nil {
return nil, fmt.Errorf("creating a new instance of s3 file manager: %w", err)
if len(geocity.Subdivisions) > 0 {
toReturn.Region = geocity.Subdivisions[0].Names["en"]
}

f, err := os.Create(downloadPath)
if err != nil {
return nil, fmt.Errorf("creating local file for storing database: %w", err)
// default values of latitude and longitude can give
// incorrect result, so we have casted them in pointers so we know
// when the value is missing.
if geocity.Location.Latitude != nil && geocity.Location.Longitude != nil {
toReturn.Location = fmt.Sprintf("%f,%f",
*geocity.Location.Latitude,
*geocity.Location.Longitude,
)
}

defer f.Close()
return toReturn
}

type geolocationEnricher struct {
fetcher geolocation.GeoFetcher
logger logger.Logger
stats stats.Stats
}

err = s3Manager.Download(context.Background(), f, key)
func NewGeoEnricher(
dbProvider GeoDBProvider,
config *config.Config,
log logger.Logger,
statClient stats.Stats,
) (PipelineEnricher, error) {
var upstreamDBKey = config.GetString("Geolocation.db.key.path", "geolite2City.mmdb")
localPath, err := dbProvider.GetDB(upstreamDBKey)
if err != nil {
return nil, fmt.Errorf("downloading instance of database from upstream: %w", err)
return nil, fmt.Errorf("getting db from upstream: %w", err)
}

fetcher, err := geolocation.NewMaxmindGeoFetcher(downloadPath)
fetcher, err := geolocation.NewMaxmindGeoFetcher(localPath)
if err != nil {
return nil, fmt.Errorf("creating new instance of maxmind's geolocation fetcher: %w", err)
}

return &geolocationEnricher{
fetcher: fetcher,
fileManager: s3Manager,
stats: stat,
logger: log.Child("geolocation"),
fetcher: fetcher,
stats: statClient,
logger: log.Child("geolocation"),
}, nil
}

// Enrich function runs on a request of GatewayBatchRequest which contains
// multiple singular events from a source. The enrich function augments the
// geolocation information per event based on IP address.
func (e *geolocationEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) {
e.logger.Debugw("received a call to enrich gateway events for the customer", "sourceId", sourceId)
func (e *geolocationEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
e.logger.Debugw("received a call to enrich gateway events for source", "sourceId", sourceId)

if request.RequestIP == "" {
e.stats.NewTaggedStat("proc_geo_enrincher_empty_ip", stats.CountType, stats.Tags{
"sourceId": sourceId,
}).Increment()
return

return nil
}

defer func(from time.Time) {
e.stats.NewTaggedStat(
"pro_geo_enricher_request_latency",
"proc_geo_enricher_request_latency",
stats.TimerType,
stats.Tags{"sourceId": sourceId}).Since(from)
}(time.Now())

parsedIP := net.ParseIP(request.RequestIP)
if parsedIP == nil {
e.stats.NewTaggedStat("proc_geo_enricher_invalid_ip", stats.CountType, stats.Tags{
"sourceId": sourceId,
}).Increment()
return
}

geoip, err := e.fetcher.GeoIP(parsedIP)
geoip, err := e.fetcher.GeoIP(request.RequestIP)
if err != nil {
e.logger.Errorw("unable to enrich the request with geolocation", "error", err)
e.stats.NewTaggedStat(
"proc_geo_enricher_geoip_lookup_failed",
stats.CountType,
stats.Tags{"sourceId": sourceId}).Increment()
return
switch {
// InvalidIP address being sent for lookup, log it for further analysis.
case errors.Is(err, geolocation.ErrInvalidIP):
e.logger.Errorw("unable to enrich the request with geolocation", "error", err, "ip", request.RequestIP)
e.stats.NewTaggedStat(
"proc_geo_enricher_invalid_ip",
stats.CountType,
stats.Tags{"sourceId": sourceId}).Increment()
default:
e.logger.Errorw("unable to enrich the request with geolocation", "error", err)
e.stats.NewTaggedStat(
"proc_geo_enricher_geoip_lookup_failed",
stats.CountType,
stats.Tags{"sourceId": sourceId}).Increment()
}

return fmt.Errorf("unable to enrich the request with geolocation: %w", err)
}

// for every event if we have context object set
// only then we add to geo section
for _, event := range request.Batch {

if context, ok := event["context"].(map[string]interface{}); ok {
context["geo"] = geoip
context["geo"] = extractMinimalGeoInfo(geoip)
}
}

return nil
}

func NewNoOpGeoEnricher() PipelineEnricher {
return NoOpGeoEnricher{}
}

type NoOpGeoEnricher struct {
}

func (NoOpGeoEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) {
func (NoOpGeoEnricher) Enrich(sourceId string, request *types.GatewayBatchRequest) error {
return nil
}

type GeoDBProvider interface {
GetDB(string) (string, error)
}

type geoDBProviderImpl struct {
bucket string
region string
downloadPath string
s3Manager FileManager
}

// GetDB simply fetches the database from the upstream located at the key defined
// in the argument.
func (p *geoDBProviderImpl) GetDB(key string) (string, error) {
f, err := os.Open(p.downloadPath)
if err != nil {
return "", fmt.Errorf("creating a file to store db contents: %w", err)
}

defer f.Close()

err = p.s3Manager.Download(context.Background(), f, key)
if err != nil {
return "", fmt.Errorf("downloading db from upstream and storing in file: %w", err)
}

return p.downloadPath, nil
}

func NewGeoDBProvider(conf *config.Config, log logger.Logger) (GeoDBProvider, error) {
var (
bucket = config.GetString("Geolocation.db.bucket", "rudderstack-geolocation")
region = config.GetString("Geolocation.db.bucket.region", "us-east-1")
downloadPath = config.GetString("Geolocation.db.downloadPath", "geolite2City.mmdb")
)

manager, err := filemanager.NewS3Manager(map[string]interface{}{
"Bucket": bucket,
"Region": region}, log, func() time.Duration {
return 1000 * time.Millisecond
})

if err != nil {
return nil, fmt.Errorf("creating a new s3 manager client: %w", err)
}

return &geoDBProviderImpl{
bucket: bucket,
region: region,
downloadPath: downloadPath,
s3Manager: manager,
}, nil
}
Loading

0 comments on commit a202965

Please sign in to comment.