Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jul 23, 2024
1 parent b76ea10 commit 35a0edf
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 38 deletions.
5 changes: 3 additions & 2 deletions mocks/services/dedup/mock_dedup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
if ok, previousSize := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok {
if ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize}); !ok && err != nil {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1
continue
Expand Down
2 changes: 1 addition & 1 deletion processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2471,7 +2471,7 @@ var _ = Describe("Processor", Ordered, func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)

callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0)).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1)

// We expect one transform call to destination A, after callUnprocessed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ var (
}
)

var _ = Describe("Yandexmetrica", func() {
Describe(("NewManager function test"), func() {
var _ = Describe("Antisymmetric", func() {
Describe("NewManager function test", func() {
It("should return yandexmetrica manager", func() {
yandexmetrica, err := yandexmetrica.NewManager(destination, backendconfig.DefaultBackendConfig)
Expect(err).To(BeNil())
Expect(yandexmetrica).NotTo(BeNil())
})
})
Describe(("Upload function test"), func() {
Describe("Upload function test", func() {
It("Testing a successful scenario", func() {
cache := oauthv2.NewCache()
ctrl := gomock.NewController(GinkgoT())
Expand Down
38 changes: 26 additions & 12 deletions services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package badger
import (
"fmt"
"strconv"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
Expand All @@ -26,6 +27,7 @@ type BadgerDB struct {
gcDone chan struct{}
path string
opts badger.Options
once sync.Once
}

// DefaultPath returns the default path for the deduplication service's badger DB
Expand Down Expand Up @@ -68,10 +70,15 @@ func NewBadgerDB(path string) *BadgerDB {
return db
}

func (d *BadgerDB) Get(key string) (int64, bool) {
func (d *BadgerDB) Get(key string) (int64, bool, error) {
var payloadSize int64
var found bool
err := d.badgerDB.View(func(txn *badger.Txn) error {
var err error
err = d.init()
if err != nil {
return 0, false, err

Check warning on line 79 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L79

Added line #L79 was not covered by tests
}
err = d.badgerDB.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
Expand All @@ -83,12 +90,16 @@ func (d *BadgerDB) Get(key string) (int64, bool) {
return nil
})
if err != nil && err != badger.ErrKeyNotFound {
panic(err)
return 0, false, err

Check warning on line 93 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L93

Added line #L93 was not covered by tests
}
return payloadSize, found
return payloadSize, found, nil
}

func (d *BadgerDB) Set(kvs []types.KeyValue) error {
err := d.init()
if err != nil {
return err

Check warning on line 101 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L101

Added line #L101 was not covered by tests
}
txn := d.badgerDB.NewTransaction(true)
for _, message := range kvs {
value := strconv.FormatInt(message.Value, 10)
Expand All @@ -115,17 +126,20 @@ func (d *BadgerDB) Close() {
_ = d.badgerDB.Close()
}

func (d *BadgerDB) Start() {
func (d *BadgerDB) init() error {
var err error

d.badgerDB, err = badger.Open(d.opts)
if err != nil {
panic(err)
}
rruntime.Go(func() {
d.gcLoop()
close(d.gcDone)
d.once.Do(func() {
d.badgerDB, err = badger.Open(d.opts)
if err != nil {
return

Check warning on line 135 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L135

Added line #L135 was not covered by tests
}
rruntime.Go(func() {
d.gcLoop()
close(d.gcDone)
})
})
return err
}

func (d *BadgerDB) gcLoop() {
Expand Down
14 changes: 8 additions & 6 deletions services/dedup/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
// New creates a new deduplication service. The service needs to be closed after use.
func New() Dedup {
db := badger.NewBadgerDB(badger.DefaultPath())
db.Start()
return &dedup{
badgerDB: db,
cache: make(map[string]int64),
Expand All @@ -23,7 +22,7 @@ func New() Dedup {
// Dedup is the interface for deduplication service
type Dedup interface {
// Set returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value
Set(kv types.KeyValue) (bool, int64)
Set(kv types.KeyValue) (bool, int64, error)

// Commit commits a list of previously set keys to the DB
Commit(keys []string) error
Expand All @@ -38,17 +37,20 @@ type dedup struct {
cache map[string]int64
}

func (d *dedup) Set(kv types.KeyValue) (bool, int64) {
func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
if previous, found := d.cache[kv.Key]; found {
return false, previous
return false, previous, nil
}
previous, found, err := d.badgerDB.Get(kv.Key)
if err != nil {
return false, 0, err

Check warning on line 48 in services/dedup/dedup.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/dedup.go#L48

Added line #L48 was not covered by tests
}
previous, found := d.badgerDB.Get(kv.Key)
if !found {
d.cache[kv.Key] = kv.Value
}
return !found, previous
return !found, previous, nil
}

func (d *dedup) Commit(keys []string) error {
Expand Down
34 changes: 21 additions & 13 deletions services/dedup/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,37 @@ func Test_Dedup(t *testing.T) {
defer d.Close()

t.Run("if message id is not present in cache and badger db", func(t *testing.T) {
found, _ := d.Set(types.KeyValue{Key: "a", Value: 1})
found, _, err := d.Set(types.KeyValue{Key: "a", Value: 1})
require.Nil(t, err)
require.Equal(t, true, found)

// Checking it again should give us the previous value from the cache
found, value := d.Set(types.KeyValue{Key: "a", Value: 2})
found, value, err := d.Set(types.KeyValue{Key: "a", Value: 2})
require.Nil(t, err)
require.Equal(t, false, found)
require.Equal(t, int64(1), value)
})

t.Run("if message is committed, previous value should always return", func(t *testing.T) {
found, _ := d.Set(types.KeyValue{Key: "b", Value: 1})
found, _, err := d.Set(types.KeyValue{Key: "b", Value: 1})
require.Nil(t, err)
require.Equal(t, true, found)

err := d.Commit([]string{"a"})
err = d.Commit([]string{"a"})
require.NoError(t, err)

found, value := d.Set(types.KeyValue{Key: "b", Value: 2})
found, value, err := d.Set(types.KeyValue{Key: "b", Value: 2})
require.Nil(t, err)
require.Equal(t, false, found)
require.Equal(t, int64(1), value)
})

t.Run("committing a messageid not present in cache", func(t *testing.T) {
found, _ := d.Set(types.KeyValue{Key: "c", Value: 1})
found, _, err := d.Set(types.KeyValue{Key: "c", Value: 1})
require.Nil(t, err)
require.Equal(t, true, found)

err := d.Commit([]string{"d"})
err = d.Commit([]string{"d"})
require.NotNil(t, err)
})
}
Expand All @@ -73,17 +78,20 @@ func Test_Dedup_Window(t *testing.T) {
d := dedup.New()
defer d.Close()

found, _ := d.Set(types.KeyValue{Key: "to be deleted", Value: 1})
found, _, err := d.Set(types.KeyValue{Key: "to be deleted", Value: 1})
require.Nil(t, err)
require.Equal(t, true, found)

err := d.Commit([]string{"to be deleted"})
err = d.Commit([]string{"to be deleted"})
require.NoError(t, err)

found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 2})
found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 2})
require.Nil(t, err)
require.Equal(t, false, found)

require.Eventually(t, func() bool {
found, _ = d.Set(types.KeyValue{Key: "to be deleted", Value: 3})
found, _, err = d.Set(types.KeyValue{Key: "to be deleted", Value: 3})
require.Nil(t, err)
return found
}, 2*time.Second, 100*time.Millisecond)
}
Expand All @@ -103,7 +111,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) {
messages := make([]string, size)
for i := 0; i < size; i++ {
messages[i] = uuid.New().String()
d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)})
_, _, _ = d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)})
}
err := d.Commit(messages)
require.NoError(t, err)
Expand Down Expand Up @@ -134,7 +142,7 @@ func Benchmark_Dedup(b *testing.B) {

if i%batchSize == batchSize-1 || i == b.N-1 {
for _, msgID := range msgIDs[:i%batchSize] {
d.Set(msgID)
_, _, _ = d.Set(msgID)
keys = append(keys, msgID.Key)
}
err := d.Commit(keys)
Expand Down

0 comments on commit 35a0edf

Please sign in to comment.