diff --git a/mocks/services/dedup/mock_dedup.go b/mocks/services/dedup/mock_dedup.go index da6623610b..db321ec21e 100644 --- a/mocks/services/dedup/mock_dedup.go +++ b/mocks/services/dedup/mock_dedup.go @@ -12,7 +12,7 @@ package mock_dedup import ( reflect "reflect" - dedup "github.com/rudderlabs/rudder-server/services/dedup" + types "github.com/rudderlabs/rudder-server/services/dedup/types" gomock "go.uber.org/mock/gomock" ) @@ -66,12 +66,13 @@ func (mr *MockDedupMockRecorder) Commit(arg0 any) *gomock.Call { } // Set mocks base method. -func (m *MockDedup) Set(arg0 dedup.KeyValue) (bool, int64) { +func (m *MockDedup) Set(arg0 types.KeyValue) (bool, int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Set", arg0) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(int64) - return ret0, ret1 + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // Set indicates an expected call of Set. diff --git a/processor/processor.go b/processor/processor.go index f25f66bf8e..3bc4998e48 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -44,6 +44,7 @@ import ( destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation" "github.com/rudderlabs/rudder-server/services/dedup" + dedupTypes "github.com/rudderlabs/rudder-server/services/dedup/types" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/services/rsources" @@ -613,7 +614,7 @@ func (proc *Handle) Setup( }) } if proc.config.enableDedup { - proc.dedup = dedup.New(dedup.DefaultPath()) + proc.dedup = dedup.New() } proc.sourceObservers = []sourceObserver{delayed.NewEventStats(proc.statsFactory, proc.conf)} ctx, cancel := context.WithCancel(context.Background()) @@ -1708,7 +1709,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf p := payloadFunc() messageSize := int64(len(p)) dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) - if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: dedupKey, Value: messageSize}); !ok { + ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}) + if err != nil { + panic(err) + } + if !ok { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 continue diff --git a/processor/processor_test.go b/processor/processor_test.go index 03baee4782..222b07f6c7 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2471,7 +2471,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) - c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0)).After(callUnprocessed).Times(3) + c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3) c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1) // We expect one transform call to destination A, after callUnprocessed. diff --git a/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go b/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go index 70abe0215e..07ae04f714 100644 --- a/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go +++ b/router/batchrouter/asyncdestinationmanager/yandexmetrica/yandexmetrica_test.go @@ -46,15 +46,15 @@ var ( } ) -var _ = Describe("Yandexmetrica", func() { - Describe(("NewManager function test"), func() { +var _ = Describe("Antisymmetric", func() { + Describe("NewManager function test", func() { It("should return yandexmetrica manager", func() { yandexmetrica, err := yandexmetrica.NewManager(destination, backendconfig.DefaultBackendConfig) Expect(err).To(BeNil()) Expect(yandexmetrica).NotTo(BeNil()) }) }) - Describe(("Upload function test"), func() { + Describe("Upload function test", func() { It("Testing a successful scenario", func() { cache := oauthv2.NewCache() ctrl := gomock.NewController(GinkgoT()) diff --git a/services/dedup/badger.go b/services/dedup/badger/badger.go similarity index 51% rename from services/dedup/badger.go rename to services/dedup/badger/badger.go index 37de57a97b..30f4a2af8d 100644 --- a/services/dedup/badger.go +++ b/services/dedup/badger/badger.go @@ -1,20 +1,24 @@ -package dedup +package badger import ( + "fmt" "strconv" + "sync" "time" "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" "github.com/rudderlabs/rudder-go-kit/config" - + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/rruntime" + "github.com/rudderlabs/rudder-server/services/dedup/types" "github.com/rudderlabs/rudder-server/utils/misc" ) -type badgerDB struct { +type BadgerDB struct { stats stats.Stats logger loggerForBadger badgerDB *badger.DB @@ -23,12 +27,58 @@ type badgerDB struct { gcDone chan struct{} path string opts badger.Options + once sync.Once +} + +// DefaultPath returns the default path for the deduplication service's badger DB +func DefaultPath() string { + badgerPathName := "/badgerdbv4" + tmpDirPath, err := misc.CreateTMPDIR() + if err != nil { + panic(err) + } + return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName) +} + +func NewBadgerDB(path string) *BadgerDB { + dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") + + log := logger.NewLogger().Child("dedup") + badgerOpts := badger. + DefaultOptions(path). + WithCompression(options.None). + WithIndexCacheSize(16 << 20). // 16mb + WithNumGoroutines(1). + WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)). + WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). + WithBlockCacheSize(0). + WithNumVersionsToKeep(1). + WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). + WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). + WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)). + WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false)) + + db := &BadgerDB{ + stats: stats.Default, + logger: loggerForBadger{log}, + path: path, + gcDone: make(chan struct{}), + close: make(chan struct{}), + window: dedupWindow, + opts: badgerOpts, + } + return db } -func (d *badgerDB) Get(key string) (int64, bool) { +func (d *BadgerDB) Get(key string) (int64, bool, error) { var payloadSize int64 var found bool - err := d.badgerDB.View(func(txn *badger.Txn) error { + var err error + err = d.init() + if err != nil { + return 0, false, err + } + err = d.badgerDB.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err != nil { return err @@ -40,12 +90,16 @@ func (d *badgerDB) Get(key string) (int64, bool) { return nil }) if err != nil && err != badger.ErrKeyNotFound { - panic(err) + return 0, false, err } - return payloadSize, found + return payloadSize, found, nil } -func (d *badgerDB) Set(kvs []KeyValue) error { +func (d *BadgerDB) Set(kvs []types.KeyValue) error { + err := d.init() + if err != nil { + return err + } txn := d.badgerDB.NewTransaction(true) for _, message := range kvs { value := strconv.FormatInt(message.Value, 10) @@ -66,26 +120,29 @@ func (d *badgerDB) Set(kvs []KeyValue) error { return txn.Commit() } -func (d *badgerDB) Close() { +func (d *BadgerDB) Close() { close(d.close) <-d.gcDone _ = d.badgerDB.Close() } -func (d *badgerDB) start() { +func (d *BadgerDB) init() error { var err error - d.badgerDB, err = badger.Open(d.opts) - if err != nil { - panic(err) - } - rruntime.Go(func() { - d.gcLoop() - close(d.gcDone) + d.once.Do(func() { + d.badgerDB, err = badger.Open(d.opts) + if err != nil { + return + } + rruntime.Go(func() { + d.gcLoop() + close(d.gcDone) + }) }) + return err } -func (d *badgerDB) gcLoop() { +func (d *BadgerDB) gcLoop() { for { select { case <-d.close: @@ -113,3 +170,11 @@ func (d *badgerDB) gcLoop() { } } + +type loggerForBadger struct { + logger.Logger +} + +func (l loggerForBadger) Warningf(fmt string, args ...interface{}) { + l.Warnf(fmt, args...) +} diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 3fbc8f2d2f..6efc08c99e 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -5,58 +5,14 @@ package dedup import ( "fmt" "sync" - "time" - "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/badger/v4/options" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/rudderlabs/rudder-server/services/dedup/badger" + "github.com/rudderlabs/rudder-server/services/dedup/types" ) -type OptFn func(*badgerDB) - -// DefaultPath returns the default path for the deduplication service's badger DB -func DefaultPath() string { - badgerPathName := "/badgerdbv4" - tmpDirPath, err := misc.CreateTMPDIR() - if err != nil { - panic(err) - } - return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName) -} - // New creates a new deduplication service. The service needs to be closed after use. -func New(path string) Dedup { - dedupWindow := config.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") - - log := logger.NewLogger().Child("dedup") - badgerOpts := badger. - DefaultOptions(path). - WithCompression(options.None). - WithIndexCacheSize(16 << 20). // 16mb - WithNumGoroutines(1). - WithNumMemtables(config.GetInt("BadgerDB.numMemtable", 5)). - WithValueThreshold(config.GetInt64("BadgerDB.valueThreshold", 1048576)). - WithBlockCacheSize(0). - WithNumVersionsToKeep(1). - WithNumLevelZeroTables(config.GetInt("BadgerDB.numLevelZeroTables", 5)). - WithNumLevelZeroTablesStall(config.GetInt("BadgerDB.numLevelZeroTablesStall", 15)). - WithSyncWrites(config.GetBool("BadgerDB.syncWrites", false)). - WithDetectConflicts(config.GetBool("BadgerDB.detectConflicts", false)) - - db := &badgerDB{ - stats: stats.Default, - logger: loggerForBadger{log}, - path: path, - gcDone: make(chan struct{}), - close: make(chan struct{}), - window: dedupWindow, - opts: badgerOpts, - } - db.start() +func New() Dedup { + db := badger.NewBadgerDB(badger.DefaultPath()) return &dedup{ badgerDB: db, cache: make(map[string]int64), @@ -66,7 +22,7 @@ func New(path string) Dedup { // Dedup is the interface for deduplication service type Dedup interface { // Set returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value - Set(kv KeyValue) (bool, int64) + Set(kv types.KeyValue) (bool, int64, error) // Commit commits a list of previously set keys to the DB Commit(keys []string) error @@ -74,41 +30,40 @@ type Dedup interface { // Close closes the deduplication service Close() } -type KeyValue struct { - Key string - Value int64 -} type dedup struct { - badgerDB *badgerDB + badgerDB *badger.BadgerDB cacheMu sync.Mutex cache map[string]int64 } -func (d *dedup) Set(kv KeyValue) (bool, int64) { +func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) { d.cacheMu.Lock() defer d.cacheMu.Unlock() if previous, found := d.cache[kv.Key]; found { - return false, previous + return false, previous, nil + } + previous, found, err := d.badgerDB.Get(kv.Key) + if err != nil { + return false, 0, err } - previous, found := d.badgerDB.Get(kv.Key) if !found { d.cache[kv.Key] = kv.Value } - return !found, previous + return !found, previous, nil } func (d *dedup) Commit(keys []string) error { d.cacheMu.Lock() defer d.cacheMu.Unlock() - kvs := make([]KeyValue, len(keys)) + kvs := make([]types.KeyValue, len(keys)) for i, key := range keys { value, ok := d.cache[key] if !ok { return fmt.Errorf("key %v has not been previously set", key) } - kvs[i] = KeyValue{Key: key, Value: value} + kvs[i] = types.KeyValue{Key: key, Value: value} } err := d.badgerDB.Set(kvs) @@ -123,11 +78,3 @@ func (d *dedup) Commit(keys []string) error { func (d *dedup) Close() { d.badgerDB.Close() } - -type loggerForBadger struct { - logger.Logger -} - -func (l loggerForBadger) Warningf(fmt string, args ...interface{}) { - l.Warnf(fmt, args...) -} diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index 51f87e2a56..7d944c6d15 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -16,6 +16,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/services/dedup" + "github.com/rudderlabs/rudder-server/services/dedup/types" ) func Test_Dedup(t *testing.T) { @@ -26,36 +27,41 @@ func Test_Dedup(t *testing.T) { dbPath := os.TempDir() + "/dedup_test" defer func() { _ = os.RemoveAll(dbPath) }() _ = os.RemoveAll(dbPath) - d := dedup.New(dbPath) + d := dedup.New() defer d.Close() t.Run("if message id is not present in cache and badger db", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "a", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "a", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) // Checking it again should give us the previous value from the cache - found, value := d.Set(dedup.KeyValue{Key: "a", Value: 2}) + found, value, err := d.Set(types.KeyValue{Key: "a", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("if message is committed, previous value should always return", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "b", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "b", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"a"}) + err = d.Commit([]string{"a"}) require.NoError(t, err) - found, value := d.Set(dedup.KeyValue{Key: "b", Value: 2}) + found, value, err := d.Set(types.KeyValue{Key: "b", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Equal(t, int64(1), value) }) t.Run("committing a messageid not present in cache", func(t *testing.T) { - found, _ := d.Set(dedup.KeyValue{Key: "c", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "c", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"d"}) + err = d.Commit([]string{"d"}) require.NotNil(t, err) }) } @@ -69,20 +75,23 @@ func Test_Dedup_Window(t *testing.T) { defer func() { _ = os.RemoveAll(dbPath) }() _ = os.RemoveAll(dbPath) config.Set("Dedup.dedupWindow", "1s") - d := dedup.New(dbPath) + d := dedup.New() defer d.Close() - found, _ := d.Set(dedup.KeyValue{Key: "to be deleted", Value: 1}) + found, _, err := d.Set(types.KeyValue{Key: "to be deleted", Value: 1}) + require.Nil(t, err) require.Equal(t, true, found) - err := d.Commit([]string{"to be deleted"}) + err = d.Commit([]string{"to be deleted"}) require.NoError(t, err) - found, _ = d.Set(dedup.KeyValue{Key: "to be deleted", Value: 2}) + found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 2}) + require.Nil(t, err) require.Equal(t, false, found) require.Eventually(t, func() bool { - found, _ = d.Set(dedup.KeyValue{Key: "to be deleted", Value: 3}) + found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 3}) + require.Nil(t, err) return found }, 2*time.Second, 100*time.Millisecond) } @@ -95,14 +104,14 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { dbPath := os.TempDir() + "/dedup_test_errtxntoobig" defer os.RemoveAll(dbPath) os.RemoveAll(dbPath) - d := dedup.New(dbPath) + d := dedup.New() defer d.Close() size := 105_000 messages := make([]string, size) for i := 0; i < size; i++ { messages[i] = uuid.New().String() - d.Set(dedup.KeyValue{Key: messages[i], Value: int64(i + 1)}) + _, _, _ = d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)}) } err := d.Commit(messages) require.NoError(t, err) @@ -117,23 +126,23 @@ func Benchmark_Dedup(b *testing.B) { b.Logf("using path %s, since tmpDir has issues in macOS\n", dbPath) defer func() { _ = os.RemoveAll(dbPath) }() _ = os.MkdirAll(dbPath, 0o750) - d := dedup.New(dbPath) + d := dedup.New() b.Run("no duplicates 1000 batch unique", func(b *testing.B) { batchSize := 1000 - msgIDs := make([]dedup.KeyValue, batchSize) + msgIDs := make([]types.KeyValue, batchSize) keys := make([]string, 0) for i := 0; i < b.N; i++ { - msgIDs[i%batchSize] = dedup.KeyValue{ + msgIDs[i%batchSize] = types.KeyValue{ Key: uuid.New().String(), Value: int64(i + 1), } if i%batchSize == batchSize-1 || i == b.N-1 { for _, msgID := range msgIDs[:i%batchSize] { - d.Set(msgID) + _, _, _ = d.Set(msgID) keys = append(keys, msgID.Key) } err := d.Commit(keys) diff --git a/services/dedup/types/types.go b/services/dedup/types/types.go new file mode 100644 index 0000000000..ab8b351afc --- /dev/null +++ b/services/dedup/types/types.go @@ -0,0 +1,6 @@ +package types + +type KeyValue struct { + Key string + Value int64 +}