Skip to content

Commit

Permalink
feat: introduce merge window for snowflake ingestion (#5160)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Oct 14, 2024
1 parent 3dff5e6 commit 0e44f18
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
15 changes: 14 additions & 1 deletion warehouse/integrations/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,20 @@ func (sf *Snowflake) mergeIntoLoadTable(
updateSet = fmt.Sprintf(`original.%[1]q = original.%[1]q`, strKeys[0])
}

configKeyPrefix := "Warehouse.snowflake.mergeWindow." + sf.Warehouse.Destination.ID
mergeWindowTables := sf.conf.GetStringSlice(configKeyPrefix+".tables", nil)

if slices.Contains(mergeWindowTables, tableName) {
mergeWindowDuration := sf.conf.GetDuration(configKeyPrefix+".duration", 30*24, time.Hour)
mergeWindowColumn := sf.conf.GetString(configKeyPrefix+".column", "RECEIVED_AT")

additionalJoinClause += fmt.Sprintf(
` AND original.%s >= DATEADD(hour, -%d, CURRENT_TIMESTAMP())`,
mergeWindowColumn,
int(mergeWindowDuration.Hours()),
)
}

mergeStmt := fmt.Sprintf(`MERGE INTO %[1]s.%[2]q AS original USING (
SELECT *
FROM
Expand Down Expand Up @@ -477,7 +491,6 @@ func (sf *Snowflake) mergeIntoLoadTable(
sortedColumnNames, stagingColumnNames,
updateSet,
)

var rowsInserted, rowsUpdated int64
err := db.QueryRowContext(ctx, mergeStmt).Scan(
&rowsInserted,
Expand Down
68 changes: 68 additions & 0 deletions warehouse/integrations/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,74 @@ func TestIntegration(t *testing.T) {
)
require.Equal(t, records, whth.DedupTestRecords())
})
t.Run("dedup window", func(t *testing.T) {
	tableName := whutils.ToProviderCase(destType, "merge_test_window_table")

	schema := model.TableSchema{
		"ID":          "string",
		"RECEIVED_AT": "datetime",
	}

	now := time.Now()

	uploadOutput := whth.UploadLoad(t, fm, tableName, [][]string{
		// columns: {"id", "received_at"}
		{"1", now.Format(time.RFC3339)},
		{"2", now.Add(-1 * time.Hour).Format(time.RFC3339)},
		{"3", now.Add(-25 * time.Hour).Format(time.RFC3339)},
		{"4", now.Add(-25 * time.Hour).Format(time.RFC3339)},
	})

	loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
	mockUploader := newMockUploader(t, loadFiles, tableName, schema, schema, true, false)

	// Restrict the MERGE join to a 24h window on RECEIVED_AT for this
	// table only, so rows older than the window are appended, not deduped.
	c := config.New()
	c.Set("Warehouse.snowflake.mergeWindow."+warehouse.Destination.ID+".tables", tableName)
	c.Set("Warehouse.snowflake.mergeWindow."+warehouse.Destination.ID+".column", "RECEIVED_AT")
	c.Set("Warehouse.snowflake.mergeWindow."+warehouse.Destination.ID+".duration", "24h")

	sf := snowflake.New(c, logger.NOP, stats.NOP)
	err := sf.Setup(ctx, warehouse, mockUploader)
	require.NoError(t, err)

	err = sf.CreateSchema(ctx)
	require.NoError(t, err)

	err = sf.CreateTable(ctx, tableName, schema)
	require.NoError(t, err)

	loadTableStat, err := sf.LoadTable(ctx, tableName)
	require.NoError(t, err)
	require.Equal(t, int64(4), loadTableStat.RowsInserted)
	require.Equal(t, int64(0), loadTableStat.RowsUpdated)

	loadTableStat, err = sf.LoadTable(ctx, tableName)
	require.NoError(t, err)
	require.Equal(t, int64(2), loadTableStat.RowsInserted,
		"2nd copy: rows older than the 24h merge window must be re-inserted, not deduped")
	require.Equal(t, int64(2), loadTableStat.RowsUpdated,
		"2nd copy: rows inside the 24h merge window must be deduped (updated)")

	records := whth.RetrieveRecordsFromWarehouse(t, sf.DB.DB,
		fmt.Sprintf(
			`SELECT
				id,
				received_at
			FROM %q.%q
			ORDER BY id;`,
			namespace,
			tableName,
		),
	)
	// Rows 1 and 2 fall inside the merge window and stay unique;
	// rows 3 and 4 fall outside it and end up duplicated.
	require.Equal(t, [][]string{
		{"1", now.Format(time.RFC3339)},
		{"2", now.Add(-1 * time.Hour).Format(time.RFC3339)},
		{"3", now.Add(-25 * time.Hour).Format(time.RFC3339)},
		{"3", now.Add(-25 * time.Hour).Format(time.RFC3339)},
		{"4", now.Add(-25 * time.Hour).Format(time.RFC3339)},
		{"4", now.Add(-25 * time.Hour).Format(time.RFC3339)},
	}, records)
})
})
t.Run("append", func(t *testing.T) {
tableName := whutils.ToProviderCase(destType, "append_test_table")
Expand Down
34 changes: 34 additions & 0 deletions warehouse/integrations/testhelper/setup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package testhelper

import (
"compress/gzip"
"context"
"database/sql"
"encoding/csv"
"fmt"
"os"
"slices"
Expand Down Expand Up @@ -196,6 +198,38 @@ func UploadLoadFile(
return uploadOutput
}

// UploadLoad writes the given CSV records to a temporary gzip-compressed
// file and uploads it as a load file for tableName via fm, returning the
// upload result. The temporary file is removed when the function returns.
func UploadLoad(
	t testing.TB,
	fm filemanager.FileManager,
	tableName string,
	content [][]string,
) filemanager.UploadedFile {
	t.Helper()

	tmpFile, err := os.CreateTemp("", "upload_load_*.csv.gz")
	require.NoError(t, err)
	defer func() { _ = os.Remove(tmpFile.Name()) }()

	gzipWriter := gzip.NewWriter(tmpFile)
	writer := csv.NewWriter(gzipWriter)

	for _, record := range content {
		require.NoError(t, writer.Write(record))
	}

	// Flush the CSV writer (csv.Writer.Flush reports errors only via
	// Error()), close the gzip stream, and close the file so every
	// compressed byte is on disk before the upload reads it back.
	writer.Flush()
	require.NoError(t, writer.Error())
	require.NoError(t, gzipWriter.Close())
	require.NoError(t, tmpFile.Close())

	return UploadLoadFile(t, fm, tmpFile.Name(), tableName)
}

// RetrieveRecordsFromWarehouse retrieves records from the warehouse based on the given query.
// It returns a slice of slices, where each inner slice represents a record's values.
func RetrieveRecordsFromWarehouse(
Expand Down

0 comments on commit 0e44f18

Please sign in to comment.