From c8f3c558d5e3c807b1055d13d13b588e0f04a5b8 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Mon, 22 Jul 2024 15:01:50 +0530 Subject: [PATCH 1/3] chore: refactor dedup package --- mocks/services/dedup/mock_dedup.go | 4 +- processor/processor.go | 3 +- services/dedup/{ => badger}/badger.go | 56 ++++++++++++++++++++---- services/dedup/dedup.go | 62 ++++----------------------- services/dedup/dedup_test.go | 23 +++++----- services/dedup/types/types.go | 6 +++ 6 files changed, 79 insertions(+), 75 deletions(-) rename services/dedup/{ => badger}/badger.go (61%) create mode 100644 services/dedup/types/types.go diff --git a/mocks/services/dedup/mock_dedup.go b/mocks/services/dedup/mock_dedup.go index 44f61fd905..0dc3e27d35 100644 --- a/mocks/services/dedup/mock_dedup.go +++ b/mocks/services/dedup/mock_dedup.go @@ -8,7 +8,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - dedup "github.com/rudderlabs/rudder-server/services/dedup" + types "github.com/rudderlabs/rudder-server/services/dedup/types" ) // MockDedup is a mock of Dedup interface. @@ -61,7 +61,7 @@ func (mr *MockDedupMockRecorder) Commit(arg0 interface{}) *gomock.Call { } // Set mocks base method. 
-func (m *MockDedup) Set(arg0 dedup.KeyValue) (bool, int64) { +func (m *MockDedup) Set(arg0 types.KeyValue) (bool, int64) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Set", arg0) ret0, _ := ret[0].(bool) diff --git a/processor/processor.go b/processor/processor.go index f25f66bf8e..cd04f9da6c 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -44,6 +44,7 @@ import ( destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/dedup" + dedupTypes "github.com/rudderlabs/rudder-server/services/dedup/types" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/services/rsources" @@ -1708,7 +1709,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf p := payloadFunc() messageSize := int64(len(p)) dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) - if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: dedupKey, Value: messageSize}); !ok { + if ok, previousSize := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 continue diff --git a/services/dedup/badger.go b/services/dedup/badger/badger.go similarity index 61% rename from services/dedup/badger.go rename to services/dedup/badger/badger.go index 37de57a97b..aa2bfd3764 100644 --- a/services/dedup/badger.go +++ b/services/dedup/badger/badger.go @@ -1,20 +1,22 @@ -package dedup +package badger import ( "strconv" "time" "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" "github.com/rudderlabs/rudder-go-kit/config" - + "github.com/rudderlabs/rudder-go-kit/logger" 
"github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/rruntime" + "github.com/rudderlabs/rudder-server/services/dedup/types" "github.com/rudderlabs/rudder-server/utils/misc" ) -type badgerDB struct { +type BadgerDB struct { stats stats.Stats logger loggerForBadger badgerDB *badger.DB @@ -25,7 +27,37 @@ type badgerDB struct { opts badger.Options } -func (d *badgerDB) Get(key string) (int64, bool) { +func NewBadgerDB(path string) *BadgerDB { + dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") + + log := logger.NewLogger().Child("dedup") + badgerOpts := badger. + DefaultOptions(path). + WithCompression(options.None). + WithIndexCacheSize(16 << 20). // 16mb + WithNumGoroutines(1). + WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)). + WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). + WithBlockCacheSize(0). + WithNumVersionsToKeep(1). + WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). + WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). + WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)). 
+ WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false)) + + db := &BadgerDB{ + stats: stats.Default, + logger: loggerForBadger{log}, + path: path, + gcDone: make(chan struct{}), + close: make(chan struct{}), + window: dedupWindow, + opts: badgerOpts, + } + return db +} + +func (d *BadgerDB) Get(key string) (int64, bool) { var payloadSize int64 var found bool err := d.badgerDB.View(func(txn *badger.Txn) error { @@ -45,7 +77,7 @@ func (d *badgerDB) Get(key string) (int64, bool) { return payloadSize, found } -func (d *badgerDB) Set(kvs []KeyValue) error { +func (d *BadgerDB) Set(kvs []types.KeyValue) error { txn := d.badgerDB.NewTransaction(true) for _, message := range kvs { value := strconv.FormatInt(message.Value, 10) @@ -66,13 +98,13 @@ func (d *badgerDB) Set(kvs []KeyValue) error { return txn.Commit() } -func (d *badgerDB) Close() { +func (d *BadgerDB) Close() { close(d.close) <-d.gcDone _ = d.badgerDB.Close() } -func (d *badgerDB) start() { +func (d *BadgerDB) Start() { var err error d.badgerDB, err = badger.Open(d.opts) @@ -85,7 +117,7 @@ func (d *badgerDB) start() { }) } -func (d *badgerDB) gcLoop() { +func (d *BadgerDB) gcLoop() { for { select { case <-d.close: @@ -113,3 +145,11 @@ func (d *badgerDB) gcLoop() { } } + +type loggerForBadger struct { + logger.Logger +} + +func (l loggerForBadger) Warningf(fmt string, args ...interface{}) { + l.Warnf(fmt, args...) 
+} diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 3fbc8f2d2f..3ddf6ddd77 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -5,19 +5,12 @@ package dedup import ( "fmt" "sync" - "time" - "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/badger/v4/options" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/services/dedup/badger" + "github.com/rudderlabs/rudder-server/services/dedup/types" "github.com/rudderlabs/rudder-server/utils/misc" ) -type OptFn func(*badgerDB) - // DefaultPath returns the default path for the deduplication service's badger DB func DefaultPath() string { badgerPathName := "/badgerdbv4" @@ -30,33 +23,8 @@ func DefaultPath() string { // New creates a new deduplication service. The service needs to be closed after use. func New(path string) Dedup { - dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") - - log := logger.NewLogger().Child("dedup") - badgerOpts := badger. - DefaultOptions(path). - WithCompression(options.None). - WithIndexCacheSize(16 << 20). // 16mb - WithNumGoroutines(1). - WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)). - WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). - WithBlockCacheSize(0). - WithNumVersionsToKeep(1). - WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). - WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). - WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)). 
- WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false)) - - db := &badgerDB{ - stats: stats.Default, - logger: loggerForBadger{log}, - path: path, - gcDone: make(chan struct{}), - close: make(chan struct{}), - window: dedupWindow, - opts: badgerOpts, - } - db.start() + db := badger.NewBadgerDB(path) + db.Start() return &dedup{ badgerDB: db, cache: make(map[string]int64), @@ -66,7 +34,7 @@ func New(path string) Dedup { // Dedup is the interface for deduplication service type Dedup interface { // Set returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value - Set(kv KeyValue) (bool, int64) + Set(kv types.KeyValue) (bool, int64) // Commit commits a list of previously set keys to the DB Commit(keys []string) error @@ -74,18 +42,14 @@ type Dedup interface { // Close closes the deduplication service Close() } -type KeyValue struct { - Key string - Value int64 -} type dedup struct { - badgerDB *badgerDB + badgerDB *badger.BadgerDB cacheMu sync.Mutex cache map[string]int64 } -func (d *dedup) Set(kv KeyValue) (bool, int64) { +func (d *dedup) Set(kv types.KeyValue) (bool, int64) { d.cacheMu.Lock() defer d.cacheMu.Unlock() if previous, found := d.cache[kv.Key]; found { @@ -102,13 +66,13 @@ func (d *dedup) Commit(keys []string) error { d.cacheMu.Lock() defer d.cacheMu.Unlock() - kvs := make([]KeyValue, len(keys)) + kvs := make([]types.KeyValue, len(keys)) for i, key := range keys { value, ok := d.cache[key] if !ok { return fmt.Errorf("key %v has not been previously set", key) } - kvs[i] = KeyValue{Key: key, Value: value} + kvs[i] = types.KeyValue{Key: key, Value: value} } err := d.badgerDB.Set(kvs) @@ -123,11 +87,3 @@ func (d *dedup) Commit(keys []string) error { func (d *dedup) Close() { d.badgerDB.Close() } - -type loggerForBadger struct { - logger.Logger -} - -func (l loggerForBadger) Warningf(fmt string, args ...interface{}) { - l.Warnf(fmt, args...) 
-} diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index 51f87e2a56..b791fba581 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -16,6 +16,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/services/dedup" + "github.com/rudderlabs/rudder-server/services/dedup/types" ) func Test_Dedup(t *testing.T) { @@ -30,29 +31,29 @@ func Test_Dedup(t *testing.T) { defer d.Close() t.Run("if message id is not present in cache and badger db", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "a", Value: 1}) + found, _ := d.Set(types.KeyValue{Key: "a", Value: 1}) require.Equal(t, true, found) // Checking it again should give us the previous value from the cache - found, value := d.Set(dedup.KeyValue{Key: "a", Value: 2}) + found, value := d.Set(types.KeyValue{Key: "a", Value: 2}) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("if message is committed, previous value should always return", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "b", Value: 1}) + found, _ := d.Set(types.KeyValue{Key: "b", Value: 1}) require.Equal(t, true, found) err := d.Commit([]string{"a"}) require.NoError(t, err) - found, value := d.Set(dedup.KeyValue{Key: "b", Value: 2}) + found, value := d.Set(types.KeyValue{Key: "b", Value: 2}) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("committing a messageid not present in cache", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "c", Value: 1}) + found, _ := d.Set(types.KeyValue{Key: "c", Value: 1}) require.Equal(t, true, found) err := d.Commit([]string{"d"}) @@ -72,17 +73,17 @@ func Test_Dedup_Window(t *testing.T) { d := dedup.New(dbPath) defer d.Close() - found, _ := d.Set(dedup.KeyValue{Key: "to be deleted", Value: 1}) + found, _ := d.Set(types.KeyValue{Key: "to be deleted", Value: 1}) require.Equal(t, true, found) err := 
d.Commit([]string{"to be deleted"}) require.NoError(t, err) - found, _ = d.Set(dedup.KeyValue{Key: "to be deleted", Value: 2}) + found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 2}) require.Equal(t, false, found) require.Eventually(t, func() bool { - found, _ = d.Set(dedup.KeyValue{Key: "to be deleted", Value: 3}) + found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 3}) return found }, 2*time.Second, 100*time.Millisecond) } @@ -102,7 +103,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { messages := make([]string, size) for i := 0; i < size; i++ { messages[i] = uuid.New().String() - d.Set(dedup.KeyValue{Key: messages[i], Value: int64(i + 1)}) + d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)}) } err := d.Commit(messages) require.NoError(t, err) @@ -122,11 +123,11 @@ func Benchmark_Dedup(b *testing.B) { b.Run("no duplicates 1000 batch unique", func(b *testing.B) { batchSize := 1000 - msgIDs := make([]dedup.KeyValue, batchSize) + msgIDs := make([]types.KeyValue, batchSize) keys := make([]string, 0) for i := 0; i < b.N; i++ { - msgIDs[i%batchSize] = dedup.KeyValue{ + msgIDs[i%batchSize] = types.KeyValue{ Key: uuid.New().String(), Value: int64(i + 1), } diff --git a/services/dedup/types/types.go b/services/dedup/types/types.go new file mode 100644 index 0000000000..ab8b351afc --- /dev/null +++ b/services/dedup/types/types.go @@ -0,0 +1,6 @@ +package types + +type KeyValue struct { + Key string + Value int64 +} From 35a0edfed7e2cd6bd303a73c4ca97a7e99d89061 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 23 Jul 2024 17:38:24 +0530 Subject: [PATCH 2/3] chore: review comments --- mocks/services/dedup/mock_dedup.go | 5 ++- processor/processor.go | 2 +- processor/processor_test.go | 2 +- .../yandexmetrica/yandexmetrica_test.go | 6 +-- services/dedup/badger/badger.go | 38 +++++++++++++------ services/dedup/dedup.go | 14 ++++--- services/dedup/dedup_test.go | 34 ++++++++++------- 7 files changed, 63 insertions(+), 38 deletions(-) diff 
--git a/mocks/services/dedup/mock_dedup.go b/mocks/services/dedup/mock_dedup.go index b252034541..db321ec21e 100644 --- a/mocks/services/dedup/mock_dedup.go +++ b/mocks/services/dedup/mock_dedup.go @@ -66,12 +66,13 @@ func (mr *MockDedupMockRecorder) Commit(arg0 any) *gomock.Call { } // Set mocks base method. -func (m *MockDedup) Set(arg0 types.KeyValue) (bool, int64) { +func (m *MockDedup) Set(arg0 types.KeyValue) (bool, int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Set", arg0) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(int64) - return ret0, ret1 + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // Set indicates an expected call of Set. diff --git a/processor/processor.go b/processor/processor.go index 1bea4225c9..c1f738844c 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1709,7 +1709,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf p := payloadFunc() messageSize := int64(len(p)) dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) - if ok, previousSize := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok { + if ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok && err != nil { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 continue diff --git a/processor/processor_test.go b/processor/processor_test.go index 03baee4782..222b07f6c7 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2471,7 +2471,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) - c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, 
int64(0)).After(callUnprocessed).Times(3) + c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3) c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1) // We expect one transform call to destination A, after callUnprocessed. diff --git a/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go b/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go index 70abe0215e..07ae04f714 100644 --- a/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go +++ b/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go @@ -46,15 +46,15 @@ var ( } ) -var _ = Describe("Yandexmetrica", func() { - Describe(("NewManager function test"), func() { +var _ = Describe("Yandexmetrica", func() { + Describe("NewManager function test", func() { It("should return yandexmetrica manager", func() { yandexmetrica, err := yandexmetrica.NewManager(destination, backendconfig.DefaultBackendConfig) Expect(err).To(BeNil()) Expect(yandexmetrica).NotTo(BeNil()) }) }) - Describe(("Upload function test"), func() { + Describe("Upload function test", func() { It("Testing a successful scenario", func() { cache := oauthv2.NewCache() ctrl := gomock.NewController(GinkgoT()) diff --git a/services/dedup/badger/badger.go b/services/dedup/badger/badger.go index 181a7f6bf3..30f4a2af8d 100644 --- a/services/dedup/badger/badger.go +++ b/services/dedup/badger/badger.go @@ -3,6 +3,7 @@ package badger import ( "fmt" "strconv" + "sync" "time" "github.com/dgraph-io/badger/v4" @@ -26,6 +27,7 @@ type BadgerDB struct { gcDone chan struct{} path string opts badger.Options + once sync.Once } // DefaultPath returns the default path for the deduplication service's badger DB @@ -68,10 +70,15 @@ func NewBadgerDB(path string) *BadgerDB { return db } -func (d *BadgerDB) Get(key string) (int64, bool) { +func (d *BadgerDB) Get(key string) (int64, bool, error) { var payloadSize int64 var found bool - err
:= d.badgerDB.View(func(txn *badger.Txn) error { + var err error + err = d.init() + if err != nil { + return 0, false, err + } + err = d.badgerDB.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err != nil { return err @@ -83,12 +90,16 @@ func (d *BadgerDB) Get(key string) (int64, bool) { return nil }) if err != nil && err != badger.ErrKeyNotFound { - panic(err) + return 0, false, err } - return payloadSize, found + return payloadSize, found, nil } func (d *BadgerDB) Set(kvs []types.KeyValue) error { + err := d.init() + if err != nil { + return err + } txn := d.badgerDB.NewTransaction(true) for _, message := range kvs { value := strconv.FormatInt(message.Value, 10) @@ -115,17 +126,20 @@ func (d *BadgerDB) Close() { _ = d.badgerDB.Close() } -func (d *BadgerDB) Start() { +func (d *BadgerDB) init() error { var err error - d.badgerDB, err = badger.Open(d.opts) - if err != nil { - panic(err) - } - rruntime.Go(func() { - d.gcLoop() - close(d.gcDone) + d.once.Do(func() { + d.badgerDB, err = badger.Open(d.opts) + if err != nil { + return + } + rruntime.Go(func() { + d.gcLoop() + close(d.gcDone) + }) }) + return err } func (d *BadgerDB) gcLoop() { diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 89d3b41f7b..6efc08c99e 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -13,7 +13,6 @@ import ( // New creates a new deduplication service. The service needs to be closed after use. 
func New() Dedup { db := badger.NewBadgerDB(badger.DefaultPath()) - db.Start() return &dedup{ badgerDB: db, cache: make(map[string]int64), @@ -23,7 +22,7 @@ func New() Dedup { // Dedup is the interface for deduplication service type Dedup interface { // Set returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value - Set(kv types.KeyValue) (bool, int64) + Set(kv types.KeyValue) (bool, int64, error) // Commit commits a list of previously set keys to the DB Commit(keys []string) error @@ -38,17 +37,20 @@ type dedup struct { cache map[string]int64 } -func (d *dedup) Set(kv types.KeyValue) (bool, int64) { +func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) { d.cacheMu.Lock() defer d.cacheMu.Unlock() if previous, found := d.cache[kv.Key]; found { - return false, previous + return false, previous, nil + } + previous, found, err := d.badgerDB.Get(kv.Key) + if err != nil { + return false, 0, err } - previous, found := d.badgerDB.Get(kv.Key) if !found { d.cache[kv.Key] = kv.Value } - return !found, previous + return !found, previous, nil } func (d *dedup) Commit(keys []string) error { diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index dd7f641fb6..7d944c6d15 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -31,32 +31,37 @@ func Test_Dedup(t *testing.T) { defer d.Close() t.Run("if message id is not present in cache and badger db", func(t *testing.T) { - found, _ := d.Set(types.KeyValue{Key: "a", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "a", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) // Checking it again should give us the previous value from the cache - found, value := d.Set(types.KeyValue{Key: "a", Value: 2}) + found, value, err := d.Set(types.KeyValue{Key: "a", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("if message is committed, previous value 
should always return", func(t *testing.T) { - found, _ := d.Set(types.KeyValue{Key: "b", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "b", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"a"}) + err = d.Commit([]string{"a"}) require.NoError(t, err) - found, value := d.Set(types.KeyValue{Key: "b", Value: 2}) + found, value, err := d.Set(types.KeyValue{Key: "b", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("committing a messageid not present in cache", func(t *testing.T) { - found, _ := d.Set(types.KeyValue{Key: "c", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "c", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"d"}) + err = d.Commit([]string{"d"}) require.NotNil(t, err) }) } @@ -73,17 +78,20 @@ func Test_Dedup_Window(t *testing.T) { d := dedup.New() defer d.Close() - found, _ := d.Set(types.KeyValue{Key: "to be deleted", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "to be deleted", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"to be deleted"}) + err = d.Commit([]string{"to be deleted"}) require.NoError(t, err) - found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 2}) + found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Eventually(t, func() bool { - found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 3}) + found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 3}) + require.Nil(t, err) return found }, 2*time.Second, 100*time.Millisecond) } @@ -103,7 +111,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { messages := make([]string, size) for i := 0; i < size; i++ { messages[i] = uuid.New().String() - d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)}) + _, _, _ = d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)}) } err := 
d.Commit(messages) require.NoError(t, err) @@ -134,7 +142,7 @@ func Benchmark_Dedup(b *testing.B) { if i%batchSize == batchSize-1 || i == b.N-1 { for _, msgID := range msgIDs[:i%batchSize] { - d.Set(msgID) + _, _, _ = d.Set(msgID) keys = append(keys, msgID.Key) } err := d.Commit(keys) From 4206ed460963c9b82ca0e23376cada11f842c40b Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Thu, 25 Jul 2024 18:06:12 +0530 Subject: [PATCH 3/3] chore: review comments addressed --- processor/processor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/processor/processor.go b/processor/processor.go index c1f738844c..3bc4998e48 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1709,7 +1709,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf p := payloadFunc() messageSize := int64(len(p)) dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) - if ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok && err != nil { + ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}) + if err != nil { + panic(err) + } + if !ok { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 continue