Skip to content

Commit

Permalink
chore: reverts "feat: use append vs merge option from backend config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Nov 13, 2023
1 parent 1406702 commit fd46fc3
Show file tree
Hide file tree
Showing 21 changed files with 1,278 additions and 1,221 deletions.
20 changes: 0 additions & 20 deletions testhelper/clone.go

This file was deleted.

113 changes: 56 additions & 57 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"strings"
"time"

"cloud.google.com/go/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

"github.com/samber/lo"

"cloud.google.com/go/bigquery"
bqService "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
Expand All @@ -23,7 +26,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/integrations/bigquery/middleware"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
Expand All @@ -41,7 +43,8 @@ type BigQuery struct {
config struct {
setUsersLoadPartitionFirstEventFilter bool
customPartitionsEnabled bool
allowMerge bool
isUsersTableDedupEnabled bool
isDedupEnabled bool
enableDeleteByJobs bool
customPartitionsEnabledWorkspaceIDs []string
slowQueryThreshold time.Duration
Expand Down Expand Up @@ -137,7 +140,8 @@ func New(conf *config.Config, log logger.Logger) *BigQuery {

bq.config.setUsersLoadPartitionFirstEventFilter = conf.GetBool("Warehouse.bigquery.setUsersLoadPartitionFirstEventFilter", true)
bq.config.customPartitionsEnabled = conf.GetBool("Warehouse.bigquery.customPartitionsEnabled", false)
bq.config.allowMerge = conf.GetBool("Warehouse.bigquery.allowMerge", true)
bq.config.isUsersTableDedupEnabled = conf.GetBool("Warehouse.bigquery.isUsersTableDedupEnabled", false)
bq.config.isDedupEnabled = conf.GetBool("Warehouse.bigquery.isDedupEnabled", false)
bq.config.enableDeleteByJobs = conf.GetBool("Warehouse.bigquery.enableDeleteByJobs", false)
bq.config.customPartitionsEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs", nil)
bq.config.slowQueryThreshold = conf.GetDuration("Warehouse.bigquery.slowQueryThreshold", 5, time.Minute)
Expand All @@ -159,7 +163,6 @@ func (bq *BigQuery) getMiddleware() *middleware.Client {
logfield.DestinationType, bq.warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, bq.warehouse.WorkspaceID,
logfield.Schema, bq.namespace,
logfield.ShouldMerge, bq.shouldMerge(),
),
middleware.WithSlowQueryThreshold(bq.config.slowQueryThreshold),
)
Expand Down Expand Up @@ -190,21 +193,23 @@ func (bq *BigQuery) CreateTable(ctx context.Context, tableName string, columnMap
return fmt.Errorf("create table: %w", err)
}

if err = bq.createTableView(ctx, tableName, columnMap); err != nil {
return fmt.Errorf("create view: %w", err)
if !bq.dedupEnabled() {
if err = bq.createTableView(ctx, tableName, columnMap); err != nil {
return fmt.Errorf("create view: %w", err)
}
}

return nil
}

func (bq *BigQuery) DropTable(ctx context.Context, tableName string) error {
if err := bq.DeleteTable(ctx, tableName); err != nil {
return fmt.Errorf("cannot delete table %q: %w", tableName, err)
func (bq *BigQuery) DropTable(ctx context.Context, tableName string) (err error) {
err = bq.DeleteTable(ctx, tableName)
if err != nil {
return
}
if err := bq.DeleteTable(ctx, tableName+"_view"); err != nil {
return fmt.Errorf("cannot delete table %q: %w", tableName+"_view", err)
if !bq.dedupEnabled() {
err = bq.DeleteTable(ctx, tableName+"_view")
}
return nil
return
}

func (bq *BigQuery) createTableView(ctx context.Context, tableName string, columnMap model.TableSchema) (err error) {
Expand All @@ -220,16 +225,9 @@ func (bq *BigQuery) createTableView(ctx context.Context, tableName string, colum

// assumes the table has a field named id on which the view performs dedup
viewQuery := `SELECT * EXCEPT (__row_number) FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY ` + partitionKey + viewOrderByStmt + `) AS __row_number
FROM ` + "`" + bq.projectID + "." + bq.namespace + "." + tableName + "`" + `
WHERE
_PARTITIONTIME BETWEEN TIMESTAMP_TRUNC(
TIMESTAMP_MICROS(UNIX_MICROS(CURRENT_TIMESTAMP()) - 60 * 60 * 60 * 24 * 1000000),
DAY,
'UTC'
)
AND TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, 'UTC')
)
SELECT *, ROW_NUMBER() OVER (PARTITION BY ` + partitionKey + viewOrderByStmt + `) AS __row_number FROM ` + "`" + bq.projectID + "." + bq.namespace + "." + tableName + "`" + ` WHERE _PARTITIONTIME BETWEEN TIMESTAMP_TRUNC(TIMESTAMP_MICROS(UNIX_MICROS(CURRENT_TIMESTAMP()) - 60 * 60 * 60 * 24 * 1000000), DAY, 'UTC')
AND TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, 'UTC')
)
WHERE __row_number = 1`
metaData := &bigquery.TableMetadata{
ViewQuery: viewQuery,
Expand All @@ -243,8 +241,7 @@ func (bq *BigQuery) schemaExists(ctx context.Context, _, _ string) (exists bool,
ds := bq.db.Dataset(bq.namespace)
_, err = ds.Metadata(ctx)
if err != nil {
var e *googleapi.Error
if errors.As(err, &e) && e.Code == 404 {
if e, ok := err.(*googleapi.Error); ok && e.Code == 404 {
bq.logger.Debugf("BQ: Dataset %s not found", bq.namespace)
return false, nil
}
Expand Down Expand Up @@ -278,8 +275,7 @@ func (bq *BigQuery) CreateSchema(ctx context.Context) (err error) {
bq.logger.Infof("BQ: Creating schema: %s ...", bq.namespace)
err = ds.Create(ctx, meta)
if err != nil {
var e *googleapi.Error
if errors.As(err, &e) && e.Code == 409 {
if e, ok := err.(*googleapi.Error); ok && e.Code == 409 {
bq.logger.Infof("BQ: Create schema %s failed as schema already exists", bq.namespace)
return nil
}
Expand All @@ -289,8 +285,7 @@ func (bq *BigQuery) CreateSchema(ctx context.Context) (err error) {

func checkAndIgnoreAlreadyExistError(err error) bool {
if err != nil {
var e *googleapi.Error
if errors.As(err, &e) {
if e, ok := err.(*googleapi.Error); ok {
// 409 is returned when we try to create a table that already exists
// 400 is returned for all kinds of invalid input - so we need to check the error message too
if e.Code == 409 || (e.Code == 400 && strings.Contains(e.Message, "already exists in schema")) {
Expand Down Expand Up @@ -389,14 +384,14 @@ func (bq *BigQuery) loadTable(
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

if bq.shouldMerge() {
if bq.dedupEnabled() {
return bq.loadTableByMerge(ctx, tableName, gcsRef, log, skipTempTableDelete)
}
return bq.loadTableByAppend(ctx, tableName, gcsRef, log)
}

func (bq *BigQuery) loadTableStrategy() string {
if bq.shouldMerge() {
if bq.dedupEnabled() {
return "MERGE"
}
return "APPEND"
Expand Down Expand Up @@ -604,7 +599,8 @@ func (bq *BigQuery) loadTableByMerge(
SET
%[6]s WHEN NOT MATCHED THEN INSERT (%[4]s)
VALUES
(%[5]s);`,
(%[5]s);
`,
bqTable(tableName),
bqTable(stagingTableName),
primaryJoinClause,
Expand Down Expand Up @@ -650,7 +646,7 @@ func (bq *BigQuery) loadTableByMerge(

func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]error) {
errorMap = map[string]error{warehouseutils.IdentifiesTable: nil}
bq.logger.Infof("BQ: Starting load for identifies and users tables")
bq.logger.Infof("BQ: Starting load for identifies and users tables\n")
_, identifyLoadTable, err := bq.loadTable(ctx, warehouseutils.IdentifiesTable, true)
if err != nil {
errorMap[warehouseutils.IdentifiesTable] = err
Expand Down Expand Up @@ -708,12 +704,10 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
bqIdentifiesTable := bqTable(warehouseutils.IdentifiesTable)
partition := fmt.Sprintf("TIMESTAMP('%s')", identifyLoadTable.partitionDate)
var identifiesFrom string
if bq.shouldMerge() {
identifiesFrom = fmt.Sprintf(`%s WHERE user_id IS NOT NULL %s`,
bqTable(identifyLoadTable.stagingTableName), loadedAtFilter())
if bq.dedupEnabled() {
identifiesFrom = fmt.Sprintf(`%s WHERE user_id IS NOT NULL %s`, bqTable(identifyLoadTable.stagingTableName), loadedAtFilter())
} else {
identifiesFrom = fmt.Sprintf(`%s WHERE _PARTITIONTIME = %s AND user_id IS NOT NULL %s`,
bqIdentifiesTable, partition, loadedAtFilter())
identifiesFrom = fmt.Sprintf(`%s WHERE _PARTITIONTIME = %s AND user_id IS NOT NULL %s`, bqIdentifiesTable, partition, loadedAtFilter())
}
sqlStatement := fmt.Sprintf(`SELECT DISTINCT * FROM (
SELECT id, %[1]s FROM (
Expand Down Expand Up @@ -830,7 +824,7 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
}
}

if !bq.shouldMerge() {
if !bq.dedupEnabled() {
loadUserTableByAppend()
return
}
Expand All @@ -853,40 +847,45 @@ func Connect(context context.Context, cred *BQCredentials) (*bigquery.Client, er
}
opts = append(opts, option.WithCredentialsJSON(credBytes))
}
c, err := bigquery.NewClient(context, cred.ProjectID, opts...)
return c, err
client, err := bigquery.NewClient(context, cred.ProjectID, opts...)
return client, err
}

func (bq *BigQuery) connect(ctx context.Context, cred BQCredentials) (*bigquery.Client, error) {
bq.logger.Infof("BQ: Connecting to BigQuery in project: %s", cred.ProjectID)
c, err := Connect(ctx, &cred)
return c, err
client, err := Connect(ctx, &cred)
return client, err
}

// shouldMerge returns true if:
// * the server config says we allow merging
// * the user opted in to merging
func (bq *BigQuery) shouldMerge() bool {
return bq.config.allowMerge && bq.warehouse.GetBoolDestinationConfig(model.EnableMergeSetting)
// dedupEnabled reports whether deduplication is switched on in the server
// configuration, i.e. whether either the isDedupEnabled or the
// isUsersTableDedupEnabled flag is set.
func (bq *BigQuery) dedupEnabled() bool {
	if bq.config.isDedupEnabled {
		return true
	}
	return bq.config.isUsersTableDedupEnabled
}

// CrashRecover drops dangling staging tables, but only when dedup is enabled;
// otherwise it does nothing. The result reported by
// dropDanglingStagingTables is intentionally not propagated, because this
// method has no return value.
func (bq *BigQuery) CrashRecover(ctx context.Context) {
	if bq.dedupEnabled() {
		bq.dropDanglingStagingTables(ctx)
	}
}

func (bq *BigQuery) dropDanglingStagingTables(ctx context.Context) bool {
sqlStatement := fmt.Sprintf(`
SELECT
table_name
FROM
%[1]s.INFORMATION_SCHEMA.TABLES
WHERE
table_schema = '%[1]s'
AND table_name LIKE '%[2]s';`,
AND table_name LIKE '%[2]s';
`,
bq.namespace,
fmt.Sprintf(`%s%%`, warehouseutils.StagingTablePrefix(provider)),
)
query := bq.db.Query(sqlStatement)
it, err := bq.getMiddleware().Read(ctx, query)
if err != nil {
bq.logger.Errorf("WH: BQ: Error dropping dangling staging tables in BQ: %v\nQuery: %s\n", err, sqlStatement)
return
return false
}

var stagingTableNames []string
Expand All @@ -898,20 +897,22 @@ func (bq *BigQuery) CrashRecover(ctx context.Context) {
break
}
bq.logger.Errorf("BQ: Error in processing fetched staging tables from information schema in dataset %v : %v", bq.namespace, err)
return
return false
}
if _, ok := values[0].(string); ok {
stagingTableNames = append(stagingTableNames, values[0].(string))
}
}
bq.logger.Infof("WH: PG: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
delSuccess := true
for _, stagingTableName := range stagingTableNames {
err := bq.DeleteTable(ctx, stagingTableName)
if err != nil {
bq.logger.Errorf("WH: BQ: Error dropping dangling staging table: %s in BQ: %v", stagingTableName, err)
return
delSuccess = false
}
}
return delSuccess
}

func (bq *BigQuery) IsEmpty(
Expand Down Expand Up @@ -1039,8 +1040,7 @@ func (bq *BigQuery) FetchSchema(ctx context.Context) (model.Schema, model.Schema

it, err := bq.getMiddleware().Read(ctx, query)
if err != nil {
var e *googleapi.Error
if errors.As(err, &e) && e.Code == 404 {
if e, ok := err.(*googleapi.Error); ok && e.Code == 404 {
// if dataset resource is not found, return empty schema
return schema, unrecognizedSchema, nil
}
Expand Down Expand Up @@ -1109,8 +1109,7 @@ func (bq *BigQuery) tableExists(ctx context.Context, tableName string) (exists b
if err == nil {
return true, nil
}
var e *googleapi.Error
if errors.As(err, &e) {
if e, ok := err.(*googleapi.Error); ok {
if e.Code == 404 {
return false, nil
}
Expand Down
Loading

0 comments on commit fd46fc3

Please sign in to comment.