chore: enable errcheck and unparam linters for warehouse (#3970)
achettyiitr authored Oct 12, 2023
1 parent 19ca0f6 commit d99967f
Showing 7 changed files with 21 additions and 38 deletions.
6 changes: 0 additions & 6 deletions .golangci.yml
@@ -52,12 +52,6 @@ issues:
linters:
- bodyclose

# Temporary disable until we fix the number of issues
- path: 'warehouse'
linters:
- errcheck
- unparam

- path: 'cmd/rudder-cli/status/status.go'
linters:
- bodyclose
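With the warehouse path exclusion dropped from .golangci.yml, errcheck and unparam now run over the warehouse packages like the rest of the repo. As a rough standalone sketch (not code from this repository), this is the class of issue errcheck reports: a call whose error result is ignored outright is flagged, while an explicit blank assignment passes under golangci-lint's default errcheck settings:

package main

import "os"

func main() {
	f, err := os.CreateTemp("", "errcheck-example")
	if err != nil {
		panic(err)
	}
	f.WriteString("data")   // errcheck would flag this: the error return is silently ignored
	_ = f.Close()           // explicit discard; accepted with errcheck's default check-blank: false
	_ = os.Remove(f.Name())
}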
2 changes: 1 addition & 1 deletion warehouse/client/controlplane/client_test.go
@@ -66,7 +66,7 @@ func TestFetchSSHKeys(t *testing.T) {
require.Equal(t, "application/json", r.Header.Get("Content-Type"))

w.WriteHeader(tc.responseCode)
w.Write([]byte(tc.responseBody))
_, _ = w.Write([]byte(tc.responseBody))
}))

defer svc.Close()
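The change above is the usual way to quiet errcheck inside an httptest handler: the (bytes written, error) results of w.Write are discarded explicitly, since a failed write to the in-memory test server is not actionable in the test. A minimal self-contained sketch of the same idiom (illustrative only, not the repo's test):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	svc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"ok":true}`)) // explicit discard keeps errcheck quiet
	}))
	defer svc.Close()

	resp, err := http.Get(svc.URL)
	if err != nil {
		panic(err)
	}
	defer func() { _ = resp.Body.Close() }()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}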
10 changes: 5 additions & 5 deletions warehouse/identity/identity.go
@@ -189,7 +189,7 @@ func (idr *Identity) applyRule(txn *sqlmiddleware.Tx, ruleID int64, gzWriter *mi
// TODO : support add row for parquet loader
eventLoader.AddRow(columnNames, row)
data, _ := eventLoader.WriteToString()
gzWriter.WriteGZ(data)
_ = gzWriter.WriteGZ(data)
}

return len(rows), err
@@ -372,7 +372,7 @@ func (idr *Identity) writeTableToFile(tableName string, txn *sqlmiddleware.Tx, g
eventLoader.AddColumn(columnName, "", rowData[i])
}
rowString, _ := eventLoader.WriteToString()
gzWriter.WriteGZ(rowString)
_ = gzWriter.WriteGZ(rowString)
}
if err = rows.Err(); err != nil {
return
@@ -451,7 +451,7 @@ func (idr *Identity) processMergeRules(ctx context.Context, fileNames []string)
pkgLogger.Errorf(`IDR: Error adding rules to %s: %v`, idr.mergeRulesTable(), err)
return
}
mergeRulesFileGzWriter.CloseGZ()
_ = mergeRulesFileGzWriter.CloseGZ()
pkgLogger.Infof(`IDR: Added %d unique rules to %s and file`, len(ruleIDs), idr.mergeRulesTable())
// END: Add new merge rules to local pg table and also to file

@@ -479,7 +479,7 @@ func (idr *Identity) processMergeRules(ctx context.Context, fileNames []string)
)
}
}
mappingsFileGzWriter.CloseGZ()
_ = mappingsFileGzWriter.CloseGZ()
// END: Add new/changed identity mappings to local pg table and also to file

// upload new merge rules to object storage
@@ -526,7 +526,7 @@ func (idr *Identity) ResolveHistoricIdentities(ctx context.Context) (err error)
defer misc.RemoveFilePaths(loadFileNames...)
gzWriter, path := idr.createTempGzFile(fmt.Sprintf(`/%s/`, misc.RudderIdentityMergeRulesTmp))
err = idr.warehouseManager.DownloadIdentityRules(ctx, &gzWriter)
gzWriter.CloseGZ()
_ = gzWriter.CloseGZ()
if err != nil {
pkgLogger.Errorf(`IDR: Failed to download identity information from warehouse with error: %v`, err)
return
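In identity.go the commit keeps existing behaviour and simply acknowledges the WriteGZ and CloseGZ errors with blank assignments. For contrast, a small standalone sketch using the standard library's compress/gzip (not the repo's misc gzip writer) showing both the discard style used here and the stricter propagating alternative:

package main

import (
	"compress/gzip"
	"os"
)

// writeDiscarding mirrors the style used in identity.go: write/close errors
// are acknowledged with blank assignments so errcheck stays quiet.
func writeDiscarding(path string, data []byte) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	gz := gzip.NewWriter(f)
	_, _ = gz.Write(data)
	_ = gz.Close()
	return f.Close()
}

// writePropagating is the stricter alternative: any write/close error is
// returned to the caller instead of being dropped.
func writePropagating(path string, data []byte) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() { _ = f.Close() }()
	gz := gzip.NewWriter(f)
	if _, err := gz.Write(data); err != nil {
		return err
	}
	return gz.Close()
}

func main() {
	_ = writeDiscarding(os.TempDir()+"/a.gz", []byte("hello"))
	_ = writePropagating(os.TempDir()+"/b.gz", []byte("hello"))
}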
6 changes: 1 addition & 5 deletions warehouse/integrations/azure-synapse/azure-synapse.go
@@ -839,7 +839,7 @@ func (as *AzureSynapse) CrashRecover(ctx context.Context) {
as.dropDanglingStagingTables(ctx)
}

func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool {
func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) {
sqlStatement := fmt.Sprintf(`
select
table_name
@@ -855,7 +855,6 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool {
rows, err := as.DB.QueryContext(ctx, sqlStatement)
if err != nil {
as.logger.Errorf("WH: SYNAPSE: Error dropping dangling staging tables in synapse: %v\nQuery: %s\n", err, sqlStatement)
return false
}
defer func() { _ = rows.Close() }()

@@ -872,15 +871,12 @@ func (as *AzureSynapse) dropDanglingStagingTables(ctx context.Context) bool {
panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err))
}
as.logger.Infof("WH: SYNAPSE: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
delSuccess := true
for _, stagingTableName := range stagingTableNames {
_, err := as.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, as.Namespace, stagingTableName))
if err != nil {
as.logger.Errorf("WH: SYNAPSE: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err)
delSuccess = false
}
}
return delSuccess
}

// FetchSchema returns the schema of the warehouse
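This is the unparam half of the commit: dropDanglingStagingTables returned a bool that no caller ever read (CrashRecover ignores it), so the result is removed and failures are only logged. The same refactor is repeated for mssql and redshift below. A condensed standalone sketch of the before/after shape (illustrative names, not the repo's types):

package main

import "log"

// Before: the bool result is never consumed by any caller, so unparam flags it.
func dropTablesReturningBool(tables []string) bool {
	ok := true
	for _, t := range tables {
		if err := dropTable(t); err != nil {
			log.Printf("dropping %s: %v", t, err)
			ok = false
		}
	}
	return ok
}

// After: same behaviour, errors are logged and the unused result is gone.
func dropTables(tables []string) {
	for _, t := range tables {
		if err := dropTable(t); err != nil {
			log.Printf("dropping %s: %v", t, err)
		}
	}
}

// dropTable stands in for the DROP TABLE exec against the warehouse.
func dropTable(name string) error { return nil }

func main() {
	_ = dropTablesReturningBool([]string{"stg_users_1"})
	dropTables([]string{"stg_users_1"})
}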
7 changes: 2 additions & 5 deletions warehouse/integrations/mssql/mssql.go
@@ -850,7 +850,7 @@ func (ms *MSSQL) CrashRecover(ctx context.Context) {
ms.dropDanglingStagingTables(ctx)
}

func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool {
func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) {
sqlStatement := fmt.Sprintf(`
select
table_name
@@ -866,7 +866,7 @@ func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool {
rows, err := ms.DB.QueryContext(ctx, sqlStatement)
if err != nil {
ms.logger.Errorf("WH: MSSQL: Error dropping dangling staging tables in MSSQL: %v\nQuery: %s\n", err, sqlStatement)
return false
return
}
defer func() { _ = rows.Close() }()

@@ -883,15 +883,12 @@ func (ms *MSSQL) dropDanglingStagingTables(ctx context.Context) bool {
panic(fmt.Errorf("iterating result from query: %s\nwith Error : %w", sqlStatement, err))
}
ms.logger.Infof("WH: MSSQL: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
delSuccess := true
for _, stagingTableName := range stagingTableNames {
_, err := ms.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, ms.Namespace, stagingTableName))
if err != nil {
ms.logger.Errorf("WH: MSSQL: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err)
delSuccess = false
}
}
return delSuccess
}

// FetchSchema queries mssql and returns the schema associated with provided namespace
7 changes: 2 additions & 5 deletions warehouse/integrations/redshift/redshift.go
@@ -1035,7 +1035,7 @@ func (rs *Redshift) connect(ctx context.Context) (*sqlmiddleware.DB, error) {
return middleware, nil
}

func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool {
func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) {
sqlStatement := `
SELECT
table_name
@@ -1052,7 +1052,7 @@ func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool {
)
if err != nil {
rs.logger.Errorf("WH: RS: Error dropping dangling staging tables in redshift: %v\nQuery: %s\n", err, sqlStatement)
return false
return
}
defer func() { _ = rows.Close() }()

@@ -1069,15 +1069,12 @@ func (rs *Redshift) dropDanglingStagingTables(ctx context.Context) bool {
panic(fmt.Errorf("iterate result from query: %s\nwith Error : %w", sqlStatement, err))
}
rs.logger.Infof("WH: RS: Dropping dangling staging tables: %+v %+v\n", len(stagingTableNames), stagingTableNames)
delSuccess := true
for _, stagingTableName := range stagingTableNames {
_, err := rs.DB.ExecContext(ctx, fmt.Sprintf(`DROP TABLE "%[1]s"."%[2]s"`, rs.Namespace, stagingTableName))
if err != nil {
rs.logger.Errorf("WH: RS: Error dropping dangling staging table: %s in redshift: %v\n", stagingTableName, err)
delSuccess = false
}
}
return delSuccess
}

func (rs *Redshift) CreateSchema(ctx context.Context) (err error) {
21 changes: 10 additions & 11 deletions warehouse/integrations/snowflake/snowflake_test.go
@@ -625,7 +625,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -642,7 +642,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -663,7 +663,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

c := config.New()
c.Set("Warehouse.snowflake.debugDuplicateWorkspaceIDs", []string{workspaceID})
@@ -719,7 +719,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, true)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, true)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -766,7 +766,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, true, false)

c := config.New()
c.Set("Warehouse.snowflake.loadTableStrategy", "APPEND")
@@ -826,7 +826,7 @@ func TestIntegration(t *testing.T) {
loadFiles := []whutils.LoadFile{{
Location: "https://bucket.s3.amazonaws.com/rudder-warehouse-load-objects/load_file_not_exists_test_table/test_source_id/0ef75cb0-3fd0-4408-98b9-2bea9e476916-load_file_not_exists_test_table/load.csv.gz",
}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -849,7 +849,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -894,7 +894,7 @@ func TestIntegration(t *testing.T) {
uploadOutput := testhelper.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -921,7 +921,7 @@ func TestIntegration(t *testing.T) {
})

loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
mockUploader := newMockUploader(t, loadFiles, tableName, discardsSchema, discardsSchema, whutils.LoadFileTypeCsv, false, false)
mockUploader := newMockUploader(t, loadFiles, tableName, discardsSchema, discardsSchema, false, false)

sf, err := snowflake.New(config.Default, logger.NOP, stats.Default)
require.NoError(t, err)
@@ -1023,7 +1023,6 @@ func newMockUploader(
tableName string,
schemaInUpload model.TableSchema,
schemaInWarehouse model.TableSchema,
loadFileType string,
canAppend bool,
dedupUseNewRecord bool,
) whutils.Uploader {
@@ -1042,7 +1041,7 @@ func newMockUploader(
mockUploader.EXPECT().GetSampleLoadFileLocation(gomock.Any(), gomock.Any()).Return(loadFiles[0].Location, nil).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
mockUploader.EXPECT().GetLoadFileType().Return(loadFileType).AnyTimes()
mockUploader.EXPECT().GetLoadFileType().Return(whutils.LoadFileTypeCsv).AnyTimes()

return mockUploader
}
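The snowflake test change is the other common unparam finding: newMockUploader took a loadFileType argument, but every call site passed whutils.LoadFileTypeCsv, so the parameter is dropped and the constant is used directly inside the helper. A tiny standalone sketch of that shape (names are illustrative, not the repo's types):

package main

import "fmt"

const loadFileTypeCsv = "csv"

// Before: unparam reports that loadFileType always receives the same value.
func describeUploaderBefore(table, loadFileType string) string {
	return fmt.Sprintf("uploader(table=%s, type=%s)", table, loadFileType)
}

// After: the always-constant parameter is removed and inlined.
func describeUploader(table string) string {
	return fmt.Sprintf("uploader(table=%s, type=%s)", table, loadFileTypeCsv)
}

func main() {
	fmt.Println(describeUploaderBefore("users", loadFileTypeCsv))
	fmt.Println(describeUploader("users"))
}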
