
Commit

chore: postgres changes
fracasula committed Oct 19, 2023
1 parent cd21e45 commit 2bce617
Showing 6 changed files with 66 additions and 62 deletions.
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/bigquery.go
@@ -159,7 +159,7 @@ func (bq *BigQuery) getMiddleware() *middleware.Client {
logfield.DestinationType, bq.warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, bq.warehouse.WorkspaceID,
logfield.Schema, bq.namespace,
-logfield.AllowMerge, bq.config.allowMerge,
+logfield.ShouldMerge, bq.shouldMerge(),
),
middleware.WithSlowQueryThreshold(bq.config.slowQueryThreshold),
)
4 changes: 2 additions & 2 deletions warehouse/integrations/deltalake/deltalake.go
@@ -240,7 +240,7 @@ func (d *Deltalake) dropDanglingStagingTables(ctx context.Context) {
logfield.DestinationType, d.Warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, d.Warehouse.WorkspaceID,
logfield.Namespace, d.Namespace,
-logfield.AllowMerge, d.config.allowMerge,
+logfield.ShouldMerge, d.ShouldMerge(),
logfield.Error, err.Error(),
)
return
@@ -583,7 +583,7 @@ func (d *Deltalake) loadTable(
logfield.WorkspaceID, d.Warehouse.WorkspaceID,
logfield.Namespace, d.Namespace,
logfield.TableName, tableName,
-logfield.AllowMerge, d.config.allowMerge,
+logfield.ShouldMerge, d.ShouldMerge(),
)
log.Infow("started loading")

64 changes: 28 additions & 36 deletions warehouse/integrations/postgres/load.go
@@ -39,7 +39,7 @@ func (pg *Postgres) LoadTable(ctx context.Context, tableName string) (*types.Loa
logfield.WorkspaceID, pg.Warehouse.WorkspaceID,
logfield.Namespace, pg.Namespace,
logfield.TableName, tableName,
-logfield.LoadTableStrategy, pg.loadTableStrategy(),
+logfield.ShouldMerge, pg.shouldMerge(),
)
log.Infow("started loading")

@@ -78,7 +78,7 @@ func (pg *Postgres) loadTable(
logfield.WorkspaceID, pg.Warehouse.WorkspaceID,
logfield.Namespace, pg.Namespace,
logfield.TableName, tableName,
-logfield.LoadTableStrategy, pg.loadTableStrategy(),
+logfield.ShouldMerge, pg.shouldMerge(),
)
log.Infow("started loading")

@@ -143,7 +143,7 @@ func (pg *Postgres) loadTable(
}

var rowsDeleted int64
-if pg.shouldMerge() { // was: if pg.enableDedup() {
+if pg.shouldMerge() {
log.Infow("deleting from load table")
rowsDeleted, err = pg.deleteFromLoadTable(
ctx, txn, tableName,
@@ -225,13 +225,6 @@ func (pg *Postgres) loadDataIntoStagingTable(
return nil
}

-func (pg *Postgres) loadTableStrategy() string {
-	if !pg.enableDedup() {
-		return "APPEND"
-	}
-	return "MERGE"
-}
-
func (pg *Postgres) deleteFromLoadTable(
ctx context.Context,
txn *sqlmiddleware.Tx,
@@ -532,32 +525,30 @@ func (pg *Postgres) loadUsersTable(

// Deduplication
// Delete from users table if the id is present in the staging table
-	if pg.enableDedup() {
-		primaryKey := "id"
-		query = fmt.Sprintf(`
+	primaryKey := "id"
+	query = fmt.Sprintf(`
			DELETE FROM %[1]q.%[2]q using %[3]q _source
			WHERE _source.%[4]s = %[1]s.%[2]s.%[4]s;`,
-			pg.Namespace,
-			warehouseutils.UsersTable,
-			usersStagingTableName,
-			primaryKey,
-		)
+		pg.Namespace,
+		warehouseutils.UsersTable,
+		usersStagingTableName,
+		primaryKey,
+	)

-		pg.logger.Infow("deduplication for users table",
-			logfield.SourceID, pg.Warehouse.Source.ID,
-			logfield.SourceType, pg.Warehouse.Source.SourceDefinition.Name,
-			logfield.DestinationID, pg.Warehouse.Destination.ID,
-			logfield.DestinationType, pg.Warehouse.Destination.DestinationDefinition.Name,
-			logfield.WorkspaceID, pg.Warehouse.WorkspaceID,
-			logfield.TableName, warehouseutils.UsersTable,
-			logfield.StagingTableName, usersStagingTableName,
-			logfield.Namespace, pg.Namespace,
-			logfield.Query, query,
-		)
-		if _, err = tx.ExecContext(ctx, query); err != nil {
-			return loadUsersTableResponse{
-				usersError: fmt.Errorf("deduplication for users table: %w", err),
-			}
-		}
-	}
+	pg.logger.Infow("deduplication for users table",
+		logfield.SourceID, pg.Warehouse.Source.ID,
+		logfield.SourceType, pg.Warehouse.Source.SourceDefinition.Name,
+		logfield.DestinationID, pg.Warehouse.Destination.ID,
+		logfield.DestinationType, pg.Warehouse.Destination.DestinationDefinition.Name,
+		logfield.WorkspaceID, pg.Warehouse.WorkspaceID,
+		logfield.TableName, warehouseutils.UsersTable,
+		logfield.StagingTableName, usersStagingTableName,
+		logfield.Namespace, pg.Namespace,
+		logfield.Query, query,
+	)
+	if _, err = tx.ExecContext(ctx, query); err != nil {
+		return loadUsersTableResponse{
+			usersError: fmt.Errorf("deduplication for users table: %w", err),
+		}
+	}

@@ -594,7 +585,8 @@
return loadUsersTableResponse{}
}

-func (pg *Postgres) enableDedup() bool {
-	return !slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID) &&
-		pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
-}
+func (pg *Postgres) shouldMerge() bool {
+	return pg.config.allowMerge &&
+		pg.Warehouse.GetBoolDestinationConfig(model.EnableMergeSetting) &&
+		!slices.Contains(pg.config.skipDedupDestinationIDs, pg.Warehouse.Destination.ID)
+}
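
To make the behavioral change easier to follow outside the diff: the renamed check merges only when three switches agree — the package-level allowMerge config, the destination-level EnableMergeSetting, and the destination not being listed in skipDedupDestinationIDs. The sketch below is a standalone illustration with stand-in types (the loader struct, its field names, and the printed strings are not rudder-server identifiers); only the gating logic mirrors the shouldMerge function above.

package main

import "fmt"

// loader is a stand-in for the Postgres integration; only the gating logic
// mirrors shouldMerge() from the diff above.
type loader struct {
	allowMerge              bool     // Warehouse.postgres.allowMerge (package-level)
	enableMergeSetting      bool     // destination-level EnableMergeSetting
	skipDedupDestinationIDs []string // destinations forced into append mode
	destinationID           string
}

func (l loader) shouldMerge() bool {
	if !l.allowMerge || !l.enableMergeSetting {
		return false
	}
	for _, id := range l.skipDedupDestinationIDs {
		if id == l.destinationID {
			return false
		}
	}
	return true
}

func main() {
	l := loader{allowMerge: true, enableMergeSetting: true, destinationID: "dest-1"}
	if l.shouldMerge() {
		fmt.Println("MERGE: delete matching rows from the load table, then insert staged rows")
	} else {
		fmt.Println("APPEND: insert staged rows without deleting")
	}
}
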
2 changes: 2 additions & 0 deletions warehouse/integrations/postgres/postgres.go
@@ -122,6 +122,7 @@ type Postgres struct {
LoadFileDownloader downloader.Downloader

config struct {
+allowMerge bool
enableDeleteByJobs bool
numWorkersDownloadLoadFiles int
slowQueryThreshold time.Duration
@@ -162,6 +163,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats) *Postgres {
pg.logger = log.Child("integrations").Child("postgres")
pg.stats = stat

+pg.config.allowMerge = conf.GetBool("Warehouse.postgres.allowMerge", true)
pg.config.enableDeleteByJobs = conf.GetBool("Warehouse.postgres.enableDeleteByJobs", false)
pg.config.numWorkersDownloadLoadFiles = conf.GetInt("Warehouse.postgres.numWorkersDownloadLoadFiles", 1)
pg.config.slowQueryThreshold = conf.GetDuration("Warehouse.postgres.slowQueryThreshold", 5, time.Minute)
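
The knob added above defaults to true, so existing destinations keep merging wherever EnableMergeSetting is on. A minimal sketch of overriding it, assuming only the constructor and config API already shown in this commit (the override itself is illustrative, not part of the change):

package main

import (
	"github.com/rudderlabs/rudder-go-kit/config"
	"github.com/rudderlabs/rudder-go-kit/logger"
	"github.com/rudderlabs/rudder-go-kit/stats"

	"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
)

func main() {
	// Turning the package-level switch off makes shouldMerge() return false for
	// every destination, so loadTable appends instead of deleting matching rows first.
	c := config.New()
	c.Set("Warehouse.postgres.allowMerge", false) // default is true (see New above)

	pg := postgres.New(c, logger.NOP, stats.Default)
	_ = pg // Setup, CreateSchema and LoadTable would follow, as in the tests below
}
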
54 changes: 32 additions & 22 deletions warehouse/integrations/postgres/postgres_test.go
@@ -3,6 +3,7 @@ package postgres_test
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"strconv"
@@ -11,36 +12,28 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/compose-test/compose"
"github.com/rudderlabs/compose-test/testcompose"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/compose-test/compose"

"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/tunnelling"

"github.com/rudderlabs/compose-test/testcompose"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"

"github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/validations"

"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/tunnelling"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

func TestIntegration(t *testing.T) {
@@ -589,8 +582,11 @@
c := config.New()
c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID)

+mergeWarehouse := cloneWarehouse(t, warehouse)
+mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+
pg := postgres.New(c, logger.NOP, stats.Default)
-err := pg.Setup(ctx, warehouse, mockUploader)
+err := pg.Setup(ctx, mergeWarehouse, mockUploader)
require.NoError(t, err)

err = pg.CreateSchema(ctx)
@@ -639,8 +635,11 @@
c := config.New()
c.Set("Warehouse.postgres.EnableSQLStatementExecutionPlanWorkspaceIDs", workspaceID)

+mergeWarehouse := cloneWarehouse(t, warehouse)
+mergeWarehouse.Destination.Config[string(model.EnableMergeSetting)] = true
+
pg := postgres.New(config.Default, logger.NOP, stats.Default)
-err := pg.Setup(ctx, warehouse, mockUploader)
+err := pg.Setup(ctx, mergeWarehouse, mockUploader)
require.NoError(t, err)

err = pg.CreateSchema(ctx)
@@ -857,3 +856,14 @@ func mockUploader(

return mockUploader
}

+func cloneWarehouse(t *testing.T, wh model.Warehouse) model.Warehouse {
+	t.Helper()
+	buf, err := json.Marshal(wh)
+	require.NoError(t, err)
+
+	var clone model.Warehouse
+	require.NoError(t, json.Unmarshal(buf, &clone))
+
+	return clone
+}
2 changes: 1 addition & 1 deletion warehouse/logfield/logfield.go
@@ -36,5 +36,5 @@
IntervalInHours = "intervalInHours"
StartTime = "startTime"
EndTime = "endTime"
-AllowMerge = "allowMerge"
+ShouldMerge = "shouldMerge"
)
