From 07093b17a454b25b8ce873c9129f953d37ffa91b Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Fri, 11 Nov 2022 15:17:30 +0530 Subject: [PATCH] chore(warehouse): use fastUUID with google UUID generation (#2598) --- services/pgnotifier/pgnotifier.go | 3 +-- warehouse/bigquery/bigquery_source_test.go | 5 +++-- warehouse/bigquery/bigquery_test.go | 7 ++++--- warehouse/constraint.go | 5 +++-- warehouse/deltalake/deltalake.go | 3 +-- warehouse/deltalake/deltalake_test.go | 8 ++++---- warehouse/identity/identity.go | 7 +++---- warehouse/redshift/redshift.go | 5 ++--- warehouse/redshift/redshift_source_test.go | 6 +++--- warehouse/slave.go | 5 ++--- warehouse/snowflake/snowflake_source_test.go | 5 +++-- warehouse/testhelper/setup.go | 5 ++--- warehouse/upload.go | 3 +-- warehouse/utils/utils.go | 12 ++++++++++-- warehouse/utils/utils_test.go | 6 +----- warehouse/validations/helper.go | 11 ++--------- warehouse/validations/validate.go | 4 ++-- 17 files changed, 47 insertions(+), 53 deletions(-) diff --git a/services/pgnotifier/pgnotifier.go b/services/pgnotifier/pgnotifier.go index b786174ff5..72b8b29dfb 100644 --- a/services/pgnotifier/pgnotifier.go +++ b/services/pgnotifier/pgnotifier.go @@ -9,7 +9,6 @@ import ( "time" "github.com/allisson/go-pglock/v2" - "github.com/gofrs/uuid" "github.com/lib/pq" "github.com/spaolacci/murmur3" @@ -535,7 +534,7 @@ func (notifier *PgNotifierT) Publish(payload MessagePayload, schema *whUtils.Sch } defer stmt.Close() - batchID := uuid.Must(uuid.NewV4()).String() + batchID := misc.FastUUID().String() pkgLogger.Infof("PgNotifier: Inserting %d records into %s as batch: %s", len(jobs), queueName, batchID) for _, job := range jobs { _, err = stmt.Exec(batchID, WaitingState, string(job), notifier.workspaceIdentifier, priority, payload.JobType) diff --git a/warehouse/bigquery/bigquery_source_test.go b/warehouse/bigquery/bigquery_source_test.go index 812effa847..8bedfa5cdf 100644 --- a/warehouse/bigquery/bigquery_source_test.go +++ b/warehouse/bigquery/bigquery_source_test.go @@ -10,11 +10,12 @@ import ( "os" "testing" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/stretchr/testify/require" "cloud.google.com/go/bigquery" - "github.com/gofrs/uuid" bigquery2 "github.com/rudderlabs/rudder-server/warehouse/bigquery" "github.com/rudderlabs/rudder-server/warehouse/client" "github.com/rudderlabs/rudder-server/warehouse/testhelper" @@ -106,7 +107,7 @@ func TestBigQueryIntegration(t *testing.T) { DestinationID: handle.DestinationId, Schema: handle.Schema, Tables: handle.Tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.BQ, LatestSourceRunConfig: testhelper.DefaultSourceRunConfig(), } diff --git a/warehouse/bigquery/bigquery_test.go b/warehouse/bigquery/bigquery_test.go index 3eb725b968..c67021d50c 100644 --- a/warehouse/bigquery/bigquery_test.go +++ b/warehouse/bigquery/bigquery_test.go @@ -9,6 +9,8 @@ import ( "os" "testing" + "github.com/rudderlabs/rudder-server/utils/misc" + backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" "github.com/rudderlabs/rudder-server/utils/timeutil" @@ -17,7 +19,6 @@ import ( "cloud.google.com/go/bigquery" - "github.com/gofrs/uuid" bigquery2 "github.com/rudderlabs/rudder-server/warehouse/bigquery" "github.com/rudderlabs/rudder-server/warehouse/client" "github.com/rudderlabs/rudder-server/warehouse/testhelper" @@ -78,7 +79,7 @@ func TestBigQueryIntegration(t *testing.T) { WriteKey: writeKey, Schema: schema, Tables: tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.BQ, SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR", DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn", @@ -180,7 +181,7 @@ func TestBigQueryIntegration(t *testing.T) { WriteKey: writeKey, Schema: schema, Tables: tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.BQ, SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR", DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn", diff --git a/warehouse/constraint.go b/warehouse/constraint.go index 4088b49603..cef43ac603 100644 --- a/warehouse/constraint.go +++ b/warehouse/constraint.go @@ -3,7 +3,8 @@ package warehouse import ( "fmt" - "github.com/gofrs/uuid" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/iancoleman/strcase" "github.com/rudderlabs/rudder-server/config" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" @@ -102,6 +103,6 @@ func (ic *IndexConstraintT) violates(brEvent *BatchRouterEventT, columnName stri } return &ConstraintsViolationT{ IsViolated: concatenatedLength > ic.Limit, - ViolatedIdentifier: fmt.Sprintf(`%s-%s`, strcase.ToKebab(warehouseutils.DiscardsTable), uuid.Must(uuid.NewV4()).String()), + ViolatedIdentifier: fmt.Sprintf(`%s-%s`, strcase.ToKebab(warehouseutils.DiscardsTable), misc.FastUUID().String()), } } diff --git a/warehouse/deltalake/deltalake.go b/warehouse/deltalake/deltalake.go index bd4e7a28f4..be7832dc95 100644 --- a/warehouse/deltalake/deltalake.go +++ b/warehouse/deltalake/deltalake.go @@ -8,7 +8,6 @@ import ( "github.com/iancoleman/strcase" - "github.com/gofrs/uuid" "github.com/rudderlabs/rudder-server/config" proto "github.com/rudderlabs/rudder-server/proto/databricks" "github.com/rudderlabs/rudder-server/services/stats" @@ -205,7 +204,7 @@ func Connect(cred *databricks.CredentialsT, connectTimeout time.Duration) (dbHan } ctx := context.Background() - identifier := uuid.Must(uuid.NewV4()).String() + identifier := misc.FastUUID().String() connConfig := &proto.ConnectionConfig{ Host: cred.Host, Port: cred.Port, diff --git a/warehouse/deltalake/deltalake_test.go b/warehouse/deltalake/deltalake_test.go index 57ddefb921..3bdc28c2ef 100644 --- a/warehouse/deltalake/deltalake_test.go +++ b/warehouse/deltalake/deltalake_test.go @@ -8,12 +8,12 @@ import ( "os" "testing" + "github.com/rudderlabs/rudder-server/utils/misc" + backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" "github.com/rudderlabs/rudder-server/utils/timeutil" - "github.com/gofrs/uuid" - proto "github.com/rudderlabs/rudder-server/proto/databricks" "github.com/rudderlabs/rudder-server/warehouse/client" @@ -89,7 +89,7 @@ func TestDeltalakeIntegration(t *testing.T) { WriteKey: handle.WriteKey, Schema: handle.Schema, Tables: handle.Tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.DELTALAKE, SourceID: "25H5EpYzojqQSepRSaGBrrPx3e4", DestinationID: "25IDjdnoEus6DDNrth3SWO1FOpu", @@ -144,7 +144,7 @@ func TestDeltalakeIntegration(t *testing.T) { WriteKey: handle.WriteKey, Schema: handle.Schema, Tables: handle.Tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.DELTALAKE, SourceID: "25H5EpYzojqQSepRSaGBrrPx3e4", DestinationID: "25IDjdnoEus6DDNrth3SWO1FOpu", diff --git a/warehouse/identity/identity.go b/warehouse/identity/identity.go index aa5cf5aa45..42fc4d7b7d 100644 --- a/warehouse/identity/identity.go +++ b/warehouse/identity/identity.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/gofrs/uuid" "github.com/lib/pq" "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/filemanager" @@ -86,7 +85,7 @@ func (idr *HandleT) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWrit // generate new one and assign to these two var rudderID string if len(rudderIDs) == 0 { - rudderID = uuid.Must(uuid.NewV4()).String() + rudderID = misc.FastUUID().String() } else { rudderID = rudderIDs[0] } @@ -173,7 +172,7 @@ func (idr *HandleT) addRules(txn *sql.Tx, loadFileNames []string, gzWriter *misc // add rules from load files into temp table // use original table to delete redundant ones from temp table // insert from temp table into original table - mergeRulesStagingTable := fmt.Sprintf(`rudder_identity_merge_rules_staging_%s`, strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")) + mergeRulesStagingTable := fmt.Sprintf(`rudder_identity_merge_rules_staging_%s`, warehouseutils.RandHex()) sqlStatement := fmt.Sprintf(`CREATE TEMP TABLE %s ON COMMIT DROP AS SELECT * FROM %s @@ -444,7 +443,7 @@ func (idr *HandleT) createTempGzFile(dirName string) (gzWriter misc.GZipWriter, panic(err) } fileExtension := warehouseutils.GetTempFileExtension(idr.Warehouse.Type) - path = tmpDirPath + dirName + fmt.Sprintf(`%s_%s/%v/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, idr.UploadID) + uuid.Must(uuid.NewV4()).String() + "." + fileExtension + path = tmpDirPath + dirName + fmt.Sprintf(`%s_%s/%v/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, idr.UploadID) + misc.FastUUID().String() + "." + fileExtension err = os.MkdirAll(filepath.Dir(path), os.ModePerm) if err != nil { panic(err) diff --git a/warehouse/redshift/redshift.go b/warehouse/redshift/redshift.go index e501779769..2026ab8b4c 100644 --- a/warehouse/redshift/redshift.go +++ b/warehouse/redshift/redshift.go @@ -13,7 +13,6 @@ import ( "github.com/tidwall/gjson" - "github.com/gofrs/uuid" "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/utils/logger" @@ -275,7 +274,7 @@ func (rs *HandleT) generateManifest(tableName string, _ map[string]string) (stri if err != nil { panic(err) } - localManifestPath := fmt.Sprintf("%v%v", tmpDirPath+dirName, uuid.Must(uuid.NewV4()).String()) + localManifestPath := fmt.Sprintf("%v%v", tmpDirPath+dirName, misc.FastUUID().String()) err = os.MkdirAll(filepath.Dir(localManifestPath), os.ModePerm) if err != nil { panic(err) @@ -301,7 +300,7 @@ func (rs *HandleT) generateManifest(tableName string, _ map[string]string) (stri return "", err } - uploadOutput, err := uploader.Upload(context.TODO(), file, manifestFolder, rs.Warehouse.Source.ID, rs.Warehouse.Destination.ID, time.Now().Format("01-02-2006"), tableName, uuid.Must(uuid.NewV4()).String()) + uploadOutput, err := uploader.Upload(context.TODO(), file, manifestFolder, rs.Warehouse.Source.ID, rs.Warehouse.Destination.ID, time.Now().Format("01-02-2006"), tableName, misc.FastUUID().String()) if err != nil { return "", err } diff --git a/warehouse/redshift/redshift_source_test.go b/warehouse/redshift/redshift_source_test.go index 32617ff778..457e7b50d2 100644 --- a/warehouse/redshift/redshift_source_test.go +++ b/warehouse/redshift/redshift_source_test.go @@ -10,9 +10,9 @@ import ( "os" "testing" - "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/gofrs/uuid" + "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-server/warehouse/client" "github.com/rudderlabs/rudder-server/warehouse/redshift" @@ -95,7 +95,7 @@ func TestSourceRedshiftIntegration(t *testing.T) { SourceID: handle.SourceId, DestinationID: handle.DestinationId, LatestSourceRunConfig: testhelper.DefaultSourceRunConfig(), - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.RS, } diff --git a/warehouse/slave.go b/warehouse/slave.go index df65e4d27c..05f840dcff 100644 --- a/warehouse/slave.go +++ b/warehouse/slave.go @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/gofrs/uuid" "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/services/pgnotifier" @@ -187,7 +186,7 @@ func (job *Payload) getDiscardsTable() string { func (jobRun *JobRunT) getLoadFilePath(tableName string) string { job := jobRun.job - randomness := uuid.Must(uuid.NewV4()).String() + randomness := misc.FastUUID().String() return strings.TrimSuffix(jobRun.stagingFilePath, "json.gz") + tableName + fmt.Sprintf(`.%s`, randomness) + fmt.Sprintf(`.%s`, warehouseutils.GetLoadFileFormat(job.DestinationType)) } @@ -679,7 +678,7 @@ func processClaimedAsyncJob(claimedJob pgnotifier.ClaimT) { func setupSlave(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) - slaveID := uuid.Must(uuid.NewV4()).String() + slaveID := misc.FastUUID().String() jobNotificationChannel := notifier.Subscribe(ctx, slaveID, noOfSlaveWorkerRoutines) for workerIdx := 0; workerIdx <= noOfSlaveWorkerRoutines-1; workerIdx++ { idx := workerIdx diff --git a/warehouse/snowflake/snowflake_source_test.go b/warehouse/snowflake/snowflake_source_test.go index 0d67860075..ff94fe5391 100644 --- a/warehouse/snowflake/snowflake_source_test.go +++ b/warehouse/snowflake/snowflake_source_test.go @@ -10,9 +10,10 @@ import ( "os" "testing" + "github.com/rudderlabs/rudder-server/utils/misc" + // "github.com/stretchr/testify/require" - "github.com/gofrs/uuid" "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-server/warehouse/client" @@ -93,7 +94,7 @@ func TestSnowflakeIntegration(t *testing.T) { }, Schema: handle.Schema, Tables: handle.Tables, - MessageId: uuid.Must(uuid.NewV4()).String(), + MessageId: misc.FastUUID().String(), Provider: warehouseutils.SNOWFLAKE, SourceID: handle.SourceId, DestinationID: handle.DestinationId, diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 31b617bae7..5681cc48a0 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -32,7 +32,6 @@ import ( "github.com/joho/godotenv" - "github.com/gofrs/uuid" azuresynapse "github.com/rudderlabs/rudder-server/warehouse/azure-synapse" "github.com/rudderlabs/rudder-server/warehouse/datalake" @@ -621,7 +620,7 @@ func DefaultSourceRunConfig() map[string]string { } func GetUserId(userType string) string { - return fmt.Sprintf("userId_%s_%s", strings.ToLower(userType), strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")) + return fmt.Sprintf("userId_%s_%s", strings.ToLower(userType), warehouseutils.RandHex()) } func CreateBucketForMinio(t testing.TB, bucketName string) { @@ -849,7 +848,7 @@ func DatabricksCredentials() (credentials databricks.CredentialsT, err error) { func (w *WareHouseTest) MsgId() string { if w.MessageId == "" { - return uuid.Must(uuid.NewV4()).String() + return misc.FastUUID().String() } return w.MessageId } diff --git a/warehouse/upload.go b/warehouse/upload.go index c4325d83ce..224dee90bb 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -12,7 +12,6 @@ import ( "time" "github.com/cenkalti/backoff/v4" - "github.com/gofrs/uuid" "github.com/lib/pq" "github.com/tidwall/gjson" @@ -1846,7 +1845,7 @@ func (job *UploadJobT) createLoadFiles(generateAll bool) (startLoadFileID, endLo publishBatchSize := config.GetInt("Warehouse.pgNotifierPublishBatchSize", 100) pkgLogger.Infof("[WH]: Starting batch processing %v stage files for %s:%s", publishBatchSize, destType, destID) - uniqueLoadGenID := uuid.Must(uuid.NewV4()).String() + uniqueLoadGenID := misc.FastUUID().String() job.upload.LoadFileGenStartTime = timeutil.Now() // Getting distinct destination revision ID from staging files metadata diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go index ab60ad7200..de9d72903a 100644 --- a/warehouse/utils/utils.go +++ b/warehouse/utils/utils.go @@ -5,6 +5,7 @@ import ( "context" "crypto/sha512" "database/sql" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -21,7 +22,6 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/sts" - "github.com/gofrs/uuid" "github.com/iancoleman/strcase" "github.com/tidwall/gjson" @@ -1075,8 +1075,16 @@ func StagingTablePrefix(provider string) string { } func StagingTableName(provider, tableName string, tableNameLimit int) string { - randomNess := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + randomNess := RandHex() prefix := StagingTablePrefix(provider) stagingTableName := fmt.Sprintf(`%s%s_%s`, prefix, tableName, randomNess) return misc.TruncateStr(stagingTableName, tableNameLimit) } + +// RandHex returns a random hex string of length 32 +func RandHex() string { + u := misc.FastUUID() + var buf [32]byte + hex.Encode(buf[:], u[:]) + return string(buf[:]) +} diff --git a/warehouse/utils/utils_test.go b/warehouse/utils/utils_test.go index a84e2cdd2b..dbe9baf7b4 100644 --- a/warehouse/utils/utils_test.go +++ b/warehouse/utils/utils_test.go @@ -17,8 +17,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/gofrs/uuid" - "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-server/config" @@ -1396,9 +1394,7 @@ var _ = Describe("Utils", func() { It("SSL keys", func() { destinationID := "destID" - clientKey := uuid.Must(uuid.NewV4()).String() - clientCert := uuid.Must(uuid.NewV4()).String() - serverCA := uuid.Must(uuid.NewV4()).String() + clientKey, clientCert, serverCA := misc.FastUUID().String(), misc.FastUUID().String(), misc.FastUUID().String() err := WriteSSLKeys(backendconfig.DestinationT{ID: destinationID, Config: map[string]interface{}{"clientKey": clientKey, "clientCert": clientCert, "serverCA": serverCA}}) Expect(err).To(Equal(WriteSSLKeyError{})) diff --git a/warehouse/validations/helper.go b/warehouse/validations/helper.go index ba4125a8a2..9179b23ede 100644 --- a/warehouse/validations/helper.go +++ b/warehouse/validations/helper.go @@ -3,9 +3,7 @@ package validations import ( "encoding/json" "fmt" - "strings" - "github.com/gofrs/uuid" backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/utils/misc" @@ -15,8 +13,7 @@ import ( func warehouse(req *DestinationValidationRequest) warehouseutils.Warehouse { destination := req.Destination - randomSourceId := randomString() - randomSourceName := randomString() + randomSourceId, randomSourceName := warehouseutils.RandHex(), warehouseutils.RandHex() return warehouseutils.Warehouse{ Source: backendconfig.SourceT{ ID: randomSourceId, @@ -58,10 +55,6 @@ func parseOptions(req json.RawMessage, v interface{}) error { return nil } -func randomString() string { - return strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") -} - func stagingTableName() string { - return fmt.Sprintf(`%s_%s`, warehouseutils.CTStagingTablePrefix, randomString()) + return fmt.Sprintf(`%s_%s`, warehouseutils.CTStagingTablePrefix, warehouseutils.RandHex()) } diff --git a/warehouse/validations/validate.go b/warehouse/validations/validate.go index 902fc5ee0f..ccc6e4d15e 100644 --- a/warehouse/validations/validate.go +++ b/warehouse/validations/validate.go @@ -282,7 +282,7 @@ func uploadLoadFile(req *DestinationValidationRequest, filePath string) (uploadO defer func() { _ = uploadFile.Close() }() // uploading file to object storage - keyPrefixes := []string{connectionTestingFolder, destinationType, randomString(), time.Now().Format("01-02-2006")} + keyPrefixes := []string{connectionTestingFolder, destinationType, warehouseutils.RandHex(), time.Now().Format("01-02-2006")} uploadOutput, err = fm.Upload(context.TODO(), uploadFile, keyPrefixes...) if err != nil { pkgLogger.Errorf("[DCT]: Failed to upload filePath: %s with error: %s", filePath, err.Error()) @@ -310,7 +310,7 @@ func downloadLoadFile(req *DestinationValidationRequest, location string) (err e } // creating file path for temporary file - testFilePath := fmt.Sprintf("%v/%v/%v.%v.%v.%v", tmpDirPath, connectionTestingFolder, destinationType, randomString(), time.Now().Unix(), warehouseutils.GetLoadFileFormat(destinationType)) + testFilePath := fmt.Sprintf("%v/%v/%v.%v.%v.%v", tmpDirPath, connectionTestingFolder, destinationType, warehouseutils.RandHex(), time.Now().Unix(), warehouseutils.GetLoadFileFormat(destinationType)) err = os.MkdirAll(filepath.Dir(testFilePath), os.ModePerm) if err != nil { pkgLogger.Errorf("DCT: Failed to create directory at tempFilePath %s: with error: %s", testFilePath, err.Error())