diff --git a/docs/s3.md b/docs/s3.md index f7f95556..814e7e4c 100644 --- a/docs/s3.md +++ b/docs/s3.md @@ -1,7 +1,8 @@ -[AWS S3](https://aws.amazon.com/s3/) is an object storage service offering industry-leading scalability, data availability, security, and performance. The S3 wrapper is under development. It is read-only and supports 2 file formats: +[AWS S3](https://aws.amazon.com/s3/) is an object storage service offering industry-leading scalability, data availability, security, and performance. The S3 wrapper is under development. It is read-only and supports below file formats: 1. CSV - with or without header line 2. [JSON Lines](https://jsonlines.org/) +3. [Parquet](https://parquet.apache.org/) The S3 FDW also supports below compression algorithms: @@ -10,7 +11,27 @@ The S3 FDW also supports below compression algorithms: 3. xz 4. zlib -**Note: currently all columns in S3 files must be defined in the foreign table and their types must be `text` type** +**Note for CSV and JSONL files: currently all columns in S3 files must be defined in the foreign table and their types must be `text` type** + +**Note for Parquet files: the whole Parquet file will be loaded into local memory if it is compressed, so keep its size small** + +### Supported Data Types For Parquet File + +The S3 FDW uses Parquet file data types from [arrow_array::types](https://docs.rs/arrow-array/41.0.0/arrow_array/types/index.html), below are their mappings to Postgres data types. + +| Postgres Type | Parquet Type | +| ------------------ | ------------------------ | +| boolean | BooleanType | +| char | Int8Type | +| smallint | Int16Type | +| real | Float32Type | +| integer | Int32Type | +| double precision | Float64Type | +| bigint | Int64Type | +| numeric | Float64Type | +| text | ByteArrayType | +| date | Date64Type | +| timestamp | TimestampNanosecondType | ### Wrapper To get started with the S3 wrapper, create a foreign data wrapper specifying `handler` and `validator` as below. @@ -90,14 +111,17 @@ create server s3_server S3 wrapper is implemented with [ELT](https://hevodata.com/learn/etl-vs-elt/) approach, so the data transformation is encouraged to be performed locally after data is extracted from remote data source. -One file in S3 corresponds a foreign table in Postgres, all columns must be present in the foreign table and type must be `text`. You can do custom transformations, like type conversion, by creating a view on top of the foreign table or using a subquery. +One file in S3 corresponds a foreign table in Postgres. For CSV and JSONL file, all columns must be present in the foreign table and type must be `text`. You can do custom transformations, like type conversion, by creating a view on top of the foreign table or using a subquery. + +For Parquet file, no need to define all columns in the foreign table but column names must match between Parquet file and its foreign table. + #### Foreign Table Options The full list of foreign table options are below: - `uri` - S3 URI, required. For example, `s3://bucket/s3_table.csv` -- `format` - File format, required. `csv` or `jsonl` +- `format` - File format, required. `csv`, `jsonl`, or `parquet` - `has_header` - If the CSV file has header, optional. `true` or `false`, default is `false` - `compress` - Compression algorithm, optional. One of `gzip`, `bzip2`, `xz`, `zlib`, default is no compression @@ -148,5 +172,36 @@ create foreign table s3_table_csv_gzip ( has_header 'true', compress 'gzip' ); + +-- Parquet file, no compression +create foreign table s3_table_parquet ( + id integer, + bool_col boolean, + bigint_col bigint, + float_col real, + date_string_col text, + timestamp_col timestamp +) + server s3_server + options ( + uri 's3://bucket/s3_table.parquet', + format 'parquet' + ); + +-- GZIP compressed Parquet file +create foreign table s3_table_parquet_gz ( + id integer, + bool_col boolean, + bigint_col bigint, + float_col real, + date_string_col text, + timestamp_col timestamp +) + server s3_server + options ( + uri 's3://bucket/s3_table.parquet.gz', + format 'parquet', + compress 'gzip' + ); ``` diff --git a/wrappers/Cargo.lock b/wrappers/Cargo.lock index 4289b31a..9707c103 100644 --- a/wrappers/Cargo.lock +++ b/wrappers/Cargo.lock @@ -8,6 +8,19 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.2.9", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "1.0.1" @@ -17,6 +30,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -39,14 +67,90 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" [[package]] -name = "assert-json-diff" -version = "1.1.0" +name = "arrow-array" +version = "41.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4259cbe96513d2f1073027a259fc2ca917feb3026a5a8d984e3628e490255cc0" +checksum = "6049e031521c4e7789b7530ea5991112c0a375430094191f3b74bdf37517c9a9" dependencies = [ - "extend", - "serde", - "serde_json", + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.2.1", + "hashbrown 0.13.2", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a83450b94b9fe018b65ba268415aaab78757636f68b7f37b6bc1f2a3888af0a0" +dependencies = [ + "half 2.2.1", + "num", +] + +[[package]] +name = "arrow-cast" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "249198411254530414805f77e88e1587b0914735ea180f906506905721f7a44a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-data" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d48dcbed83d741d4af712af17f6d952972b8f6491b24ee2415243a7e37c6438" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.2.1", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea8d7b138c5414aeef5dd08abacf362f87ed9b1168ea38d60a6f67590c3f7d99" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-schema" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b71d8d68d0bc2e648e4e395896dc518be8b90c5f0f763c59083187c3d46184b" + +[[package]] +name = "arrow-select" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470cb8610bdfda56554a436febd4e457e506f3c42e01e545a1ea7ecf2a4c8823" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", ] [[package]] @@ -145,9 +249,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c3d1e2a1f1ab3ac6c4b884e37413eaa03eb9d901e4fc68ee8f5c1d49721680e" +checksum = "bcdcf0d683fe9c23d32cf5b53c9918ea0a500375a9fb20109802552658e576c9" dependencies = [ "aws-credential-types", "aws-http", @@ -161,6 +265,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", + "fastrand", "hex", "http", "hyper", @@ -174,12 +279,13 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb0696a0523a39a19087747e4dafda0362dc867531e3d72a3f195564c84e5e08" +checksum = "1fcdb2f7acbc076ff5ad05e7864bdb191ca70a6fd07668dc3a1a8bcd051de5ae" dependencies = [ "aws-smithy-async", "aws-smithy-types", + "fastrand", "tokio", "tracing", "zeroize", @@ -187,9 +293,9 @@ dependencies = [ [[package]] name = "aws-endpoint" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80a4f935ab6a1919fbfd6102a80c4fccd9ff5f47f94ba154074afe1051903261" +checksum = "8cce1c41a6cfaa726adee9ebb9a56fcd2bbfd8be49fd8a04c5e20fd968330b04" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -201,9 +307,9 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82976ca4e426ee9ca3ffcf919d9b2c8d14d0cd80d43cc02173737a8f07f28d4d" +checksum = "aadbc44e7a8f3e71c8b374e03ecd972869eb91dd2bc89ed018954a52ba84bc44" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -220,9 +326,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "0.24.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1533be023eeac69668eb718b1c48af7bd5e26305ed770553d2877ab1f7507b68" +checksum = "fba197193cbb4bcb6aad8d99796b2291f36fa89562ded5d4501363055b0de89f" dependencies = [ "aws-credential-types", "aws-endpoint", @@ -240,8 +346,6 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "bytes-utils", - "fastrand", "http", "http-body", "once_cell", @@ -255,9 +359,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "0.24.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0119bacf0c42f587506769390983223ba834e605f049babe514b2bd646dbb2" +checksum = "c8b812340d86d4a766b2ca73f740dfd47a97c2dff0c06c8517a16d88241957e4" dependencies = [ "aws-credential-types", "aws-endpoint", @@ -275,13 +379,14 @@ dependencies = [ "regex", "tokio-stream", "tower", + "tracing", ] [[package]] name = "aws-sdk-sts" -version = "0.24.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "270b6a33969ebfcb193512fbd5e8ee5306888ad6c6d5d775cdbfb2d50d94de26" +checksum = "265fac131fbfc188e5c3d96652ea90ecc676a934e3174eaaee523c6cec040b3b" dependencies = [ "aws-credential-types", "aws-endpoint", @@ -305,9 +410,9 @@ dependencies = [ [[package]] name = "aws-sig-auth" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660a02a98ab1af83bd8d714afbab2d502ba9b18c49e7e4cddd6bf8837ff778cb" +checksum = "3b94acb10af0c879ecd5c7bdf51cda6679a0a4f4643ce630905a77673bfa3c61" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -320,9 +425,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "0.54.2" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86529e7b64d902efea8fff52c1b2529368d04f90305cf632729e3713f6b57dc0" +checksum = "9d2ce6f507be68e968a33485ced670111d1cbad161ddbbab1e313c03d37d8f4c" dependencies = [ "aws-smithy-eventstream", "aws-smithy-http", @@ -341,9 +446,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63c712a28a4f2f2139759235c08bf98aca99d4fdf1b13c78c5f95613df0a5db9" +checksum = "13bda3996044c202d75b91afeb11a9afae9db9a721c6a7a427410018e286b880" dependencies = [ "futures-util", "pin-project-lite", @@ -353,9 +458,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3875fb4b28606a5368a048016a28c15707f2b21238d5b2e4a23198f590e92c4" +checksum = "07ed8b96d95402f3f6b8b57eb4e0e45ee365f78b1a924faf20ff6e97abf1eae6" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -374,14 +479,13 @@ dependencies = [ [[package]] name = "aws-smithy-client" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "104ca17f56cde00a10207169697dfe9c6810db339d52fb352707e64875b30a44" +checksum = "0a86aa6e21e86c4252ad6a0e3e74da9617295d8d6e374d552be7d3059c41cedd" dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-tower", - "aws-smithy-protocol-test", "aws-smithy-types", "bytes", "fastrand", @@ -391,7 +495,7 @@ dependencies = [ "hyper-rustls 0.23.2", "lazy_static", "pin-project-lite", - "serde", + "rustls 0.20.8", "tokio", "tower", "tracing", @@ -399,9 +503,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac250d8c0e42af0097a6837ffc5a6fb9f8ba4107bb53124c047c91bc2a58878f" +checksum = "460c8da5110835e3d9a717c61f5556b20d03c32a1dec57f8fc559b360f733bb8" dependencies = [ "aws-smithy-types", "bytes", @@ -410,9 +514,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "873f316f1833add0d3aa54ed1b0cd252ddd88c792a0cf839886400099971e844" +checksum = "2b3b693869133551f135e1f2c77cb0b8277d9e3e17feaf2213f735857c4f0d28" dependencies = [ "aws-smithy-eventstream", "aws-smithy-types", @@ -433,9 +537,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-tower" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38231d3f5dac9ac7976f44e12803add1385119ffca9e5f050d8e980733d164" +checksum = "3ae4f6c5798a247fac98a867698197d9ac22643596dc3777f0c76b91917616b9" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -449,33 +553,18 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bd83ff2b79e9f729746fcc8ad798676b68fe6ea72986571569a5306a277a182" +checksum = "23f9f42fbfa96d095194a632fbac19f60077748eba536eb0b9fecc28659807f8" dependencies = [ "aws-smithy-types", ] -[[package]] -name = "aws-smithy-protocol-test" -version = "0.54.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d1c9bcb35ce11055ec128dab2c66a7ed47e2dfff99883e32c21a1ab6d6bee6" -dependencies = [ - "assert-json-diff 1.1.0", - "http", - "pretty_assertions", - "regex", - "roxmltree", - "serde_json", - "thiserror", -] - [[package]] name = "aws-smithy-query" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2f0445dafe9d2cd50b44339ae3c3ed46549aad8ac696c52ad660b3e7ae8682b" +checksum = "98819eb0b04020a1c791903533b638534ae6c12e2aceda3e6e6fba015608d51d" dependencies = [ "aws-smithy-types", "urlencoding", @@ -483,9 +572,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8161232eda10290f5136610a1eb9de56aceaccd70c963a26a260af20ac24794f" +checksum = "16a3d0bf4f324f4ef9793b86a1701d9700fbcdbd12a846da45eed104c634c6e8" dependencies = [ "base64-simd", "itoa", @@ -496,18 +585,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.54.4" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343ffe9a9bb3f542675f4df0e0d5933513d6ad038ca3907ad1767ba690a99684" +checksum = "b1b9d12875731bd07e767be7baad95700c3137b56730ec9ddeedb52a5e5ca63b" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "0.54.1" +version = "0.55.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8f15b34253b68cde08e39b0627cc6101bcca64351229484b4743392c035d057" +checksum = "6dd209616cc8d7bfb82f87811a5c655dc97537f592689b18743bddf5dc5c4829" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -587,6 +676,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "3.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -651,6 +761,9 @@ name = "cc" version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -821,6 +934,28 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-random" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" +dependencies = [ + "const-random-macro", + "proc-macro-hack", +] + +[[package]] +name = "const-random-macro" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" +dependencies = [ + "getrandom 0.2.9", + "once_cell", + "proc-macro-hack", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.6.0" @@ -946,6 +1081,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -977,16 +1118,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn 1.0.109", -] - [[package]] name = "deadpool" version = "0.9.5" @@ -1006,12 +1137,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" -[[package]] -name = "diff" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" - [[package]] name = "digest" version = "0.10.7" @@ -1117,18 +1242,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "extend" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "eyre" version = "0.6.8" @@ -1160,6 +1273,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "23.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +dependencies = [ + "bitflags", + "rustc_version 0.4.0", +] + [[package]] name = "flate2" version = "1.0.26" @@ -1402,6 +1525,16 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "half" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b4af3693f1b705df946e9fe5631932443781d0aabb423b62fcd4d73f6d2fd0" +dependencies = [ + "crunchy", + "num-traits", +] + [[package]] name = "hash32" version = "0.2.1" @@ -1417,6 +1550,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "heapless" version = "0.7.16" @@ -1645,7 +1784,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -1663,6 +1802,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-lifetimes" version = "1.0.10" @@ -1695,6 +1840,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +[[package]] +name = "jobserver" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.63" @@ -1716,6 +1870,70 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.144" @@ -1732,6 +1950,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "libm" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -1899,6 +2123,40 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" +dependencies = [ + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -1909,6 +2167,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -1916,6 +2197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1988,12 +2270,12 @@ dependencies = [ ] [[package]] -name = "output_vt100" -version = "0.1.3" +name = "ordered-float" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628223faebab4e3e40667ee0b2336d34a5b960ff60ea743ddfdbcf7770bcfb66" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" dependencies = [ - "winapi", + "num-traits", ] [[package]] @@ -2037,6 +2319,39 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "parquet" +version = "41.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6880c32d81884ac4441d9f4b027df8561be23b54f3ac1e62086fa42753dd3faa" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.1", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "hashbrown 0.13.2", + "lz4", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", +] + [[package]] name = "parse-zoneinfo" version = "0.3.0" @@ -2046,6 +2361,12 @@ dependencies = [ "regex", ] +[[package]] +name = "paste" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" + [[package]] name = "pathsearch" version = "0.2.0" @@ -2331,40 +2652,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] -name = "pretty_assertions" -version = "1.3.0" +name = "proc-macro-hack" +version = "0.5.20+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a25e9bcb20aa780fd0bb16b72403a9064d6b3f22f026946029acb941a50af755" -dependencies = [ - "ctor", - "diff", - "output_vt100", - "yansi", -] - -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] +checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" @@ -2640,15 +2931,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "roxmltree" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b" -dependencies = [ - "xmlparser", -] - [[package]] name = "rustc-hash" version = "1.1.0" @@ -2847,7 +3129,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" dependencies = [ - "half", + "half 1.8.2", "serde", ] @@ -2963,6 +3245,12 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +[[package]] +name = "snap" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" + [[package]] name = "socket2" version = "0.4.9" @@ -3010,6 +3298,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.2" @@ -3130,6 +3424,17 @@ dependencies = [ "syn 2.0.16", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + [[package]] name = "time" version = "0.1.45" @@ -3170,6 +3475,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3396,6 +3710,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typenum" version = "1.16.0" @@ -3847,7 +4171,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd7b0b5b253ebc0240d6aac6dd671c495c467420577bf634d3064ae7e6fa2b4c" dependencies = [ - "assert-json-diff 2.0.2", + "assert-json-diff", "async-trait", "base64 0.21.1", "deadpool", @@ -3867,6 +4191,7 @@ dependencies = [ name = "wrappers" version = "0.1.11" dependencies = [ + "arrow-array", "async-compression", "aws-config", "aws-sdk-s3", @@ -3878,6 +4203,7 @@ dependencies = [ "futures", "gcp-bigquery-client", "http", + "parquet", "pgrx", "pgrx-tests", "regex", @@ -3919,12 +4245,6 @@ dependencies = [ "lzma-sys", ] -[[package]] -name = "yansi" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" - [[package]] name = "yup-oauth2" version = "8.3.0" @@ -3957,3 +4277,33 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" + +[[package]] +name = "zstd" +version = "0.12.3+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "6.0.5+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.8+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +dependencies = [ + "cc", + "libc", + "pkg-config", +] diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index f4c97610..56234dbe 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -21,7 +21,11 @@ bigquery_fdw = ["gcp-bigquery-client", "time", "serde_json", "serde", "wiremock" clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "time", "regex"] stripe_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "time"] firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "yup-oauth2", "regex", "time"] -s3_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "tokio", "tokio-util", "csv", "async-compression", "serde_json", "http"] +s3_fdw = [ + "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", + "tokio", "tokio-util", "csv", "async-compression", "serde_json", + "http", "parquet", "futures", "arrow-array", "chrono", "time" +] # TODO: audit dependencies airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url"] @@ -61,13 +65,15 @@ regex = { version = "1", optional = true } url = { version = "2.3", optional = true } # for s3_fdw -aws-config = { version = "0.54.1", optional = true } -aws-sdk-s3 = { version = "0.24.0", optional = true } +aws-config = { version = "0.55.3", optional = true } +aws-sdk-s3 = { version = "0.28.0", optional = true } csv = { version = "1.2", optional = true } tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", optional = true } async-compression = { version = "0.3.15", features = ["tokio", "bzip2", "gzip", "xz", "zlib"], optional = true } http = { version = "0.2", optional = true } +parquet = { version = "41.0.0", features = ["async"], optional = true } +arrow-array = { version = "41.0.0", optional = true } [dev-dependencies] pgrx-tests = "=0.8.3" diff --git a/wrappers/dockerfiles/s3/init/01-init-data.sh b/wrappers/dockerfiles/s3/init/01-init-data.sh index aa857e7b..7021e563 100755 --- a/wrappers/dockerfiles/s3/init/01-init-data.sh +++ b/wrappers/dockerfiles/s3/init/01-init-data.sh @@ -10,4 +10,5 @@ awslocal s3 cp /data/test_data.csv s3://test/test_data.csv awslocal s3 cp /data/test_data.csv.gz s3://test/test_data.csv.gz awslocal s3 cp /data/test_data.jsonl s3://test/test_data.jsonl awslocal s3 cp /data/test_data.jsonl.bz2 s3://test/test_data.jsonl.bz2 - +awslocal s3 cp /data/test_data.parquet s3://test/test_data.parquet +awslocal s3 cp /data/test_data.parquet.gz s3://test/test_data.parquet.gz diff --git a/wrappers/dockerfiles/s3/test_data/test_data.parquet b/wrappers/dockerfiles/s3/test_data/test_data.parquet new file mode 100644 index 00000000..a63f5dca Binary files /dev/null and b/wrappers/dockerfiles/s3/test_data/test_data.parquet differ diff --git a/wrappers/dockerfiles/s3/test_data/test_data.parquet.gz b/wrappers/dockerfiles/s3/test_data/test_data.parquet.gz new file mode 100644 index 00000000..a53a1cfc Binary files /dev/null and b/wrappers/dockerfiles/s3/test_data/test_data.parquet.gz differ diff --git a/wrappers/src/fdw/s3_fdw/README.md b/wrappers/src/fdw/s3_fdw/README.md index ee4fb31a..5d6d5773 100644 --- a/wrappers/src/fdw/s3_fdw/README.md +++ b/wrappers/src/fdw/s3_fdw/README.md @@ -10,4 +10,5 @@ This is a foreign data wrapper for [AWS S3](https://aws.amazon.com/s3/). It is d | Version | Date | Notes | | ------- | ---------- | ---------------------------------------------------- | +| 0.1.1 | 2023-06-05 | Added Parquet file support | | 0.1.0 | 2023-03-01 | Initial version | diff --git a/wrappers/src/fdw/s3_fdw/mod.rs b/wrappers/src/fdw/s3_fdw/mod.rs index 25a1a415..48bd6534 100644 --- a/wrappers/src/fdw/s3_fdw/mod.rs +++ b/wrappers/src/fdw/s3_fdw/mod.rs @@ -1,3 +1,4 @@ #![allow(clippy::module_inception)] +mod parquet; mod s3_fdw; mod tests; diff --git a/wrappers/src/fdw/s3_fdw/parquet.rs b/wrappers/src/fdw/s3_fdw/parquet.rs new file mode 100644 index 00000000..e8eb2d10 --- /dev/null +++ b/wrappers/src/fdw/s3_fdw/parquet.rs @@ -0,0 +1,357 @@ +use arrow_array::{array, Array, RecordBatch}; +use aws_sdk_s3 as s3; +use chrono::NaiveDate; +use futures::TryStreamExt; +use parquet::arrow::async_reader::{ + AsyncFileReader, ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder, +}; +use parquet::arrow::ProjectionMask; +use pgrx::pg_sys; +use pgrx::prelude::PgSqlErrorCode; +use std::cmp::min; +use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use time::OffsetDateTime; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; +use tokio::runtime::Handle; + +use supabase_wrappers::prelude::*; + +// convert an error to IO error +#[inline] +fn to_io_error(err: impl std::error::Error) -> IoError { + IoError::new(ErrorKind::Other, err.to_string()) +} + +// async reader for a single S3 parquet file +pub(super) struct S3ParquetReader { + client: s3::Client, + bucket: String, + object: String, + object_size: Option, + pos: u64, +} + +impl S3ParquetReader { + fn new(client: &s3::Client, bucket: &str, object: &str) -> Self { + S3ParquetReader { + client: client.clone(), + bucket: bucket.to_owned(), + object: object.to_owned(), + object_size: None, + pos: 0, + } + } + + fn get_object_size(&mut self) -> IoResult { + if let Some(object_size) = self.object_size { + return Ok(object_size); + } + + // wait on current thread to get object metadata + futures::executor::block_on( + self.client + .get_object_attributes() + .bucket(&self.bucket) + .key(&self.object) + .object_attributes(s3::types::ObjectAttributes::ObjectSize) + .send(), + ) + .map_err(to_io_error) + .map(|output| { + let object_size = output.object_size() as u64; + self.object_size = Some(object_size); + object_size + }) + } +} + +impl AsyncRead for S3ParquetReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // ensure object size is already available + let object_size = match self.get_object_size() { + Ok(size) => size, + Err(err) => { + return Poll::Ready(Err(err)); + } + }; + + // calculate request range + let object_remaining = object_size - self.pos; + let remaining = min(buf.remaining() as u64, object_remaining); + let range = format!("bytes={}-{}", self.pos, self.pos + remaining - 1); + + // wait on current thread to get object contents + match futures::executor::block_on( + self.client + .get_object() + .bucket(&self.bucket) + .key(&self.object) + .range(range) + .send(), + ) { + Ok(output) => { + let mut rdr = output.body.into_async_read(); + AsyncRead::poll_read(Pin::new(&mut rdr), cx, buf) + } + Err(err) => Poll::Ready(Err(to_io_error(err))), + } + } +} + +impl AsyncSeek for S3ParquetReader { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> IoResult<()> { + let object_size = self.get_object_size()?; + match position { + SeekFrom::Start(pos) => { + self.pos = pos; + } + SeekFrom::End(pos) => { + self.pos = (object_size as i64 + pos) as u64; + } + SeekFrom::Current(pos) => { + self.pos = (self.pos as i64 + pos) as u64; + } + } + + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(self.pos)) + } +} + +// S3 parquet file read manager +#[derive(Default)] +pub(super) struct S3Parquet { + // parquet record batch stream reader + stream: Option>>, + + // a record batch + batch: Option, + batch_idx: usize, +} + +impl S3Parquet { + // open batch stream from local buffer + pub(super) async fn open_local_stream(&mut self, buf: Vec) { + let cursor: Box = Box::new(Cursor::new(buf)); + let builder = ParquetRecordBatchStreamBuilder::new(cursor).await.unwrap(); + let stream = builder.build().unwrap(); + self.stream = Some(stream); + } + + // open async record batch stream + // + // This is done by spawning a thread to create builder and then get read stream + // Note: this function should be called on a tokio runtime executor thread + pub(super) async fn open_async_stream( + &mut self, + client: &s3::Client, + bucket: &str, + object: &str, + tgt_cols: &[Column], + ) { + let handle = Handle::current(); + let rdr = S3ParquetReader::new(client, bucket, object); + + let task = handle + .spawn_blocking(move || { + // we need to create another thread and wait on it to create builder + let handle = Handle::current(); + let task = handle.spawn_blocking(move || { + let boxed_rdr: Box = Box::new(rdr); + Handle::current().block_on(ParquetRecordBatchStreamBuilder::new(boxed_rdr)) + }); + handle.block_on(task) + }) + .await; + + let stream = task + .expect("create parquet batch stream builder failed") + .expect("create parquet batch stream builder failed") + .and_then(|builder| { + // get parquet file metadata + let file_metadata = builder.metadata().file_metadata(); + let schema = file_metadata.schema_descr(); + let cols = schema.columns(); + + // find target column indexes in parquest columns + let project_indexes = tgt_cols + .iter() + .map(|tgt_col| { + cols.iter() + .position(|col| col.name() == tgt_col.name) + .unwrap_or_else(|| { + panic!("column '{}' not found in parquet file", tgt_col.name) + }) + }) + .collect::>(); + + // set up projections for the builder + let mask = ProjectionMask::roots(schema, project_indexes); + builder.with_projection(mask).build() + }) + .map_err(to_io_error) + .unwrap(); + + self.stream = Some(stream); + self.batch = None; + self.batch_idx = 0; + } + + // refill record batch + pub(super) async fn refill(&mut self) -> Option<()> { + // if there are still records in the batch + if let Some(batch) = &self.batch { + if self.batch_idx < batch.num_rows() { + return Some(()); + } + } + + // otherwise, read one moe batch + if let Some(ref mut stream) = &mut self.stream { + match stream.try_next().await { + Ok(result) => { + return result.map(|batch| { + self.batch = Some(batch); + self.batch_idx = 0; + }) + } + Err(err) => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!("read parquet record batch failed: {}", err), + ); + return None; + } + } + } + + None + } + + // read one row from record batch + pub(super) fn read_into_row(&mut self, row: &mut Row, tgt_cols: &Vec) -> Option<()> { + if let Some(batch) = &self.batch { + for tgt_col in tgt_cols { + let col = batch + .column_by_name(&tgt_col.name) + .unwrap_or_else(|| panic!("column {} not found in parquet file", tgt_col.name)); + + macro_rules! col_to_cell { + ($array_type:ident, $cell_type:ident) => {{ + let arr = col + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!("column '{}' data type not match", tgt_col.name) + }); + if arr.is_null(self.batch_idx) { + None + } else { + Some(Cell::$cell_type(arr.value(self.batch_idx))) + } + }}; + } + + let cell = match tgt_col.type_oid { + pg_sys::BOOLOID => col_to_cell!(BooleanArray, Bool), + pg_sys::CHAROID => col_to_cell!(Int8Array, I8), + pg_sys::INT2OID => col_to_cell!(Int16Array, I16), + pg_sys::FLOAT4OID => col_to_cell!(Float32Array, F32), + pg_sys::INT4OID => col_to_cell!(Int32Array, I32), + pg_sys::FLOAT8OID => col_to_cell!(Float64Array, F64), + pg_sys::INT8OID => col_to_cell!(Int64Array, I64), + pg_sys::NUMERICOID => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!("column '{}' data type not match", tgt_col.name) + }); + if arr.is_null(self.batch_idx) { + None + } else { + let value = arr.value(self.batch_idx); + let num = pgrx::AnyNumeric::try_from(value).unwrap(); + Some(Cell::Numeric(num)) + } + } + pg_sys::TEXTOID => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!("column '{}' data type not match", tgt_col.name) + }); + if arr.is_null(self.batch_idx) { + None + } else { + let s = String::from_utf8_lossy(arr.value(self.batch_idx)); + Some(Cell::String(s.to_string())) + } + } + pg_sys::DATEOID => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!("column '{}' data type not match", tgt_col.name) + }); + if arr.is_null(self.batch_idx) { + None + } else { + arr.value_as_date(self.batch_idx).map(|dt| { + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let days_epoch = dt.signed_duration_since(epoch).num_days() as i32; + let dt = pgrx::Date::from_pg_epoch_days( + days_epoch + pg_sys::UNIX_EPOCH_JDATE as i32 + - pg_sys::POSTGRES_EPOCH_JDATE as i32, + ); + Cell::Date(dt) + }) + } + } + pg_sys::TIMESTAMPOID => { + let arr = col + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!("column '{}' data type not match", tgt_col.name) + }); + if arr.is_null(self.batch_idx) { + None + } else { + arr.value_as_datetime(self.batch_idx).map(|ts| { + let dt = OffsetDateTime::from_unix_timestamp_nanos( + ts.timestamp_nanos() as i128, + ) + .unwrap(); + let ts = pgrx::Timestamp::try_from(dt).unwrap(); + Cell::Timestamp(ts) + }) + } + } + _ => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!("column '{}' data type not supported", tgt_col.name), + ); + None + } + }; + row.push(&tgt_col.name, cell); + } + self.batch_idx += 1; + return Some(()); + } + None + } +} diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index 1e1ecc31..c2b6d83c 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -8,19 +8,21 @@ use std::collections::{HashMap, VecDeque}; use std::env; use std::io::Cursor; use std::pin::Pin; -use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; +use super::parquet::*; use supabase_wrappers::prelude::*; -// record parser for S3 text file +// record parser for a S3 file enum Parser { Csv(csv::Reader>>), // JSON lines text file format: https://jsonlines.org/ JsonLine(VecDeque), + Parquet(S3Parquet), } #[wrappers_fdw( - version = "0.1.0", + version = "0.1.1", author = "Supabase", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/s3_fdw" )] @@ -30,6 +32,8 @@ pub(crate) struct S3Fdw { rdr: Option>>>, parser: Parser, tgt_cols: Vec, + + // local string buffer for CSV and JSONL buf: String, } @@ -103,6 +107,7 @@ impl S3Fdw { } } } + _ => unreachable!(), } Some(()) @@ -111,8 +116,9 @@ impl S3Fdw { impl ForeignDataWrapper for S3Fdw { fn new(options: &HashMap) -> Self { + let rt = tokio::runtime::Runtime::new().unwrap(); let mut ret = S3Fdw { - rt: create_async_runtime(), + rt, client: None, rdr: None, parser: Parser::JsonLine(VecDeque::new()), @@ -219,87 +225,119 @@ impl ForeignDataWrapper for S3Fdw { return; }; - // initialise parser according to format option - if let Some(format) = require_option("format", options) { - // create dummy parser - match format.as_str() { - "csv" => self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))), - "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!( - "invalid format option: {}, it can only be 'csv' or 'jsonl'", - format - ), - ); - return; - } - } - } else { - return; - }; - let has_header: bool = options.get("has_header") == Some(&"true".to_string()); + self.tgt_cols = columns.to_vec(); + if let Some(client) = &self.client { - self.tgt_cols = columns.to_vec(); + // initialise parser according to format option + if let Some(format) = require_option("format", options) { + // create dummy parser + match format.as_str() { + "csv" => { + self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))) + } + "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), + "parquet" => self.parser = Parser::Parquet(S3Parquet::default()), + _ => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!( + "invalid format option: {}, it can only be 'csv', 'jsonl' or 'parquet'", + format + ), + ); + return; + } + } + } else { + return; + }; - match self + let stream = match self .rt - .block_on(client.get_object().bucket(bucket).key(object).send()) + .block_on(client.get_object().bucket(&bucket).key(&object).send()) { - Ok(resp) => { - let stream = resp.body.into_async_read(); - - let boxed_stream: Pin> = - // deal with compression - if let Some(compress) = options.get("compress") { - let buf_rdr = BufReader::new(stream); - match compress.as_str() { - "bzip2" => Box::pin(BzDecoder::new(buf_rdr)), - "gzip" => Box::pin(GzipDecoder::new(buf_rdr)), - "xz" => Box::pin(XzDecoder::new(buf_rdr)), - "zlib" => Box::pin(ZlibDecoder::new(buf_rdr)), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("invalid compression option: {}", compress), - ); - return; - } - } - } else { - Box::pin(stream) - }; - - let mut rdr: BufReader>> = BufReader::new(boxed_stream); - - // skip csv header line if needed - if let Parser::Csv(_) = self.parser { - if has_header { - let mut header = String::new(); - if let Err(err) = self.rt.block_on(rdr.read_line(&mut header)) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("fetch csv file failed: {}", err), - ); - return; - } + Ok(resp) => resp.body.into_async_read(), + Err(err) => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!("request s3 failed: {}", err), + ); + return; + } + }; + + let mut boxed_stream: Pin> = + if let Some(compress) = options.get("compress") { + let buf_rdr = BufReader::new(stream); + match compress.as_str() { + "bzip2" => Box::pin(BzDecoder::new(buf_rdr)), + "gzip" => Box::pin(GzipDecoder::new(buf_rdr)), + "xz" => Box::pin(XzDecoder::new(buf_rdr)), + "zlib" => Box::pin(ZlibDecoder::new(buf_rdr)), + _ => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!("invalid compression option: {}", compress), + ); + return; } } + } else { + Box::pin(stream) + }; + + // deal with parquet file, read all its content to local buffer if it is + // compressed, otherwise open async read stream for it + if let Parser::Parquet(ref mut s3parquet) = &mut self.parser { + if options.get("compress").is_some() { + // read all contents to local + let mut buf = Vec::new(); + self.rt + .block_on(boxed_stream.read_to_end(&mut buf)) + .expect("read compressed parquet file failed"); + self.rt.block_on(s3parquet.open_local_stream(buf)); + } else { + // open async read stream + self.rt.block_on(s3parquet.open_async_stream( + client, + &bucket, + &object, + &self.tgt_cols, + )); + } + return; + } + + let mut rdr: BufReader>> = BufReader::new(boxed_stream); - self.rdr = Some(rdr); + // skip csv header line if needed + if let Parser::Csv(_) = self.parser { + if has_header { + let mut header = String::new(); + if let Err(err) = self.rt.block_on(rdr.read_line(&mut header)) { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!("fetch csv file failed: {}", err), + ); + return; + } } - Err(err) => report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!("request s3 failed: {}", err), - ), } + + self.rdr = Some(rdr); } } fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + // read parquet record + if let Parser::Parquet(ref mut s3parquet) = &mut self.parser { + self.rt.block_on(s3parquet.refill())?; + return s3parquet.read_into_row(row, &self.tgt_cols); + } + + // read csv or jsonl record loop { if self.refill().is_none() { break; @@ -367,6 +405,7 @@ impl ForeignDataWrapper for S3Fdw { } } } + _ => unreachable!(), } } @@ -374,7 +413,9 @@ impl ForeignDataWrapper for S3Fdw { } fn end_scan(&mut self) { + // release local resources self.rdr.take(); + self.parser = Parser::JsonLine(VecDeque::new()); } fn validator(options: Vec>, catalog: Option) { diff --git a/wrappers/src/fdw/s3_fdw/tests.rs b/wrappers/src/fdw/s3_fdw/tests.rs index 133ea304..0ef4512c 100644 --- a/wrappers/src/fdw/s3_fdw/tests.rs +++ b/wrappers/src/fdw/s3_fdw/tests.rs @@ -111,6 +111,49 @@ mod tests { ) .unwrap(); + c.update( + r#" + CREATE FOREIGN TABLE s3_test_table_parquet ( + id integer, + bool_col boolean, + bigint_col bigint, + float_col real, + date_string_col text, + timestamp_col timestamp + ) + SERVER s3_server + OPTIONS ( + uri 's3://test/test_data.parquet', + format 'parquet' + ) + "#, + None, + None, + ) + .unwrap(); + + c.update( + r#" + CREATE FOREIGN TABLE s3_test_table_parquet_gz ( + id integer, + bool_col boolean, + bigint_col bigint, + float_col real, + date_string_col text, + timestamp_col timestamp + ) + SERVER s3_server + OPTIONS ( + uri 's3://test/test_data.parquet.gz', + format 'parquet', + compress 'gzip' + ) + "#, + None, + None, + ) + .unwrap(); + let check_test_table = |table| { let sql = format!("SELECT * FROM {} ORDER BY name LIMIT 1", table); let results = c @@ -130,6 +173,23 @@ mod tests { check_test_table("s3_test_table_csv_gz"); check_test_table("s3_test_table_jsonl"); check_test_table("s3_test_table_jsonl_bz"); + + let check_parquet_table = |table| { + let sql = format!("SELECT * FROM {} ORDER BY id LIMIT 1", table); + let results = c + .select(&sql, None, None) + .unwrap() + .filter_map(|r| { + r.get_by_name::("id") + .unwrap() + .zip(r.get_by_name::<&str, _>("date_string_col").unwrap()) + }) + .collect::>(); + assert_eq!(results, vec![(0, "01/01/09")]); + }; + + check_parquet_table("s3_test_table_parquet"); + check_parquet_table("s3_test_table_parquet_gz"); }); } }