Skip to content

Commit

Permalink
Merge branch 'master' into chore.refactorprocessJobsForDest
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored and Rohith BCS committed Oct 21, 2024
2 parents 00107bc + da9f8c8 commit 8f61a4c
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 17 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ replace (
require (
cloud.google.com/go/bigquery v1.63.1
cloud.google.com/go/pubsub v1.44.0
cloud.google.com/go/storage v1.44.0
cloud.google.com/go/storage v1.45.0
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.2
Expand Down Expand Up @@ -71,12 +71,12 @@ require (
github.com/oschwald/maxminddb-golang v1.13.1
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/prometheus/client_model v0.6.1
github.com/redis/go-redis/v9 v9.6.2
github.com/redis/go-redis/v9 v9.7.0
github.com/rs/cors v1.11.1
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/bing-ads-go-sdk v0.2.3
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.43.0
github.com/rudderlabs/rudder-go-kit v0.44.0
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.5.3
github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a
Expand All @@ -103,7 +103,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
go.uber.org/mock v0.5.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.12.0/go.mod h1:fFLk2dp2oAhDz8QFKwqrjdJvxSp/W2g7nillojlL5Ho=
cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKuqnZI01LAA=
cloud.google.com/go/storage v1.44.0 h1:abBzXf4UJKMmQ04xxJf9dYM/fNl24KHoTuBjyJDX2AI=
cloud.google.com/go/storage v1.44.0/go.mod h1:wpPblkIuMP5jCB/E48Pz9zIo2S/zD8g+ITmxKkPCITE=
cloud.google.com/go/storage v1.45.0 h1:5av0QcIVj77t+44mV4gffFC/LscFRUhto6UBMB5SimM=
cloud.google.com/go/storage v1.45.0/go.mod h1:wpPblkIuMP5jCB/E48Pz9zIo2S/zD8g+ITmxKkPCITE=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.2.0/go.mod h1:Wc8y/uYyOhPy12KEnXG9XGrvfMz5F5SrYecQlbW1rwM=
cloud.google.com/go/trace v1.11.1 h1:UNqdP+HYYtnm6lb91aNA5JQ0X14GnxkABGlfz2PzPew=
Expand Down Expand Up @@ -1136,8 +1136,8 @@ github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoG
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk=
github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
Expand Down Expand Up @@ -1168,8 +1168,8 @@ github.com/rudderlabs/goqu/v10 v10.3.1 h1:rnfX+b4EwBWQ2UQfIGeEW299JBBkK5biEbnf7K
github.com/rudderlabs/goqu/v10 v10.3.1/go.mod h1:LH2vI5gGHBxEQuESqFyk5ZA2anGINc8o25hbidDWOYw=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.43.0 h1:N6CAvQdjufitdiUl424+AcMebEmieB0TO5PhARwXvw8=
github.com/rudderlabs/rudder-go-kit v0.43.0/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-go-kit v0.44.0 h1:tvwSy4XY1NCn2D6ddsme3yxzq+9KY5X9YS4/sUDjIk4=
github.com/rudderlabs/rudder-go-kit v0.44.0/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q=
github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8=
github.com/rudderlabs/rudder-schemas v0.5.3 h1:IWWjAo2TzsjwHNhS2EAr1+0MjvA8BoTpJvB2o/GFwNU=
Expand Down Expand Up @@ -1417,8 +1417,8 @@ go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwE
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down
31 changes: 31 additions & 0 deletions services/dedup/badger/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,37 @@ func Test_Badger(t *testing.T) {
require.NoError(t, err)
require.False(t, found)
})
t.Run("different messageID should not be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "c", Value: 1, WorkspaceID: "test"},
{Key: "d", Value: 1, WorkspaceID: "test"},
{Key: "e", Value: 1, WorkspaceID: "test"},
}
found, _, err := badger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.True(t, found[key])
}
})
t.Run("same messageID should be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 3},
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 4},
{Key: "g", Value: 1, WorkspaceID: "test", JobID: 5},
}
expected := map[types.KeyValue]bool{
keys[0]: true,
keys[1]: false,
keys[2]: true,
}
found, _, err := badger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.Equal(t, expected[key], found[key])
}
})
}

func TestBadgerClose(t *testing.T) {
Expand Down
8 changes: 5 additions & 3 deletions services/dedup/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func Test_Dedup(t *testing.T) {
{
name: "Scylla",
},
//{
// name: "MirrorScylla",
//},
{
name: "MirrorScylla",
},
{
name: "MirrorBadger",
},
Expand Down Expand Up @@ -134,6 +134,8 @@ func Test_Dedup(t *testing.T) {
for _, kv := range kvs {
require.Equal(t, expected[kv], found[kv])
}
err = d.Commit([]string{"h"})
require.NoError(t, err)
})
})
}
Expand Down
31 changes: 31 additions & 0 deletions services/dedup/mirrorBadger/mirrorBadger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,35 @@ func Test_MirrorBadger(t *testing.T) {
require.Nil(t, err)
require.False(t, found)
})
t.Run("different messageID should not be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "c", Value: 1, WorkspaceID: "test"},
{Key: "d", Value: 1, WorkspaceID: "test"},
{Key: "e", Value: 1, WorkspaceID: "test"},
}
found, _, err := mirrorBadger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.True(t, found[key])
}
})
t.Run("same messageID should be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 3},
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 4},
{Key: "g", Value: 1, WorkspaceID: "test", JobID: 5},
}
expected := map[types.KeyValue]bool{
keys[0]: true,
keys[1]: false,
keys[2]: true,
}
found, _, err := mirrorBadger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.Equal(t, expected[key], found[key])
}
})
}
31 changes: 31 additions & 0 deletions services/dedup/mirrorScylla/mirrorScylla_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,35 @@ func Test_MirrorBadger(t *testing.T) {
require.Nil(t, err)
require.False(t, found)
})
t.Run("different messageID should not be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "c", Value: 1, WorkspaceID: "test"},
{Key: "d", Value: 1, WorkspaceID: "test"},
{Key: "e", Value: 1, WorkspaceID: "test"},
}
found, _, err := mirrorScylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.True(t, found[key])
}
})
t.Run("same messageID should be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 3},
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 4},
{Key: "g", Value: 1, WorkspaceID: "test", JobID: 5},
}
expected := map[types.KeyValue]bool{
keys[0]: true,
keys[1]: false,
keys[2]: true,
}
found, _, err := mirrorScylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.Equal(t, expected[key], found[key])
}
})
}
57 changes: 57 additions & 0 deletions services/dedup/scylla/scylla_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,61 @@ func Test_Scylla(t *testing.T) {
require.NoError(t, err)
require.False(t, found)
})
t.Run("Same messageID should be deduped for same workspace from cache for Batch call", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "c", Value: 1, WorkspaceID: "test", JobID: 1},
{Key: "c", Value: 1, WorkspaceID: "test", JobID: 2},
{Key: "d", Value: 1, WorkspaceID: "test", JobID: 3},
}
expected := map[types.KeyValue]bool{
keys[0]: true,
keys[1]: false,
keys[2]: true,
}
found, _, err := scylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.Equal(t, expected[key], found[key])
}
err = scylla.Commit([]string{"c", "d"})
require.NoError(t, err)
})
t.Run("Different messageID should not be deduped for same workspace", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "e", Value: 1, WorkspaceID: "test", JobID: 1},
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 2},
{Key: "g", Value: 1, WorkspaceID: "test", JobID: 3},
}
found, _, err := scylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
require.True(t, found[key])
}
})
t.Run("Same messageID should not be deduped for different workspace", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "h", Value: 1, WorkspaceID: "test1", JobID: 1},
}
found, _, err := scylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 1)
for _, key := range keys {
require.True(t, found[key])
}
err = scylla.Commit([]string{"h"})
require.NoError(t, err)
keys = []types.KeyValue{
{Key: "h", Value: 1, WorkspaceID: "test2", JobID: 1},
}
found, _, err = scylla.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 1)
for _, key := range keys {
require.True(t, found[key])
}
err = scylla.Commit([]string{"h"})
require.NoError(t, err)
})
}
11 changes: 9 additions & 2 deletions utils/misc/dbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,16 @@ func NewDatabaseConnectionPool(
maxConnLifetime := maxConnLifetimeVar.Load()
db.SetConnMaxLifetime(maxConnLifetime)

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
rruntime.Go(func() {
ticker := time.NewTicker(
conf.GetDurationVar(
5,
time.Second,
"db."+componentName+".pool.configUpdateInterval",
"db.pool.configUpdateInterval",
),
)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
Expand Down
29 changes: 29 additions & 0 deletions utils/misc/dbutils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package misc_test

import (
"context"
"database/sql"
"fmt"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/utils/misc"
)
Expand Down Expand Up @@ -128,3 +130,30 @@ func TestIdleTxTimeout(t *testing.T) {
require.NoError(t, tx.Commit())
})
}

func TestCommonPool(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

conf := config.New()
conf.Set("DB.host", postgresContainer.Host)
conf.Set("DB.user", postgresContainer.User)
conf.Set("DB.name", postgresContainer.Database)
conf.Set("DB.port", postgresContainer.Port)
conf.Set("DB.password", postgresContainer.Password)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf.Set("db.test.pool.configUpdateInterval", 10*time.Millisecond)
db, err := misc.NewDatabaseConnectionPool(ctx, conf, stats.NOP, "test")
require.NoError(t, err)
require.NoError(t, db.Ping())
defer db.Close()
require.Equal(t, 40, db.Stats().MaxOpenConnections)

conf.Set("db.test.pool.maxOpenConnections", 5)
time.Sleep(100 * time.Millisecond)
require.Equal(t, 5, db.Stats().MaxOpenConnections)
}

0 comments on commit 8f61a4c

Please sign in to comment.