feat(warehouse): added support for bigquery custom partition for workspaceIDs #2679

Merged · 9 commits · Nov 11, 2022
18 changes: 18 additions & 0 deletions config/load.go
@@ -2,6 +2,7 @@ package config

import (
"fmt"
"reflect"
"time"

"github.com/fsnotify/fsnotify"
@@ -161,6 +162,23 @@ func (c *Config) checkAndHotReloadConfig(configMap map[string][]*configValue) {
fmt.Printf("The value of key:%s & variable:%p changed from %v to %v\n", key, configVal, *value, _value)
*value = _value
}
case *[]string:
var _value []string
var isSet bool
for _, key := range configVal.keys {
if c.IsSet(key) {
isSet = true
_value = c.GetStringSlice(key, configVal.defaultValue.([]string))
break
}
}
if !isSet {
_value = configVal.defaultValue.([]string)
}
if !reflect.DeepEqual(_value, *value) {
Member
Should we do a normal []string comparison instead of reflect?

Member Author
Using slices.Compare(_value, *value) != 0 now.

fmt.Printf("The value of key:%s & variable:%p changed from %v to %v\n", key, configVal, *value, _value)
*value = _value
}
}
}
}
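For reference, the comparison suggested in the review thread above boils down to the following minimal, self-contained sketch. The reply reports switching to slices.Compare; whether that resolves to the standard-library slices package or golang.org/x/exp/slices depends on the Go toolchain in use, so treat the import here as an assumption.

```go
package main

import (
	"fmt"
	"slices" // standard library since Go 1.21; golang.org/x/exp/slices on older toolchains
)

func main() {
	previous := []string{"ws-1", "ws-2"}
	current := []string{"ws-1", "ws-3"}

	// slices.Compare walks both slices element by element and returns 0 only
	// when they are equal, so it can replace reflect.DeepEqual for []string.
	if slices.Compare(previous, current) != 0 {
		fmt.Printf("value changed from %v to %v\n", previous, current)
	}
}
```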
8 changes: 5 additions & 3 deletions warehouse/bigquery/bigquery.go
@@ -26,6 +26,7 @@ var (
isUsersTableDedupEnabled bool
isDedupEnabled bool
enableDeleteByJobs bool
customPartitionsEnabledWorkspaceIDs []string
)

type HandleT struct {
@@ -300,12 +301,12 @@ func (bq *HandleT) loadTable(tableName string, _, getLoadFileLocFromTableUploads

loadTableByAppend := func() (err error) {
stagingLoadTable.partitionDate = time.Now().Format("2006-01-02")
outputTable := tableName
outputTable := partitionedTable(tableName, stagingLoadTable.partitionDate)
// Tables created by RudderStack are ingestion-time partitioned tables with a pseudo column named _PARTITIONTIME. BigQuery automatically assigns rows to partitions based
// on the time when it ingests the data. To support custom field partitions, we skip loading into a date-decorated partition such as tableName$20191221.
// TODO: Support custom field partition on users & identifies tables
if !customPartitionsEnabled {
outputTable = partitionedTable(tableName, stagingLoadTable.partitionDate)
if customPartitionsEnabled || misc.Contains(customPartitionsEnabledWorkspaceIDs, bq.warehouse.WorkspaceID) {
outputTable = tableName
}

loader := bq.db.Dataset(bq.namespace).Table(outputTable).LoaderFrom(gcsRef)
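As background for the comment above about tableName$20191221: BigQuery load jobs can target a single ingestion-time partition by appending a $YYYYMMDD decorator to the table name. The sketch below shows what a helper like partitionedTable presumably produces; the real helper lives elsewhere in this package, so this implementation is an assumption for illustration only.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// partitionedTable appends BigQuery's partition decorator so that a load job
// writes directly into one ingestion-time partition, e.g. "tracks$20221111".
// This mirrors, as an assumption, what the warehouse code's helper does.
func partitionedTable(tableName, partitionDate string) string {
	return fmt.Sprintf("%s$%s", tableName, strings.ReplaceAll(partitionDate, "-", ""))
}

func main() {
	date := time.Now().Format("2006-01-02")
	fmt.Println(partitionedTable("tracks", date)) // tracks$YYYYMMDD for today's date
}
```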
@@ -656,6 +657,7 @@ func loadConfig() {
config.RegisterBoolConfigVariable(false, &isUsersTableDedupEnabled, true, "Warehouse.bigquery.isUsersTableDedupEnabled") // TODO: Deprecate with respect to isDedupEnabled
config.RegisterBoolConfigVariable(false, &isDedupEnabled, true, "Warehouse.bigquery.isDedupEnabled")
config.RegisterBoolConfigVariable(false, &enableDeleteByJobs, true, "Warehouse.bigquery.enableDeleteByJobs")
config.RegisterStringSliceConfigVariable(nil, &customPartitionsEnabledWorkspaceIDs, true, "Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs")
}

func Init() {
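Putting the new config variable together with the load-path change above: when either the global flag or the workspace allow-list matches, the append path loads into the plain table and lets the table's own field partitioning place the rows. The following self-contained sketch hard-codes the config value and uses a local contains helper as a stand-in for rudder-server's config and misc packages; the workspace ID is the one used by the integration test below.

```go
package main

import "fmt"

// Stand-ins for values normally supplied by rudder-server's config package.
var (
	customPartitionsEnabled             = false
	customPartitionsEnabledWorkspaceIDs = []string{"BpLnfgDsc2WD8F2qNfHK5a84jjJ"}
)

// contains is a local stand-in for misc.Contains.
func contains(ids []string, id string) bool {
	for _, v := range ids {
		if v == id {
			return true
		}
	}
	return false
}

func main() {
	workspaceID := "BpLnfgDsc2WD8F2qNfHK5a84jjJ"

	if customPartitionsEnabled || contains(customPartitionsEnabledWorkspaceIDs, workspaceID) {
		fmt.Println("custom partitions: load into the plain table, e.g. tracks")
	} else {
		fmt.Println("default: load into today's ingestion-time partition, e.g. tracks$20221111")
	}
}
```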
158 changes: 105 additions & 53 deletions warehouse/bigquery/bigquery_test.go
@@ -25,22 +25,15 @@ import (
"github.com/stretchr/testify/assert"
)

type TestHandle struct {
DB *bigquery.Client
WriteKey string
Schema string
Tables []string
}

var handle *TestHandle
type TestHandle struct{}

func (*TestHandle) VerifyConnection() error {
credentials, err := testhelper.BigqueryCredentials()
if err != nil {
return err
}
return testhelper.WithConstantBackoff(func() (err error) {
handle.DB, err = bigquery2.Connect(context.TODO(), &credentials)
_, err = bigquery2.Connect(context.TODO(), &credentials)
if err != nil {
err = fmt.Errorf("could not connect to warehouse bigquery with error: %s", err.Error())
return
@@ -50,10 +43,23 @@ func (*TestHandle) VerifyConnection() error {
}

func TestBigQueryIntegration(t *testing.T) {
credentials, err := testhelper.BigqueryCredentials()
require.NoError(t, err)

var (
schema = testhelper.Schema(warehouseutils.BQ, testhelper.BigqueryIntegrationTestSchema)
writeKey = "J77aX7tLFJ84qYU6UrN8ctecwZt"
tables = []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups", "groups"}
db *bigquery.Client
)

db, err = bigquery2.Connect(context.TODO(), &credentials)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, testhelper.WithConstantBackoff(func() (err error) {
return handle.DB.Dataset(handle.Schema).DeleteWithContents(context.TODO())
}), fmt.Sprintf("Failed dropping dataset %s for BigQuery", handle.Schema))
return db.Dataset(schema).DeleteWithContents(context.TODO())
}), fmt.Sprintf("Failed dropping dataset %s for BigQuery", schema))
})

t.Run("Merge Mode", func(t *testing.T) {
@@ -66,12 +72,12 @@ func TestBigQueryIntegration(t *testing.T) {

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: handle.DB,
BQ: db,
Type: client.BQClient,
},
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
@@ -112,47 +118,98 @@ func TestBigQueryIntegration(t *testing.T) {
})

t.Run("Append Mode", func(t *testing.T) {
require.NoError(t, testhelper.SetConfig([]warehouseutils.KeyValue{
testCases := []struct {
name string
customPartitionsEnabledWorkspaceIDs []string
prerequisite func(t *testing.T)
}{
{
Key: "Warehouse.bigquery.isDedupEnabled",
Value: false,
name: "Append mode without custom partitions",
},
}))

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: handle.DB,
Type: client.BQClient,
{
name: "Append mode with custom partitions",
customPartitionsEnabledWorkspaceIDs: []string{"BpLnfgDsc2WD8F2qNfHK5a84jjJ"},
prerequisite: func(t *testing.T) {
err = db.Dataset(schema).Create(context.Background(), &bigquery.DatasetMetadata{
Location: "US",
})
require.NoError(t, err)

err = db.Dataset(schema).Table("tracks").Create(
context.Background(),
&bigquery.TableMetadata{
Schema: []*bigquery.FieldSchema{{
Name: "timestamp",
Type: bigquery.TimestampFieldType,
}},
TimePartitioning: &bigquery.TimePartitioning{
Field: "timestamp",
},
})
require.NoError(t, err)
},
},
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
}

// Scenario 1
warehouseTest.TimestampBeforeSendingEvents = timeutil.Now()
warehouseTest.UserId = testhelper.GetUserId(warehouseutils.BQ)

sendEventsMap := testhelper.SendEventsMap()
testhelper.SendEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
_ = db.Dataset(schema).DeleteWithContents(context.TODO())

if tc.prerequisite != nil {
tc.prerequisite(t)
}

require.NoError(t, testhelper.SetConfig([]warehouseutils.KeyValue{
{
Key: "Warehouse.bigquery.isDedupEnabled",
Value: false,
},
{
Key: "Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs",
Value: tc.customPartitionsEnabledWorkspaceIDs,
},
}))

warehouseTest := &testhelper.WareHouseTest{
Client: &client.Client{
BQ: db,
Type: client.BQClient,
},
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
}

// Scenario 1
warehouseTest.TimestampBeforeSendingEvents = timeutil.Now()
warehouseTest.UserId = testhelper.GetUserId(warehouseutils.BQ)

sendEventsMap := testhelper.SendEventsMap()
testhelper.SendEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
}
})
}

func TestBigQueryConfigurationValidation(t *testing.T) {
t.Skip()

configurations := testhelper.PopulateTemplateConfigurations()
bqCredentials, err := testhelper.BigqueryCredentials()
require.NoError(t, err)
@@ -233,10 +290,5 @@ func TestMain(m *testing.M) {
return
}

handle = &TestHandle{
WriteKey: "J77aX7tLFJ84qYU6UrN8ctecwZt",
Schema: testhelper.Schema(warehouseutils.BQ, testhelper.BigqueryIntegrationTestSchema),
Tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups", "groups"},
}
os.Exit(testhelper.Run(m, handle))
os.Exit(testhelper.Run(m, &TestHandle{}))
}