diff --git a/warehouse/integrations/datalake/datalake_test.go b/warehouse/integrations/datalake/datalake_test.go index 80dbdc2cec..369979cd52 100644 --- a/warehouse/integrations/datalake/datalake_test.go +++ b/warehouse/integrations/datalake/datalake_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/minio/minio-go/v7" + "github.com/trinodb/trino-go-client/trino" "cloud.google.com/go/storage" @@ -539,6 +540,94 @@ func TestIntegration(t *testing.T) { 1*time.Second, ) require.Equal(t, int64(8), count) + + t.Log("By default parquet_use_column_index=true and parquet_ignore_statistics=false") + t.Log("parquet_use_column_index=true") + require.Eventually(t, func() bool { + err := db.QueryRowContext(ctx, ` + select + count(*) + from + minio.rudderstack.tracks + where + context_library_name = 'http' + `).Scan(&count) + if err != nil { + t.Log("select count with where clause: ", err) + } + + var e *trino.ErrQueryFailed + if err != nil && errors.As(err, &e) && e.StatusCode == 200 { + var ei *trino.ErrTrino + if errors.As(e.Reason, &ei) && ei.ErrorName == "HIVE_CURSOR_ERROR" { + return true + } + } + return false + }, + 60*time.Second, + 1*time.Second, + ) + + t.Log("parquet_use_column_index=false") + dsnWithoutIndex := fmt.Sprintf("http://user@localhost:%d?catalog=minio&schema=default&session_properties=minio.parquet_use_column_index=false", + c.Port("trino", 8080), + ) + dbWithoutIndex, err := sql.Open("trino", dsnWithoutIndex) + require.NoError(t, err) + t.Cleanup(func() { + _ = dbWithoutIndex.Close() + }) + + require.Eventually(t, func() bool { + err := dbWithoutIndex.QueryRowContext(ctx, ` + select + count(*) + from + minio.rudderstack.tracks + where + context_library_name = 'http' + `).Scan(&count) + if err != nil { + t.Log("select count with where clause: ", err) + return false + } + return true + }, + 60*time.Second, + 1*time.Second, + ) + require.Equal(t, int64(3), count) + + t.Logf("parquet_ignore_statistics=true") + dsnIgnoreStatistics := fmt.Sprintf("http://user@localhost:%d?catalog=minio&schema=default&session_properties=minio.parquet_ignore_statistics=true", + c.Port("trino", 8080), + ) + dbIgnoreStatistics, err := sql.Open("trino", dsnIgnoreStatistics) + require.NoError(t, err) + t.Cleanup(func() { + _ = dbIgnoreStatistics.Close() + }) + + require.Eventually(t, func() bool { + err := dbIgnoreStatistics.QueryRowContext(ctx, ` + select + count(*) + from + minio.rudderstack.tracks + where + context_library_name = 'http' + `).Scan(&count) + if err != nil { + t.Log("select count with where clause: ", err) + return false + } + return true + }, + 60*time.Second, + 1*time.Second, + ) + require.Equal(t, int64(3), count) }) t.Run("Spark", func(t *testing.T) {