feat: use append vs merge option from backend config #3965

Merged
go.mod (2 changes: 1 addition & 1 deletion)
@@ -81,7 +81,7 @@ require (
 	github.com/rs/cors v1.10.1
 	github.com/rudderlabs/analytics-go v3.3.3+incompatible
 	github.com/rudderlabs/compose-test v0.1.3
-	github.com/rudderlabs/rudder-go-kit v0.16.2
+	github.com/rudderlabs/rudder-go-kit v0.16.3
 	github.com/rudderlabs/sql-tunnels v0.1.5
 	github.com/samber/lo v1.38.1
 	github.com/segmentio/kafka-go v0.4.42
go.sum (4 changes: 2 additions & 2 deletions)
@@ -949,8 +949,8 @@ github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xb
 github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
 github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
 github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
-github.com/rudderlabs/rudder-go-kit v0.16.2 h1:1zR0ivPT3Rp9bHmfq5k8VVfOy3bJrag2gHbjnqbUmtM=
-github.com/rudderlabs/rudder-go-kit v0.16.2/go.mod h1:vRRTcYmAtYg87R4liGy24wO3452WlGHkFwtEopgme3k=
+github.com/rudderlabs/rudder-go-kit v0.16.3 h1:IZIg7RjwbQN0GAHpiZgNLW388AwBmgVnh3bYPXP7SKQ=
+github.com/rudderlabs/rudder-go-kit v0.16.3/go.mod h1:vRRTcYmAtYg87R4liGy24wO3452WlGHkFwtEopgme3k=
 github.com/rudderlabs/sql-tunnels v0.1.5 h1:L/e9GQtqJlTVMauAE+ym/XUqhg+Va6RZQiOvBgbhspY=
 github.com/rudderlabs/sql-tunnels v0.1.5/go.mod h1:ZwQkCLb/5hHm5U90juAj9idkkFGv2R2dzDHJoPbKIto=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
testhelper/clone.go (20 changes: 20 additions & 0 deletions)
@@ -0,0 +1,20 @@
+package testhelper
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Clone[T any](t testing.TB, v T) T {
+	t.Helper()
+
+	buf, err := json.Marshal(v)
+	require.NoError(t, err)
+
+	var clone T
+	require.NoError(t, json.Unmarshal(buf, &clone))
+
+	return clone
+}
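
For context, a minimal, self-contained sketch of how the new helper can be used in a test. The event struct and the test itself are illustrative, not part of this PR; note that Clone only copies state that survives a JSON round-trip (exported, json-serializable fields):

package testhelper_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/rudderlabs/rudder-server/testhelper"
)

func TestClone(t *testing.T) {
	type event struct {
		UserID string `json:"userId"`
		Count  int    `json:"count"`
	}

	original := event{UserID: "u1", Count: 2}

	// Clone round-trips the value through JSON, so the result shares no
	// mutable state with the input; a failed marshal or unmarshal fails
	// the test immediately via require.NoError.
	cloned := testhelper.Clone(t, original)
	cloned.Count = 99

	require.Equal(t, 2, original.Count) // mutating the clone leaves the original intact
}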
warehouse/integrations/bigquery/bigquery.go (113 changes: 57 additions & 56 deletions)
@@ -10,11 +10,8 @@ import (
 	"strings"
 	"time"
 
-	"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
-
-	"github.com/samber/lo"
-
 	"cloud.google.com/go/bigquery"
+	"github.com/samber/lo"
 	bqService "google.golang.org/api/bigquery/v2"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
@@ -26,6 +23,7 @@ import (
 	"github.com/rudderlabs/rudder-server/utils/misc"
 	"github.com/rudderlabs/rudder-server/warehouse/client"
 	"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery/middleware"
+	"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
 	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
 	"github.com/rudderlabs/rudder-server/warehouse/logfield"
 	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -43,8 +41,7 @@ type BigQuery struct {
 	config struct {
 		setUsersLoadPartitionFirstEventFilter bool
 		customPartitionsEnabled               bool
-		isUsersTableDedupEnabled              bool
-		isDedupEnabled                        bool
+		allowMerge                            bool
 		enableDeleteByJobs                    bool
 		customPartitionsEnabledWorkspaceIDs   []string
 		slowQueryThreshold                    time.Duration
@@ -140,8 +137,7 @@ func New(conf *config.Config, log logger.Logger) *BigQuery {
 
 	bq.config.setUsersLoadPartitionFirstEventFilter = conf.GetBool("Warehouse.bigquery.setUsersLoadPartitionFirstEventFilter", true)
 	bq.config.customPartitionsEnabled = conf.GetBool("Warehouse.bigquery.customPartitionsEnabled", false)
-	bq.config.isUsersTableDedupEnabled = conf.GetBool("Warehouse.bigquery.isUsersTableDedupEnabled", false)
-	bq.config.isDedupEnabled = conf.GetBool("Warehouse.bigquery.isDedupEnabled", false)
+	bq.config.allowMerge = conf.GetBool("Warehouse.bigquery.allowMerge", true)
 	bq.config.enableDeleteByJobs = conf.GetBool("Warehouse.bigquery.enableDeleteByJobs", false)
 	bq.config.customPartitionsEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs", nil)
 	bq.config.slowQueryThreshold = conf.GetDuration("Warehouse.bigquery.slowQueryThreshold", 5, time.Minute)
@@ -163,6 +159,7 @@ func (bq *BigQuery) getMiddleware() *middleware.Client {
 			logfield.DestinationType, bq.warehouse.Destination.DestinationDefinition.Name,
 			logfield.WorkspaceID, bq.warehouse.WorkspaceID,
 			logfield.Schema, bq.namespace,
+			logfield.ShouldMerge, bq.shouldMerge(),
 		),
 		middleware.WithSlowQueryThreshold(bq.config.slowQueryThreshold),
 	)
@@ -193,23 +190,21 @@ func (bq *BigQuery) CreateTable(ctx context.Context, tableName string, columnMap
 		return fmt.Errorf("create table: %w", err)
 	}
 
-	if !bq.dedupEnabled() {
-		if err = bq.createTableView(ctx, tableName, columnMap); err != nil {
-			return fmt.Errorf("create view: %w", err)
-		}
+	if err = bq.createTableView(ctx, tableName, columnMap); err != nil {
+		return fmt.Errorf("create view: %w", err)
 	}
 
 	return nil
 }
 
-func (bq *BigQuery) DropTable(ctx context.Context, tableName string) (err error) {
-	err = bq.DeleteTable(ctx, tableName)
-	if err != nil {
-		return
+func (bq *BigQuery) DropTable(ctx context.Context, tableName string) error {
+	if err := bq.DeleteTable(ctx, tableName); err != nil {
+		return fmt.Errorf("cannot delete table %q: %w", tableName, err)
 	}
-	if !bq.dedupEnabled() {
-		err = bq.DeleteTable(ctx, tableName+"_view")
+	if err := bq.DeleteTable(ctx, tableName+"_view"); err != nil {
+		return fmt.Errorf("cannot delete table %q: %w", tableName+"_view", err)
 	}
-	return
+	return nil
 }
 
 func (bq *BigQuery) createTableView(ctx context.Context, tableName string, columnMap model.TableSchema) (err error) {
@@ -225,9 +220,16 @@ func (bq *BigQuery) createTableView(ctx context.Context, tableName string, colum
 
 	// assuming it has field named id upon which dedup is done in view
 	viewQuery := `SELECT * EXCEPT (__row_number) FROM (
-			SELECT *, ROW_NUMBER() OVER (PARTITION BY ` + partitionKey + viewOrderByStmt + `) AS __row_number FROM ` + "`" + bq.projectID + "." + bq.namespace + "." + tableName + "`" + ` WHERE _PARTITIONTIME BETWEEN TIMESTAMP_TRUNC(TIMESTAMP_MICROS(UNIX_MICROS(CURRENT_TIMESTAMP()) - 60 * 60 * 60 * 24 * 1000000), DAY, 'UTC')
-			AND TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, 'UTC')
-			)
+		SELECT *, ROW_NUMBER() OVER (PARTITION BY ` + partitionKey + viewOrderByStmt + `) AS __row_number
+		FROM ` + "`" + bq.projectID + "." + bq.namespace + "." + tableName + "`" + `
+		WHERE
+			_PARTITIONTIME BETWEEN TIMESTAMP_TRUNC(
+				TIMESTAMP_MICROS(UNIX_MICROS(CURRENT_TIMESTAMP()) - 60 * 60 * 60 * 24 * 1000000),
+				DAY,
+				'UTC'
+			)
+			AND TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, 'UTC')
+		)
 	WHERE __row_number = 1`
 	metaData := &bigquery.TableMetadata{
 		ViewQuery: viewQuery,
@@ -241,7 +243,8 @@ func (bq *BigQuery) schemaExists(ctx context.Context, _, _ string) (exists bool,
 	ds := bq.db.Dataset(bq.namespace)
 	_, err = ds.Metadata(ctx)
 	if err != nil {
-		if e, ok := err.(*googleapi.Error); ok && e.Code == 404 {
+		var e *googleapi.Error
+		if errors.As(err, &e) && e.Code == 404 {
 			bq.logger.Debugf("BQ: Dataset %s not found", bq.namespace)
 			return false, nil
 		}
@@ -275,7 +278,8 @@ func (bq *BigQuery) CreateSchema(ctx context.Context) (err error) {
 	bq.logger.Infof("BQ: Creating schema: %s ...", bq.namespace)
 	err = ds.Create(ctx, meta)
 	if err != nil {
-		if e, ok := err.(*googleapi.Error); ok && e.Code == 409 {
+		var e *googleapi.Error
+		if errors.As(err, &e) && e.Code == 409 {
 			bq.logger.Infof("BQ: Create schema %s failed as schema already exists", bq.namespace)
 			return nil
 		}
@@ -285,7 +289,8 @@
 
 func checkAndIgnoreAlreadyExistError(err error) bool {
 	if err != nil {
-		if e, ok := err.(*googleapi.Error); ok {
+		var e *googleapi.Error
+		if errors.As(err, &e) {
 			// 409 is returned when we try to create a table that already exists
 			// 400 is returned for all kinds of invalid input - so we need to check the error message too
 			if e.Code == 409 || (e.Code == 400 && strings.Contains(e.Message, "already exists in schema")) {
@@ -384,14 +389,14 @@ func (bq *BigQuery) loadTable(
 	gcsRef.MaxBadRecords = 0
 	gcsRef.IgnoreUnknownValues = false
 
-	if bq.dedupEnabled() {
+	if bq.shouldMerge() {
 		return bq.loadTableByMerge(ctx, tableName, gcsRef, log, skipTempTableDelete)
 	}
 	return bq.loadTableByAppend(ctx, tableName, gcsRef, log)
 }
 
 func (bq *BigQuery) loadTableStrategy() string {
-	if bq.dedupEnabled() {
+	if bq.shouldMerge() {
 		return "MERGE"
 	}
 	return "APPEND"
@@ -599,8 +604,7 @@ func (bq *BigQuery) loadTableByMerge(
 		SET
 			%[6]s WHEN NOT MATCHED THEN INSERT (%[4]s)
 		VALUES
-			(%[5]s);
-		`,
+			(%[5]s);`,
 		bqTable(tableName),
 		bqTable(stagingTableName),
 		primaryJoinClause,
@@ -646,7 +650,7 @@ func (bq *BigQuery) loadTableByMerge(
 
 func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]error) {
 	errorMap = map[string]error{warehouseutils.IdentifiesTable: nil}
-	bq.logger.Infof("BQ: Starting load for identifies and users tables\n")
+	bq.logger.Infof("BQ: Starting load for identifies and users tables")
 	_, identifyLoadTable, err := bq.loadTable(ctx, warehouseutils.IdentifiesTable, true)
 	if err != nil {
 		errorMap[warehouseutils.IdentifiesTable] = err
@@ -704,10 +708,12 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
 	bqIdentifiesTable := bqTable(warehouseutils.IdentifiesTable)
 	partition := fmt.Sprintf("TIMESTAMP('%s')", identifyLoadTable.partitionDate)
 	var identifiesFrom string
-	if bq.dedupEnabled() {
-		identifiesFrom = fmt.Sprintf(`%s WHERE user_id IS NOT NULL %s`, bqTable(identifyLoadTable.stagingTableName), loadedAtFilter())
+	if bq.shouldMerge() {
+		identifiesFrom = fmt.Sprintf(`%s WHERE user_id IS NOT NULL %s`,
+			bqTable(identifyLoadTable.stagingTableName), loadedAtFilter())
 	} else {
-		identifiesFrom = fmt.Sprintf(`%s WHERE _PARTITIONTIME = %s AND user_id IS NOT NULL %s`, bqIdentifiesTable, partition, loadedAtFilter())
+		identifiesFrom = fmt.Sprintf(`%s WHERE _PARTITIONTIME = %s AND user_id IS NOT NULL %s`,
+			bqIdentifiesTable, partition, loadedAtFilter())
 	}
 	sqlStatement := fmt.Sprintf(`SELECT DISTINCT * FROM (
 			SELECT id, %[1]s FROM (
@@ -824,7 +830,7 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
 		}
 	}
 
-	if !bq.dedupEnabled() {
+	if !bq.shouldMerge() {
 		loadUserTableByAppend()
 		return
 	}
Expand All @@ -847,45 +853,40 @@ func Connect(context context.Context, cred *BQCredentials) (*bigquery.Client, er
}
opts = append(opts, option.WithCredentialsJSON(credBytes))
}
client, err := bigquery.NewClient(context, cred.ProjectID, opts...)
return client, err
c, err := bigquery.NewClient(context, cred.ProjectID, opts...)
return c, err
}

func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery.Client, error) {
bq.logger.Infof("BQ: Connecting to BigQuery in project: %s", cred.ProjectID)
client, err := Connect(ctx, &cred)
return client, err
c, err := Connect(ctx, &cred)
return c, err
}

func (bq *BigQuery) dedupEnabled() bool {
return bq.config.isDedupEnabled || bq.config.isUsersTableDedupEnabled
// shouldMerge returns true if:
// * the server config says we allow merging
// * the user opted in to merging
func (bq *BigQuery) shouldMerge() bool {
return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
}

func (bq *BigQuery) CrashRecover(ctx context.Context) {
if !bq.dedupEnabled() {
return
}
bq.dropDanglingStagingTables(ctx)
}

func (bq *BigQuery) dropDanglingStagingTables(ctx context.Context) bool {
sqlStatement := fmt.Sprintf(`
SELECT
table_name
FROM
%[1]s.INFORMATION_SCHEMA.TABLES
WHERE
table_schema = '%[1]s'
AND table_name LIKE '%[2]s';
`,
AND table_name LIKE '%[2]s';`,
bq.namespace,
fmt.Sprintf(`%s%%`, warehouseutils.StagingTablePrefix(provider)),
)
query := bq.db.Query(sqlStatement)
it, err := bq.getMiddleware().Read(ctx, query)
if err != nil {
bq.logger.Errorf("WH: BQ: Error dropping dangling staging tables in BQ: %v\nQuery: %s\n", err, sqlStatement)
return false
return
}

var stagingTableNames []string
@@ -897,22 +898,20 @@ func (bq *BigQuery) dropDanglingStagingTables(ctx context.Context) bool {
 				break
 			}
 			bq.logger.Errorf("BQ: Error in processing fetched staging tables from information schema in dataset %v : %v", bq.namespace, err)
-			return false
+			return
 		}
 		if _, ok := values[0].(string); ok {
 			stagingTableNames = append(stagingTableNames, values[0].(string))
 		}
 	}
 	bq.logger.Infof("WH: PG: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
-	delSuccess := true
 	for _, stagingTableName := range stagingTableNames {
 		err := bq.DeleteTable(ctx, stagingTableName)
 		if err != nil {
 			bq.logger.Errorf("WH: BQ: Error dropping dangling staging table: %s in BQ: %v", stagingTableName, err)
-			delSuccess = false
+			return
 		}
 	}
-	return delSuccess
 }
 
 func (bq *BigQuery) IsEmpty(
@@ -1040,7 +1039,8 @@ func (bq *BigQuery) FetchSchema(ctx context.Context) (model.Schema, model.Schema
 
 	it, err := bq.getMiddleware().Read(ctx, query)
 	if err != nil {
-		if e, ok := err.(*googleapi.Error); ok && e.Code == 404 {
+		var e *googleapi.Error
+		if errors.As(err, &e) && e.Code == 404 {
 			// if dataset resource is not found, return empty schema
 			return schema, unrecognizedSchema, nil
 		}
@@ -1109,7 +1109,8 @@ func (bq *BigQuery) tableExists(ctx context.Context, tableName string) (exists b
 	if err == nil {
 		return true, nil
 	}
-	if e, ok := err.(*googleapi.Error); ok {
+	var e *googleapi.Error
+	if errors.As(err, &e) {
 		if e.Code == 404 {
 			return false, nil
 		}
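
In short, the PR replaces the two server-side dedup flags with a single merge decision. A standalone sketch of the new behavior for reviewers (this helper is not code from the PR; it just mirrors shouldMerge and loadTableStrategy from the diff above):

package main

import "fmt"

// loadStrategy mirrors the new decision: MERGE is used only when the
// server-level flag (Warehouse.bigquery.allowMerge, default true) and the
// destination-level setting (model.EnableMergeSetting) are both true;
// otherwise rudder-server appends.
func loadStrategy(serverAllowMerge, destinationEnableMerge bool) string {
	if serverAllowMerge && destinationEnableMerge {
		return "MERGE"
	}
	return "APPEND"
}

func main() {
	fmt.Println(loadStrategy(true, true))  // MERGE: both opted in
	fmt.Println(loadStrategy(true, false)) // APPEND: destination did not opt in (the new default)
	fmt.Println(loadStrategy(false, true)) // APPEND: the server config overrides the destination
}

The practical change: previously MERGE was controlled purely by server config (isDedupEnabled or isUsersTableDedupEnabled, both defaulting to false); now it requires an explicit destination-level opt-in, with append as the default load strategy.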