From d99967ffba83753cdde641f06b62baa6bc923294 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Thu, 12 Oct 2023 21:39:42 +0530 Subject: [PATCH] chore: enable errcheck and unparam linters for warehouse (#3970) --- .golangci.yml | 6 ------ warehouse/client/controlplane/client_test.go | 2 +- warehouse/identity/identity.go | 10 ++++----- .../azure-synapse/azure-synapse.go | 6 +----- warehouse/integrations/mssql/mssql.go | 7 ++----- warehouse/integrations/redshift/redshift.go | 7 ++----- .../integrations/snowflake/snowflake_test.go | 21 +++++++++---------- 7 files changed, 21 insertions(+), 38 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 7d7a576886..753738e19d 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -52,12 +52,6 @@ issues: linters: - bodyclose - # Temporary disable until we fix the number of issues - - path: 'warehouse' - linters: - - errcheck - - unparam - - path: 'cmd/rudder-cli/status/status.go' linters: - bodyclose diff --git a/warehouse/client/controlplane/client_test.go b/warehouse/client/controlplane/client_test.go index 54254131c8..7782e723ad 100644 --- a/warehouse/client/controlplane/client_test.go +++ b/warehouse/client/controlplane/client_test.go @@ -66,7 +66,7 @@ func TestFetchSSHKeys(t *testing.T) { require.Equal(t, "application/json", r.Header.Get("Content-Type")) w.WriteHeader(tc.responseCode) - w.Write([]byte(tc.responseBody)) + _, _ = w.Write([]byte(tc.responseBody)) })) defer svc.Close() diff --git a/warehouse/identity/identity.go b/warehouse/identity/identity.go index b425fe1dec..39e858f91d 100644 --- a/warehouse/identity/identity.go +++ b/warehouse/identity/identity.go @@ -189,7 +189,7 @@ func (idr *Identity) applyRule(txn *sqlmiddleware.Tx, ruleID int64, gzWriter *mi // TODO : support add row for parquet loader eventLoader.AddRow(columnNames, row) data, _ := eventLoader.WriteToString() - gzWriter.WriteGZ(data) + _ = gzWriter.WriteGZ(data) } return len(rows), err @@ -372,7 +372,7 @@ func (idr *Identity) writeTableToFile(tableName string, txn *sqlmiddleware.Tx, g eventLoader.AddColumn(columnName, "", rowData[i]) } rowString, _ := eventLoader.WriteToString() - gzWriter.WriteGZ(rowString) + _ = gzWriter.WriteGZ(rowString) } if err = rows.Err(); err != nil { return @@ -451,7 +451,7 @@ func (idr *Identity) processMergeRules(ctx context.Context, fileNames []string) pkgLogger.Errorf(`IDR: Error adding rules to %s: %v`, idr.mergeRulesTable(), err) return } - mergeRulesFileGzWriter.CloseGZ() + _ = mergeRulesFileGzWriter.CloseGZ() pkgLogger.Infof(`IDR: Added %d unique rules to %s and file`, len(ruleIDs), idr.mergeRulesTable()) // END: Add new merge rules to local pg table and also to file @@ -479,7 +479,7 @@ func (idr *Identity) processMergeRules(ctx context.Context, fileNames []string) ) } } - mappingsFileGzWriter.CloseGZ() + _ = mappingsFileGzWriter.CloseGZ() // END: Add new/changed identity mappings to local pg table and also to file // upload new merge rules to object storage @@ -526,7 +526,7 @@ func (idr *Identity) ResolveHistoricIdentities(ctx context.Context) (err error) defer misc.RemoveFilePaths(loadFileNames...) 
gzWriter, path := idr.createTempGzFile(fmt.Sprintf(`/%s/`, misc.RudderIdentityMergeRulesTmp)) err = idr.warehouseManager.DownloadIdentityRules(ctx, &gzWriter) - gzWriter.CloseGZ() + _ = gzWriter.CloseGZ() if err != nil { pkgLogger.Errorf(`IDR: Failed to download identity information from warehouse with error: %v`, err) return diff --git a/warehouse/integrations/azure-synapse/azure-synapse.go b/warehouse/integrations/azure-synapse/azure-synapse.go index f096a76908..2c61e9087e 100644 --- a/warehouse/integrations/azure-synapse/azure-synapse.go +++ b/warehouse/integrations/azure-synapse/azure-synapse.go @@ -839,7 +839,7 @@ func (as *AzureSynapse) CrashRecover(ctx context.Context) { as.dropDanglingStagingTables(ctx) } -func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool { +func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) { sqlStatement := fmt.Sprintf(` select table_name @@ -855,7 +855,6 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool { rows, err := as.DB.QueryContext(ctx, sqlStatement) if err != nil { as.logger.Errorf("WH: SYNAPSE: Error dropping dangling staging tables in synapse: %v\nQuery: %s\n", err, sqlStatement) - return false } defer func() { _ = rows.Close() }() @@ -872,15 +871,12 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool { panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err)) } as.logger.Infof("WH: SYNAPSE: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames) - delSuccess := true for _, stagingTableName := range stagingTableNames { _, err := as.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, as.Namespace, stagingTableName)) if err != nil { as.logger.Errorf("WH: SYNAPSE: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err) - delSuccess = false } } - return delSuccess } // FetchSchema returns the schema of the warehouse diff --git a/warehouse/integrations/mssql/mssql.go b/warehouse/integrations/mssql/mssql.go index 9dedb8a583..8071c9d6a9 100644 --- a/warehouse/integrations/mssql/mssql.go +++ b/warehouse/integrations/mssql/mssql.go @@ -850,7 +850,7 @@ func (ms *MSSQL) CrashRecover(ctx context.Context) { ms.dropDanglingStagingTables(ctx) } -func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool { +func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) { sqlStatement := fmt.Sprintf(` select table_name @@ -866,7 +866,7 @@ func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool { rows, err := ms.DB.QueryContext(ctx, sqlStatement) if err != nil { ms.logger.Errorf("WH: MSSQL: Error dropping dangling staging tables in MSSQL: %v\nQuery: %s\n", err, sqlStatement) - return false + return } defer func() { _ = rows.Close() }() @@ -883,15 +883,12 @@ func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool { panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err)) } ms.logger.Infof("WH: MSSQL: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames) - delSuccess := true for _, stagingTableName := range stagingTableNames { _, err := ms.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, ms.Namespace, stagingTableName)) if err != nil { ms.logger.Errorf("WH: MSSQL: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err) - delSuccess = false } } - return delSuccess } // FetchSchema queries mssql and returns the schema associated 
with provided namespace diff --git a/warehouse/integrations/redshift/redshift.go b/warehouse/integrations/redshift/redshift.go index a86ac26c7a..02b1db5f01 100644 --- a/warehouse/integrations/redshift/redshift.go +++ b/warehouse/integrations/redshift/redshift.go @@ -1035,7 +1035,7 @@ func (rs *Redshift) connect(ctx context.Context) (*sqlmiddleware.DB, error) { return middleware, nil } -func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool { +func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) { sqlStatement := ` SELECT table_name @@ -1052,7 +1052,7 @@ func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool { ) if err != nil { rs.logger.Errorf("WH: RS: Error dropping dangling staging tables in redshift: %v\nQuery: %s\n", err, sqlStatement) - return false + return } defer func() { _ = rows.Close() }() @@ -1069,15 +1069,12 @@ func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool { panic(fmt.Errorf("iterate result from query: %s\nwith Error : %w", sqlStatement, err)) } rs.logger.Infof("WH: RS: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames) - delSuccess := true for _, stagingTableName := range stagingTableNames { _, err := rs.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, rs.Namespace, stagingTableName)) if err != nil { rs.logger.Errorf("WH: RS: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err) - delSuccess = false } } - return delSuccess } func (rs *Redshift) CreateSchema(ctx context.Context) (err error) { diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index 2e9c614b3b..51476b186e 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -625,7 +625,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -642,7 +642,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -663,7 +663,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) c := config.New() c.Set("Warehouse.snowflake.debugDuplicateWorkspaceIDs", []string{workspaceID}) @@ -719,7 +719,7 @@ func TestIntegration(t *testing.T) { 
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, true) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, true) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -766,7 +766,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false) c := config.New() c.Set("Warehouse.snowflake.loadTableStrategy", "APPEND") @@ -826,7 +826,7 @@ func TestIntegration(t *testing.T) { loadFiles := []whutils.LoadFile{{ Location: "https://bucket.s3.amazonaws.com/rudder-warehouse-load-objects/load_file_not_exists_test_table/test_source_id/0ef75cb0-3fd0-4408-98b9-2bea9e476916-load_file_not_exists_test_table/load.csv.gz", }} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -849,7 +849,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -894,7 +894,7 @@ func TestIntegration(t *testing.T) { uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -921,7 +921,7 @@ func TestIntegration(t *testing.T) { }) loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}} - mockUploader := newMockUploader(t, loadFiles, tableName, discardsSchema, discardsSchema, whutils.LoadFileTypeCsv, false, false) + mockUploader := newMockUploader(t, loadFiles, tableName, discardsSchema, discardsSchema, false, false) sf, err := snowflake.New(config.Default, logger.NOP, stats.Default) require.NoError(t, err) @@ -1023,7 +1023,6 @@ func newMockUploader( tableName string, schemaInUpload model.TableSchema, schemaInWarehouse model.TableSchema, - loadFileType string, canAppend bool, dedupUseNewRecord bool, ) whutils.Uploader { @@ -1042,7 +1041,7 @@ func newMockUploader( mockUploader.EXPECT().GetSampleLoadFileLocation(gomock.Any(), 
gomock.Any()).Return(loadFiles[0].Location, nil).AnyTimes() mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes() mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes() - mockUploader.EXPECT().GetLoadFileType().Return(loadFileType).AnyTimes() + mockUploader.EXPECT().GetLoadFileType().Return(whutils.LoadFileTypeCsv).AnyTimes() return mockUploader }
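
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the two lint patterns this change settles. The names dropStagingTables and writeCompressed are hypothetical stand-ins, not the repository's functions. errcheck is satisfied by assigning deliberately ignored errors to the blank identifier, as the patch does for w.Write, WriteGZ and CloseGZ; unparam is satisfied by removing the bool return value that no caller ever read, as the patch does for dropDanglingStagingTables.

package main

import (
	"compress/gzip"
	"fmt"
	"os"
	"path/filepath"
)

// dropStagingTables illustrates the unparam fix: the earlier signature returned
// a bool that every caller ignored, so the return value is removed and failures
// are only logged per item.
func dropStagingTables(names []string) {
	for _, name := range names {
		// Per-item work that may fail; a real implementation would log the error.
		fmt.Printf("dropping staging table %q\n", name)
	}
}

// writeCompressed illustrates the errcheck fix: errors we intentionally ignore
// are assigned to the blank identifier so both the linter and the reader can
// see the decision is deliberate.
func writeCompressed(path, payload string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() { _ = f.Close() }() // explicit discard instead of a bare f.Close()

	gz := gzip.NewWriter(f)
	_, _ = gz.Write([]byte(payload)) // explicit discard instead of a bare gz.Write(...)
	return gz.Close()
}

func main() {
	dropStagingTables([]string{"rudder_staging_events_1", "rudder_staging_events_2"})
	if err := writeCompressed(filepath.Join(os.TempDir(), "example.gz"), "hello"); err != nil {
		fmt.Println("write failed:", err)
	}
}

Assigning to the blank identifier keeps the intent visible at the call site, which is why the patch prefers it over suppressing the findings with a //nolint directive or keeping the warehouse path excluded in .golangci.yml.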