Skip to content

Commit

Permalink
feat: introduce scylla dedup (#4922)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Aug 13, 2024
1 parent eae14f8 commit 31a033d
Show file tree
Hide file tree
Showing 19 changed files with 780 additions and 196 deletions.
26 changes: 5 additions & 21 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ concurrency:
jobs:
integration:
name: Integration
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
matrix:
FEATURES: [ oss ,enterprise ]
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -42,7 +38,7 @@ jobs:
RSERVER_ENABLE_MULTITENANCY: false
warehouse-integration:
name: Warehouse Integration
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand All @@ -67,10 +63,6 @@ jobs:
- package: warehouse/integrations/snowflake
destination: snowflake
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
Expand Down Expand Up @@ -107,12 +99,8 @@ jobs:
path: coverage.txt
unit:
name: Unit
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
Expand All @@ -127,7 +115,7 @@ jobs:
path: coverage.txt
package-unit:
name: Package Unit
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand All @@ -154,10 +142,6 @@ jobs:
exclude: services/rsources

steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
Expand Down Expand Up @@ -186,7 +170,7 @@ jobs:
path: coverage.txt
coverage:
name: Coverage
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
needs:
- warehouse-integration
- unit
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/gocql/gocql v1.14.2
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/golang/mock v1.6.0
github.com/gomodule/redigo v1.9.2
Expand Down Expand Up @@ -224,6 +225,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,16 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitfield/gotestdox v0.2.2 h1:x6RcPAbBbErKLnapz1QeAlf3ospg8efBsedU93CDsnE=
github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8eqRJ3N+9pY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow=
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE=
github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bobg/gcsobj v0.1.2/go.mod h1:vS49EQ1A1Ib8FgrL58C8xXYZyOCR2TgzAdopy6/ipa8=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
Expand Down Expand Up @@ -755,6 +759,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 h1:CWyXh/jylQWp2dtiV33mY4iSSp6
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLmrppXsfT07oC8k0XNDxd8sVU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 h1:NEoabXt33PDWK4fXryK4e+XX+fSKDmmu9vg3yb9YI2M=
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9/go.mod h1:fQVdB2mFZBhPW1D5Abej41LMvrErARGrrdjOnKbm5yw=
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
Expand Down Expand Up @@ -1149,6 +1155,8 @@ github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWR
github.com/samber/lo v1.46.0 h1:w8G+oaCPgz1PoCJztqymCFaKwXt+5cCXn51uPxExFfQ=
github.com/samber/lo v1.46.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scylladb/gocql v1.14.2 h1:IBPtfJFcRDzifCjXYMtrZ14oQ7OqpqQjwITQCwtGZsc=
github.com/scylladb/gocql v1.14.2/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE=
github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs=
Expand Down
12 changes: 6 additions & 6 deletions mocks/services/dedup/mock_dedup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (proc *LifecycleManager) Start() error {
proc.Handle.transformer = proc.Transformer
}

proc.Handle.Setup(
if err := proc.Handle.Setup(
proc.BackendConfig,
proc.gatewayDB,
proc.routerDB,
Expand All @@ -74,7 +74,9 @@ func (proc *LifecycleManager) Start() error {
proc.transDebugger,
proc.enrichers,
proc.trackedUsersReporter,
)
); err != nil {
return err
}

currentCtx, cancel := context.WithCancel(context.Background())
proc.currentCancel = cancel
Expand Down
11 changes: 8 additions & 3 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (proc *Handle) Setup(
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
trackedUsersReporter trackedusers.UsersReporter,
) {
) error {
proc.reporting = reporting
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
Expand Down Expand Up @@ -614,7 +614,11 @@ func (proc *Handle) Setup(
})
}
if proc.config.enableDedup {
proc.dedup = dedup.New()
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
if err != nil {
return err
}
}
proc.sourceObservers = []sourceObserver{delayed.NewEventStats(proc.statsFactory, proc.conf)}
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -643,6 +647,7 @@ func (proc *Handle) Setup(
}))

proc.crashRecover()
return nil
}

func (proc *Handle) setupReloadableVars() {
Expand Down Expand Up @@ -1708,7 +1713,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize})
ok, previousSize, err := proc.dedup.Get(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize, WorkspaceID: batchEvent.WorkspaceId})
if err != nil {
panic(err)
}
Expand Down
29 changes: 16 additions & 13 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
Expand All @@ -32,6 +29,7 @@ import (

"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
Expand Down Expand Up @@ -78,7 +76,7 @@ type mockTrackedUsersReporter struct {
}
}

func (m *mockTrackedUsersReporter) ReportUsers(ctx context.Context, reports []*trackedusers.UsersReport, tx *Tx) error {
func (m *mockTrackedUsersReporter) ReportUsers(_ context.Context, reports []*trackedusers.UsersReport, _ *Tx) error {
m.reportCalls = append(m.reportCalls, struct {
reportedReports []*trackedusers.UsersReport
}{reportedReports: reports})
Expand Down Expand Up @@ -1874,7 +1872,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1893,6 +1891,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1905,7 +1904,7 @@ var _ = Describe("Processor", Ordered, func() {

c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1924,6 +1923,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1941,7 +1941,7 @@ var _ = Describe("Processor", Ordered, func() {

processor := prepareHandle(NewHandle(config.Default, mockTransformer))

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1960,6 +1960,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand Down Expand Up @@ -2533,7 +2534,7 @@ var _ = Describe("Processor", Ordered, func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)

callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Get(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1)

// We expect one transform call to destination A, after callUnprocessed.
Expand Down Expand Up @@ -3044,7 +3045,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3063,7 +3064,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)

Expect(err).To(BeNil())
setMainLoopTimeout(processor, 1*time.Second)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down Expand Up @@ -3103,7 +3104,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3122,6 +3123,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
defer processor.Shutdown()

processor.config.readLoopSleep = config.SingleValueLoader(time.Millisecond)
Expand Down Expand Up @@ -5111,7 +5113,7 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) {

func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) {
setDisableDedupFeature(processor, enableDedup)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -5130,6 +5132,7 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool)
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
processor.reportingEnabled = enableReporting
processor.sourceObservers = []sourceObserver{c.MockObserver}
}
Expand Down
9 changes: 3 additions & 6 deletions router/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,7 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
// support of array type in params is handled if the
// response from transformers are "," separated
queryParams := req.URL.Query()
for key, val := range requestQueryParams { // list := strings.Split(valString, ",")
// for _, listItem := range list {
// queryParams.Add(key, fmt.Sprint(listItem))
// }
for key, val := range requestQueryParams {
formattedVal := handleQueryParam(val)
queryParams.Add(key, formattedVal)
}
Expand All @@ -195,7 +192,7 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
if err != nil {
return &utils.SendPostResponse{
StatusCode: http.StatusGatewayTimeout,
ResponseBody: []byte(fmt.Sprintf(`504 Unable to make %q request for URL : %q. Error: %s`, requestMethod, postInfo.URL, err.Error())),
ResponseBody: []byte(fmt.Sprintf(`504 Unable to make %q request for URL : %q. Error: %v`, requestMethod, postInfo.URL, err)),
}
}

Expand All @@ -205,7 +202,7 @@ func (network *netHandle) SendPost(ctx context.Context, structData integrations.
if err != nil {
return &utils.SendPostResponse{
StatusCode: resp.StatusCode,
ResponseBody: []byte(fmt.Sprintf(`Failed to read response body for request for URL : %q. Error: %s`, postInfo.URL, err.Error())),
ResponseBody: []byte(fmt.Sprintf(`Failed to read response body for request for URL : %q. Error: %v`, postInfo.URL, err)),
}
}
network.logger.Debug(postInfo.URL, " : ", req.Proto, " : ", resp.Proto, resp.ProtoMajor, resp.ProtoMinor, resp.ProtoAtLeast)
Expand Down
Loading

0 comments on commit 31a033d

Please sign in to comment.