feat(go/adbc/driver/snowflake): improve bulk ingestion speed (#1456)
# What

- Replace Snowflake bulk ingestion with a Parquet-based approach that offers
higher throughput and better type support
- Previously: INSERT bind parameters were uploaded to a CSV-based stage,
once per record batch
- Now: Parquet files are written concurrently to the stage, independently
of record batch size. Parquet logical types are used to infer the schema
on COPY.
- Tests to validate type support and consistency through Arrow ->
Parquet -> Snowflake -> Arrow roundtrip
- Improved type mapping between Arrow <-> Snowflake timestamps.
[TIMESTAMP_LTZ](https://docs.snowflake.com/en/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz)
is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ,
which can lead to lossy roundtrips.
- Minor bugfix where Snowflake local timestamps with timezone set to UTC
were being interpreted as non-local.

# Why

- Implements #1327, which comes from improvement request #1322
- BindStream ingestion is significantly faster
- Arrow type support is improved

# Methodology

The general approach to ingestion is most clearly demonstrated by the
path taken when `stmt.Bind()` is used for a single record:
### IngestRecord
```mermaid
flowchart LR
    A(Record) --> B(Write Parquet)
    B --> C(Upload File)
    C --> D(Execute COPY)
    D --> E(Check Row Count)
```
The Arrow record is written to a Parquet file because of Parquet's logical
type support, compressibility, and native support in Snowflake. The file is
then uploaded to a temporary Snowflake stage via a PUT query and loaded
into the target table via a COPY query. Once the COPY has finished, one
more query is dispatched to check the resulting row count so the number of
rows affected is reported accurately. The row count is used instead of
counting the Arrow rows written, in case there are any undetected losses
when importing the uploaded file into Snowflake.
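
For orientation, here is a minimal Go sketch of this flow. It is illustrative
rather than the driver's actual code: the real driver streams the Parquet bytes
to the stage without touching local disk, and the exact PUT/COPY statements it
issues differ. `db` is assumed to be a `database/sql` handle opened with
gosnowflake, and the stage name `ADBC$BIND` mirrors the one shown in the docs.

```go
// Sketch of the IngestRecord flow: write Parquet, PUT it to a stage, COPY it
// into the table, then confirm the row count. The table and stage names, the
// exact SQL, and the use of a temporary local file are simplifications.
package ingest

import (
    "context"
    "database/sql"
    "fmt"
    "os"

    "github.com/apache/arrow/go/v15/arrow"
    "github.com/apache/arrow/go/v15/parquet"
    "github.com/apache/arrow/go/v15/parquet/compress"
    "github.com/apache/arrow/go/v15/parquet/pqarrow"
)

func ingestRecord(ctx context.Context, db *sql.DB, rec arrow.Record, table string) (int64, error) {
    // 1. Write the record to a Parquet file; Parquet logical types let Snowflake
    //    infer the schema on COPY.
    f, err := os.CreateTemp("", "adbc-ingest-*.parquet")
    if err != nil {
        return -1, err
    }
    defer os.Remove(f.Name())
    props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy))
    w, err := pqarrow.NewFileWriter(rec.Schema(), f, props, pqarrow.DefaultWriterProps())
    if err != nil {
        return -1, err
    }
    if err := w.Write(rec); err != nil {
        return -1, err
    }
    if err := w.Close(); err != nil {
        return -1, err
    }

    // 2. Upload the file to the temporary stage via PUT.
    putSQL := fmt.Sprintf("PUT file://%s @ADBC$BIND AUTO_COMPRESS = FALSE", f.Name())
    if _, err := db.ExecContext(ctx, putSQL); err != nil {
        return -1, err
    }

    // 3. Load the staged Parquet file into the target table via COPY.
    copySQL := fmt.Sprintf("COPY INTO %s FROM @ADBC$BIND MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE", table)
    if _, err := db.ExecContext(ctx, copySQL); err != nil {
        return -1, err
    }

    // 4. Report rows affected by counting rows in the table rather than rows
    //    written locally, so any undetected load-time loss is surfaced.
    var n int64
    err = db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&n)
    return n, err
}
```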

A similar approach is taken when ingesting an arbitrarily large stream
of records via `stmt.BindStream()`, but it takes advantage of several
opportunities to parallelize the work involved at different stages:

### IngestStream
```mermaid
flowchart LR

    A(Read Records) --> B(Write Parquet)

    A --> C(Write Parquet)
    A --> D(Write Parquet)
    A --> E(Write Parquet)

    B --> J(Buffer Pool)
    C --> J
    D --> J
    E --> J

    J --> K(Upload File)
    J --> L(Upload File)

    K --> M(Finalize COPY)
    L --> M

    M --> N(Check Row Count)


    O(File Ready) --> P(Execute COPY)
    P --> O
```
The same steps are used, but the stream of records is now distributed
among a pool of Parquet writers. This step is inherently CPU-bound, so
it is desirable for it to scale independently with the availability of
logical cores for writing/compression. These Parquet files are written
to a buffer pool in memory to help decouple the upload stage from
writing, and so that a writer can start working on the next file _while_
the last file it wrote is being uploaded. Uploads from the buffer pool
also benefit from parallelism, but more so to maximize network
utilization by limiting idle time between uploads and amortizing
potential slowdown in any one upload.
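
The concurrency shape can be sketched roughly as below. This is not the
driver's implementation: `writeParquetFile` and `uploadFile` are placeholder
callbacks, and the real driver cuts files by a target size rather than writing
one file per record.

```go
// Schematic of the IngestStream concurrency; illustrative, not the driver's code.
// A pool of Parquet writers drains a shared record stream into in-memory files,
// and a pool of uploaders PUTs those files to the stage. The records channel is
// assumed to be closed by the producer when the stream ends.
package ingest

import (
    "bytes"
    "context"
    "sync"

    "github.com/apache/arrow/go/v15/arrow"
    "golang.org/x/sync/errgroup"
)

func ingestStream(
    ctx context.Context,
    records <-chan arrow.Record,
    writeParquetFile func(arrow.Record) (*bytes.Buffer, error), // placeholder for the writer stage
    uploadFile func(context.Context, *bytes.Buffer) error, // placeholder for the PUT upload
    writers, uploaders int,
) error {
    // In-memory buffer pool decouples writing from uploading.
    files := make(chan *bytes.Buffer, uploaders)

    g, ctx := errgroup.WithContext(ctx)

    // CPU-bound stage: one goroutine per writer, scaled with available cores.
    var wg sync.WaitGroup
    for i := 0; i < writers; i++ {
        wg.Add(1)
        g.Go(func() error {
            defer wg.Done()
            for rec := range records {
                buf, err := writeParquetFile(rec)
                if err != nil {
                    return err
                }
                select {
                case files <- buf: // hand off so the writer can start its next file immediately
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        })
    }
    go func() { wg.Wait(); close(files) }()

    // Network-bound stage: keep several uploads in flight to limit idle time between PUTs.
    for i := 0; i < uploaders; i++ {
        g.Go(func() error {
            for buf := range files {
                if err := uploadFile(ctx, buf); err != nil {
                    return err
                }
            }
            return nil
        })
    }
    return g.Wait()
}
```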

Technically, only a single COPY command is required after the last file
is uploaded in order to load the Parquet files into the Snowflake table.
However, on many warehouses this operation takes as long as or even longer
than the upload itself, though it can be made faster by paying for a
larger warehouse. Given the batched approach taken and the fact that COPY
is idempotent, we can execute COPY repeatedly as files are uploaded to
load them into the table on an ongoing basis. These COPY queries are
executed asynchronously, and each waits for an upload-completed callback
so that at least one new file is available for it to load (otherwise the
query would be a no-op; waiting simply avoids spamming Snowflake with
no-op COPYs).
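
A simplified sketch of that scheduling idea, where `execCopy` stands in for
dispatching the idempotent COPY query and `uploaded` carries the
upload-completed signals (illustrative names, not the driver's API):

```go
// Sketch of the concurrent COPY stage; names and structure are illustrative.
// A bounded number of idempotent COPY queries run while uploads are still in
// progress. Each COPY waits for an "upload completed" signal so it has at
// least one new file to load, which avoids issuing no-op COPYs.
package ingest

import (
    "context"

    "golang.org/x/sync/errgroup"
)

func copyLoop(ctx context.Context, uploaded <-chan struct{}, execCopy func(context.Context) error, concurrency int) error {
    sem := make(chan struct{}, concurrency) // cap on concurrent COPY queries
    g, ctx := errgroup.WithContext(ctx)
    for {
        select {
        case _, ok := <-uploaded: // signaled once per completed upload; closed when uploads finish
            if !ok {
                // All uploads done: one final COPY picks up any files not yet loaded.
                g.Go(func() error { return execCopy(ctx) })
                return g.Wait()
            }
        case <-ctx.Done():
            return g.Wait()
        }
        sem <- struct{}{} // block if `concurrency` COPYs are already in flight
        g.Go(func() error {
            defer func() { <-sem }()
            return execCopy(ctx) // COPY is idempotent; already-loaded files are skipped
        })
    }
}
```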

Empirically, ingestion works reasonably well on an XS warehouse. COPY
speed is no longer a bottleneck on an S warehouse with high-speed home
internet, or on an M warehouse with same-region data center networking.

# Performance

Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM)
Snowflake warehouse size M, same GCP region as Snowflake account
Default ingestion settings

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m50s
- New: 14s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: Didn't attempt
- New: 1m16s

_This configuration is CPU-bound, so I made another attempt with more
cores available..._
Now with GCP e2-standard-4 (4 vCPU, 16GB RAM)

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m17s
- New: 9.5s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: 1h47m
- New: 45s

# Considerations

- Snowflake
[guides](https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques)
indicate that ingestion via CSV is the fastest. Experimentally, it does
appear to be true that a COPY query on staged CSV files executes much
faster than one on staged Parquet files. However, by distributing the COPY
workloads _in parallel with_ the batched file uploads, overall performance
is better with Parquet: it compresses _much_ more efficiently, so the
upload completes in less time and with fewer bytes transferred than with
CSV. Type support is also much better.
- Single-Record ingestion performance is slightly worse than the
previous INSERT-bind approach. As a rough idea, a record that previously
ingested in about 1.7s now ingests in about 2.5s. However, the new
approach does come with expanded type support and better consistency
with the streaming approach.
- An ingestion run that fails part-way through may leave the table with
partial results. Transaction semantics may be added in the future by
overriding the CopyConcurrency parameter to be 0, in which case only the
final COPY will execute (see the sketch after this list).
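
For reference, restricting ingestion to a single final COPY is already possible
via a statement option; a minimal sketch, assuming an open `adbc.Statement`
named `stmt` from the Go driver:

```go
// Defer all loading to one COPY after uploads finish. Note this does not add
// true transaction semantics yet; it only avoids incremental COPYs.
if err := stmt.SetOption("adbc.snowflake.rpc.ingest_copy_concurrency", "0"); err != nil {
    return err
}
```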

# Additional Work

### Blocking
- ~Timestamps will roundtrip properly after Arrow
[GH-39466](apache/arrow#39466) is closed. A
test is included but skipped for now.~
- ~Date64 will roundtrip properly after Arrow
[GH-39456](apache/arrow#39456) is closed. A
test is included but skipped for now.~

### Non-Blocking
- Compression codec and level are included in `ingestOptions` but are
not configurable using `stmt.SetOption()`. It is trivial to add this,
but it would be nice to be able to use the currently internal
[CompressionCodecFromString](https://github.com/apache/arrow/blob/e6323646558ee01234ce58af273c5a834745f298/go/parquet/internal/gen-go/parquet/parquet.go#L387-L399)
method to automatically pick up support for any other codecs added in
the future. Captured in #1473.
- List and Map types have some issues on ingestion. Snowflake returns
`SQL execution internal error` whenever the repetition level is greater
than 0. Some more investigation is needed here. This is non-blocking
because neither type was previously supported for ingestion.
- Context cancelation is supported for all goroutines and queries
executed as part of ingestion, _except_ for the PUT query (i.e. file
uploads). This issue is being tracked in gosnowflake
[1028](snowflakedb/gosnowflake#1028). In
practice, it likely takes just a few seconds for in-progress uploads to
complete and properly conclude cancelation. Once this issue is fixed,
the queries would be canceled in Snowflake, allowing the process to exit
faster and reducing unnecessary work.
- ~The code previously meant to map Snowflake types to Go types is no
longer used. It may still be useful for binding an Arrow record to an
arbitrary Update query, but `stmt.Prepare` should be implemented first
to follow ADBC spec for binding parameters.~
joellubi authored Jan 26, 2024
1 parent c6472ab commit b57e19b
Showing 8 changed files with 1,768 additions and 265 deletions.
docs/source/driver/snowflake.rst (90 changes: 60 additions & 30 deletions)
@@ -259,37 +259,61 @@ Bulk Ingestion
Bulk ingestion is supported. The mapping from Arrow types to Snowflake types
is provided below.

Bulk ingestion is implemented by writing Arrow data to Parquet file(s) and uploading (via PUT) to a temporary internal stage.
One or more COPY queries are executed in order to load the data into the target table.

In order for the driver to leverage this temporary stage, the user must have
the `CREATE STAGE <https://docs.snowflake.com/en/sql-reference/sql/create-stage>` privilege on the schema. In addition,
the current database and schema for the session must be set. If these are not set, the ``CREATE TEMPORARY STAGE`` command
executed by the driver can fail with the following error:

.. code-block:: sql

    CREATE TEMPORARY STAGE ADBC$BIND FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE BINARY_AS_TEXT = FALSE)
    CANNOT perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA' or use a qualified name.

The following informal benchmark demonstrates expected performance using default ingestion settings::

    Running on GCP e2-standard-4 (4 vCPU, 16GB RAM)
    Snowflake warehouse size M, same GCP region as Snowflake account
    Default ingestion settings

    TPC-H Lineitem (16 Columns):
        Scale Factor 1 (6M Rows): 9.5s
        Scale Factor 10 (60M Rows): 45s

The default settings for ingestion should be well balanced for many real-world configurations. If required, performance
and resource usage may be tuned with the following options on the :cpp:class:`AdbcStatement` object:

``adbc.snowflake.rpc.ingest_writer_concurrency``
    Number of Parquet files to write in parallel. Default attempts to maximize workers based on logical cores detected,
    but may need to be adjusted if running in a constrained environment. If set to 0, default value is used. Cannot be negative.

``adbc.snowflake.rpc.ingest_upload_concurrency``
    Number of Parquet files to upload in parallel. Greater concurrency can smooth out TCP congestion and help make
    use of available network bandwidth, but will increase memory utilization. Default is 8. If set to 0, default value is used.
    Cannot be negative.

``adbc.snowflake.rpc.ingest_copy_concurrency``
    Maximum number of COPY operations to run concurrently. Bulk ingestion performance is optimized by executing COPY
    queries as files are still being uploaded. Snowflake COPY speed scales with warehouse size, so smaller warehouses
    may benefit from setting this value higher to ensure long-running COPY queries do not block newly uploaded files
    from being loaded. Default is 4. If set to 0, only a single COPY query will be executed as part of ingestion,
    once all files have finished uploading. Cannot be negative.

``adbc.snowflake.rpc.ingest_target_file_size``
    Approximate size of Parquet files written during ingestion. Actual size will be slightly larger, depending on
    size of footer/metadata. Default is 10 MB. If set to 0, file size has no limit. Cannot be negative.
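
For example, a Go application using the ADBC statement API might tune ingestion
roughly as follows (an illustrative sketch; statement setup and error handling
are omitted, and the target file size is assumed to be specified in bytes):

.. code-block:: go

    // Illustrative only: adjust ingestion options before binding and executing the statement.
    stmt.SetOption("adbc.snowflake.rpc.ingest_writer_concurrency", "2")      // constrained environment
    stmt.SetOption("adbc.snowflake.rpc.ingest_upload_concurrency", "4")      // cap memory used by in-flight uploads
    stmt.SetOption("adbc.snowflake.rpc.ingest_copy_concurrency", "8")        // keep a small warehouse busy
    stmt.SetOption("adbc.snowflake.rpc.ingest_target_file_size", "52428800") // ~50 MB files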

Partitioned Result Sets
-----------------------

Partitioned result sets are not currently supported.

Performance
-----------

When querying Snowflake data, results are potentially fetched in parallel from multiple endpoints.
A limited number of batches are queued per endpoint, though data is always
returned to the client in the order of the endpoints.

@@ -490,16 +514,19 @@ indicated are done to ensure consistency of the stream of record batches.
- Notes

* - integral types
- number(38, 0)
- All integral types in Snowflake are stored as numbers for which neither
precision nor scale can be specified.

* - float/double
- float64
- Snowflake does not distinguish between float or double. Both are 64-bit values.

* - decimal/numeric
- numeric
- Snowflake will respect the precision/scale of the Arrow type. See the
``adbc.snowflake.sql.client_option.use_high_precision`` for exceptions to this
behavior.

* - time
- time64[ns]
@@ -513,8 +540,9 @@ indicated are done to ensure consistency of the stream of record batches.
| timestamp_ntz
| timestamp_tz
- timestamp[ns]
- Local time zone will be used, except for timestamp_ntz which is not an instant.
In this case no timezone will be present in the type. Physical values will be
UTC-normalized.

* - | variant
| object
Expand All @@ -523,7 +551,9 @@ indicated are done to ensure consistency of the stream of record batches.
- Snowflake does not provide information about nested
types. Values will be strings in a format similar to JSON that
can be parsed. The Arrow type will contain a metadata key
``logicalType`` with the Snowflake field type.
``logicalType`` with the Snowflake field type. Arrow Struct and
Map types will be stored as objects when ingested. List types will
be stored as arrays.

* - | geography
| geometry