feat(go/adbc/driver/snowflake): Keep track of all files copied and skip empty files in bulk_ingestion #2106

Merged
merged 8 commits into from
Aug 30, 2024

Conversation

joellubi
Member

@joellubi joellubi commented Aug 27, 2024

Fixes: #2094

Two primary changes:

  • Skip uploading parquet files that never had any rows written to them
    • This reduces the number of files Snowflake must keep track of, improving the success rate of the COPY command
  • Keep track of all files uploaded to the stage and ensure they are actually copied
    • Retry the COPY command up to 5 times with backoff if any files haven't been copied

Snowflake Test Output

➜  adbc git:(gh-2094) ✗ go test -v ./driver/snowflake/...
=== RUN   TestIngestBatchedParquetWithFileLimit
--- PASS: TestIngestBatchedParquetWithFileLimit (0.00s)
=== RUN   TestValidation
=== RUN   TestValidation/TestNewDatabase
=== RUN   TestValidation/TestAutocommitDefault
=== RUN   TestValidation/TestAutocommitToggle
=== RUN   TestValidation/TestCloseConnTwice
=== RUN   TestValidation/TestConcurrent
=== RUN   TestValidation/TestGetSetOptions
=== RUN   TestValidation/TestMetadataCurrentCatalog
=== RUN   TestValidation/TestMetadataCurrentDbSchema
=== RUN   TestValidation/TestMetadataGetInfo
=== RUN   TestValidation/TestMetadataGetObjectsColumns
=== RUN   TestValidation/TestMetadataGetObjectsColumns/depth_catalog_no_filter
=== RUN   TestValidation/TestMetadataGetObjectsColumns/depth_dbSchema_no_filter
=== RUN   TestValidation/TestMetadataGetObjectsColumns/depth_table_no_filter
=== RUN   TestValidation/TestMetadataGetObjectsColumns/depth_column_no_filter
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_catalog_valid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_catalog_invalid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_valid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_invalid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_table_valid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_table_invalid
=== RUN   TestValidation/TestMetadataGetObjectsColumns/filter_column:_in%
=== RUN   TestValidation/TestMetadataGetStatistics
=== RUN   TestValidation/TestMetadataGetTableSchema
=== RUN   TestValidation/TestMetadataGetTableTypes
=== RUN   TestValidation/TestNewConn
=== RUN   TestValidation/TestNewStatement
=== RUN   TestValidation/TestSQLPrepareGetParameterSchema
=== RUN   TestValidation/TestSQLPrepareSelectNoParams
=== RUN   TestValidation/TestSQLPrepareSelectParams
=== RUN   TestValidation/TestSqlExecuteSchema
=== RUN   TestValidation/TestSqlExecuteSchema/no_query
=== RUN   TestValidation/TestSqlExecuteSchema/query
=== RUN   TestValidation/TestSqlExecuteSchema/prepared
=== RUN   TestValidation/TestSqlIngestAppend
=== RUN   TestValidation/TestSqlIngestCreateAppend
=== RUN   TestValidation/TestSqlIngestErrors
=== RUN   TestValidation/TestSqlIngestErrors/ingest_without_bind
=== RUN   TestValidation/TestSqlIngestErrors/append_to_nonexistent_table
time="2024-08-28T10:58:21-04:00" level=error msg="error: 002003 (42S02): SQL compilation error:\nObject '\"bulk_ingest\"' does not exist or not authorized." func="gosnowflake.(*snowflakeConn).queryContextInternal" file="connection.go:398"
=== RUN   TestValidation/TestSqlIngestErrors/overwrite_and_incompatible_schema
=== RUN   TestValidation/TestSqlIngestInts
=== RUN   TestValidation/TestSqlIngestReplace
=== RUN   TestValidation/TestSqlPartitionedInts
=== RUN   TestValidation/TestSqlPrepareErrorParamCountMismatch
--- PASS: TestValidation (80.67s)
    --- PASS: TestValidation/TestNewDatabase (0.00s)
    --- PASS: TestValidation/TestAutocommitDefault (0.33s)
    --- PASS: TestValidation/TestAutocommitToggle (1.03s)
    --- PASS: TestValidation/TestCloseConnTwice (0.33s)
    --- PASS: TestValidation/TestConcurrent (0.95s)
    --- PASS: TestValidation/TestGetSetOptions (0.29s)
    --- PASS: TestValidation/TestMetadataCurrentCatalog (0.45s)
    --- PASS: TestValidation/TestMetadataCurrentDbSchema (0.44s)
    --- PASS: TestValidation/TestMetadataGetInfo (0.29s)
    --- PASS: TestValidation/TestMetadataGetObjectsColumns (45.51s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_catalog_no_filter (0.88s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_dbSchema_no_filter (0.92s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_table_no_filter (3.05s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_column_no_filter (5.61s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_valid (4.90s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_invalid (4.23s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_valid (4.92s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_invalid (3.70s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_table_valid (5.59s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_table_invalid (4.42s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_column:_in% (5.65s)
    --- PASS: TestValidation/TestMetadataGetStatistics (0.33s)
    --- PASS: TestValidation/TestMetadataGetTableSchema (1.92s)
    --- PASS: TestValidation/TestMetadataGetTableTypes (0.29s)
    --- PASS: TestValidation/TestNewConn (0.30s)
    --- PASS: TestValidation/TestNewStatement (0.39s)
    --- PASS: TestValidation/TestSQLPrepareGetParameterSchema (0.30s)
    --- PASS: TestValidation/TestSQLPrepareSelectNoParams (0.38s)
    --- SKIP: TestValidation/TestSQLPrepareSelectParams (0.29s)
    --- PASS: TestValidation/TestSqlExecuteSchema (0.52s)
        --- PASS: TestValidation/TestSqlExecuteSchema/no_query (0.00s)
        --- PASS: TestValidation/TestSqlExecuteSchema/query (0.07s)
        --- PASS: TestValidation/TestSqlExecuteSchema/prepared (0.06s)
    --- PASS: TestValidation/TestSqlIngestAppend (5.82s)
    --- PASS: TestValidation/TestSqlIngestCreateAppend (5.72s)
    --- PASS: TestValidation/TestSqlIngestErrors (1.08s)
        --- PASS: TestValidation/TestSqlIngestErrors/ingest_without_bind (0.00s)
        --- PASS: TestValidation/TestSqlIngestErrors/append_to_nonexistent_table (0.48s)
        --- SKIP: TestValidation/TestSqlIngestErrors/overwrite_and_incompatible_schema (0.00s)
    --- PASS: TestValidation/TestSqlIngestInts (3.46s)
    --- PASS: TestValidation/TestSqlIngestReplace (6.82s)
    --- PASS: TestValidation/TestSqlPartitionedInts (0.29s)
    --- SKIP: TestValidation/TestSqlPrepareErrorParamCountMismatch (0.31s)
=== RUN   TestSnowflake
=== RUN   TestSnowflake/TestAdditionalDriverInfo
=== RUN   TestSnowflake/TestDecimalHighPrecision
=== RUN   TestSnowflake/TestDescribeOnly
=== RUN   TestSnowflake/TestEmptyResultSet
=== RUN   TestSnowflake/TestIngestEmptyChunk
=== RUN   TestSnowflake/TestIntDecimalLowPrecision
=== RUN   TestSnowflake/TestJwtPrivateKey
    driver_test.go:1888: apache/arrow-adbc#1364
=== RUN   TestSnowflake/TestMetadataGetObjectsColumnsXdbc
=== RUN   TestSnowflake/TestMetadataOnlyQuery
=== RUN   TestSnowflake/TestNewDatabaseGetSetOptions
=== RUN   TestSnowflake/TestNonIntDecimalLowPrecision
=== RUN   TestSnowflake/TestSqlIngestDate64Type
=== RUN   TestSnowflake/TestSqlIngestHighPrecision
=== RUN   TestSnowflake/TestSqlIngestListType
=== RUN   TestSnowflake/TestSqlIngestLowPrecision
=== RUN   TestSnowflake/TestSqlIngestMapType
=== RUN   TestSnowflake/TestSqlIngestRecordAndStreamAreEquivalent
=== RUN   TestSnowflake/TestSqlIngestRoundtripTypes
=== RUN   TestSnowflake/TestSqlIngestStructType
=== RUN   TestSnowflake/TestSqlIngestTimestamp
=== RUN   TestSnowflake/TestSqlIngestTimestampTypes
=== RUN   TestSnowflake/TestStatementEmptyResultSet
=== RUN   TestSnowflake/TestTimestampSnow
=== RUN   TestSnowflake/TestUseHighPrecision
--- PASS: TestSnowflake (110.63s)
    --- PASS: TestSnowflake/TestAdditionalDriverInfo (0.29s)
    --- PASS: TestSnowflake/TestDecimalHighPrecision (27.19s)
    --- PASS: TestSnowflake/TestDescribeOnly (0.53s)
    --- PASS: TestSnowflake/TestEmptyResultSet (0.47s)
    --- PASS: TestSnowflake/TestIngestEmptyChunk (4.00s)
    --- PASS: TestSnowflake/TestIntDecimalLowPrecision (7.97s)
    --- SKIP: TestSnowflake/TestJwtPrivateKey (0.50s)
    --- PASS: TestSnowflake/TestMetadataGetObjectsColumnsXdbc (10.51s)
    --- PASS: TestSnowflake/TestMetadataOnlyQuery (1.56s)
    --- PASS: TestSnowflake/TestNewDatabaseGetSetOptions (0.30s)
    --- PASS: TestSnowflake/TestNonIntDecimalLowPrecision (9.56s)
    --- PASS: TestSnowflake/TestSqlIngestDate64Type (3.50s)
    --- PASS: TestSnowflake/TestSqlIngestHighPrecision (3.63s)
    --- PASS: TestSnowflake/TestSqlIngestListType (3.67s)
    --- PASS: TestSnowflake/TestSqlIngestLowPrecision (3.46s)
    --- PASS: TestSnowflake/TestSqlIngestMapType (3.21s)
    --- PASS: TestSnowflake/TestSqlIngestRecordAndStreamAreEquivalent (6.89s)
    --- PASS: TestSnowflake/TestSqlIngestRoundtripTypes (5.33s)
    --- PASS: TestSnowflake/TestSqlIngestStructType (4.11s)
    --- PASS: TestSnowflake/TestSqlIngestTimestamp (5.05s)
    --- PASS: TestSnowflake/TestSqlIngestTimestampTypes (4.17s)
    --- PASS: TestSnowflake/TestStatementEmptyResultSet (0.66s)
    --- PASS: TestSnowflake/TestTimestampSnow (0.79s)
    --- PASS: TestSnowflake/TestUseHighPrecision (2.36s)
=== RUN   TestJwtAuthenticationUnencryptedValue
    driver_test.go:1810: Cannot find the `SNOWFLAKE_TEST_PKCS8_VALUE` value
--- SKIP: TestJwtAuthenticationUnencryptedValue (0.00s)
=== RUN   TestJwtAuthenticationEncryptedValue
    driver_test.go:1826: Cannot find the `SNOWFLAKE_TEST_PKCS8_EN_VALUE` value
--- SKIP: TestJwtAuthenticationEncryptedValue (0.00s)
=== RUN   TestIngestCancelContext
--- PASS: TestIngestCancelContext (6.40s)
PASS
ok      github.com/apache/arrow-adbc/go/adbc/driver/snowflake   198.525s

@joellubi joellubi changed the title feat(go/adbc/driver/snowflake): Skip uploading empty files in bulk_ingestion feat(go/adbc/driver/snowflake): Keep track of all files copied and skip empty files in bulk_ingestion Aug 28, 2024
@joellubi joellubi marked this pull request as ready for review August 28, 2024 15:33
@github-actions github-actions bot added this to the ADBC Libraries 15 milestone Aug 28, 2024
Comment on lines 364 to 371
// write first record
bytesWritten, err := writeRecordToParquet(pqWriter, rec)
if err != nil {
	return err
}
if targetSize > 0 && bytesWritten >= int64(targetSize) {
	return nil
}
Member

Can we add a comment here explaining why we write the first record separately instead of letting it be handled by the channel and split?

Member Author

I revisited this and cleaned it up a bit. Between the refactor and added comments it should be more clear what's going on here.

Basically I wanted to return early and avoid any of the parquet file setup if there were no records left in the channel. Initially I grabbed the first record in order to check, but then I needed to write that to the file before I could continue to range over the remaining records.

Same idea after the refactor, just less duplication and explicitly initializing the parquet writer on the first loop iteration.

Member Author

I simplified this further. The parquet writer is always initialized; we just check bytesWritten to decide whether to discard the buffer.
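
A rough sketch of the idea described in this comment, under simplifying assumptions: records are plain byte slices, a `bytes.Buffer` stands in for the parquet writer and its buffer, and every record is non-empty (so zero bytes written means the channel was already drained). The names `writeFile` and `stageFiles` are hypothetical and are not taken from the driver.

```go
package main

import "bytes"

// writeFile drains records from ch into a fresh buffer until targetSize bytes
// have been written or the channel is closed. The buffer is always created,
// even if no record arrives; the caller inspects bytesWritten to decide
// whether the buffer is worth keeping.
// Assumption: every record is non-empty.
func writeFile(ch <-chan []byte, targetSize int) (*bytes.Buffer, int64, error) {
	buf := &bytes.Buffer{}
	var bytesWritten int64
	for rec := range ch {
		n, err := buf.Write(rec)
		if err != nil {
			return nil, bytesWritten, err
		}
		bytesWritten += int64(n)
		if targetSize > 0 && bytesWritten >= int64(targetSize) {
			break // this file is full; the next call starts a new one
		}
	}
	return buf, bytesWritten, nil
}

// stageFiles splits the record stream into buffers of roughly targetSize
// bytes and returns only the buffers that actually received data.
func stageFiles(ch <-chan []byte, targetSize int) ([]*bytes.Buffer, error) {
	var files []*bytes.Buffer
	for {
		buf, n, err := writeFile(ch, targetSize)
		if err != nil {
			return nil, err
		}
		if n == 0 {
			// Nothing was written: the channel is exhausted, so the empty
			// buffer is discarded instead of being uploaded.
			return files, nil
		}
		files = append(files, buf)
	}
}
```

When the records happen to fill the last file exactly, the following call to `writeFile` writes nothing and its buffer is dropped, which is the empty-file case this PR avoids uploading.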

Comment on lines 682 to 685
type fileSet struct {
	mu   sync.RWMutex
	data map[string]struct{}
}
Member

Is this better or worse than just using sync.Map?

sync.Map is generally best for append-mostly workloads with few deletes, but for our workflow here I doubt it would make much of a difference, other than sparing us from having to maintain this?

Member Author

I actually started with sync.Map but changed to map + sync.RWMutex because I thought it was causing a bug. It turned out to be something else, and sync.Map works just fine.

I just pushed a commit. I left it wrapped in the fileSet struct because using a naked sync.Map and having to remember to use the basename everywhere + writing the Len() iteration inline feels a little awkward IMHO. Let me know what you think.

Member Author

I changed this to a type definition: type fileSet sync.Map.
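
For illustration, a sketch of how a type defined over sync.Map can hide the basename handling and the Len() iteration mentioned earlier in this thread. The method names `Add`, `Remove`, `Contains`, and `Len` are assumptions made for this example and may not match the merged code.

```go
package main

import (
	"path/filepath"
	"sync"
)

// fileSet is a concurrency-safe set of file names keyed by basename.
// Its zero value is ready to use, like sync.Map's.
type fileSet sync.Map

// Add records the basename of file in the set.
func (s *fileSet) Add(file string) {
	(*sync.Map)(s).Store(filepath.Base(file), struct{}{})
}

// Remove deletes the basename of file from the set, if present.
func (s *fileSet) Remove(file string) {
	(*sync.Map)(s).Delete(filepath.Base(file))
}

// Contains reports whether the basename of file is in the set.
func (s *fileSet) Contains(file string) bool {
	_, ok := (*sync.Map)(s).Load(filepath.Base(file))
	return ok
}

// Len counts the entries by iterating, since sync.Map has no length method.
func (s *fileSet) Len() int {
	n := 0
	(*sync.Map)(s).Range(func(_, _ any) bool {
		n++
		return true
	})
	return n
}
```

Because the defined type shares sync.Map's underlying type, a `*fileSet` converts directly to `*sync.Map`, so callers never touch the raw map or need to remember to normalize paths themselves.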

@lidavidm lidavidm merged commit 84b96df into apache:main Aug 30, 2024
37 of 40 checks passed
Development

Successfully merging this pull request may close these issues.

adbc_ingest for snowflake dropping rows when called repeatedly
3 participants