diff --git a/Cargo.lock b/Cargo.lock index 29d27bb7c9..b567121abd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if 1.0.0", + "const-random", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy", @@ -169,6 +171,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator" version = "0.0.0" @@ -242,6 +259,99 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "arrow-array" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.4.1", + "hashbrown 0.14.5", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d" +dependencies = [ + "bytes", + "half 2.4.1", + "num", +] + +[[package]] +name = "arrow-cast" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi 2.0.0", + "base64 0.22.1", + "chrono", + "half 2.4.1", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.4.1", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-schema" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858" + +[[package]] +name = "arrow-select" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + [[package]] name = "assemble" version = "0.0.0" @@ -392,6 +502,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -689,6 +808,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "1.10.0" @@ -1068,6 +1208,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1256,6 +1416,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -1791,6 +1957,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "24.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + [[package]] name = "flate2" version = "1.0.31" @@ -2193,6 +2369,17 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if 1.0.0", + "crunchy", + "num-traits", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2658,6 +2845,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "integer-sqrt" version = "0.1.5" @@ -2871,6 +3064,70 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.155" @@ -3035,6 +3292,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "matchers" version = "0.1.0" @@ -3252,6 +3518,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -3262,6 +3542,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -3277,6 +3566,28 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -3379,6 +3690,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -3455,6 +3775,40 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parquet" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.22.1", + "brotli", + "bytes", + "chrono", + "flate2", + "half 2.4.1", + "hashbrown 0.14.5", + "lz4_flex", + "num", + "num-bigint", + "paste 1.0.15", + "seq-macro", + "serde_json", + "snap", + "thrift", + "twox-hash", + "zstd 0.13.2", + "zstd-sys", +] + [[package]] name = "parser" version = "0.0.0" @@ -3479,6 +3833,7 @@ dependencies = [ "lazy_static", "mime", "num-bigint", + "parquet", "protobuf", "protobuf-json-mapping", "protobuf-parse", @@ -4760,6 +5115,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.207" @@ -4795,7 +5156,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" dependencies = [ - "half", + "half 1.8.3", "serde", ] @@ -5130,7 +5491,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa8241483a83a3f33aa5fff7e7d9def398ff9990b2752b6c6112b83c6d246029" dependencies = [ "ahash 0.7.8", - "atoi", + "atoi 1.0.0", "base64 0.13.1", "bitflags 1.3.2", "byteorder", @@ -5207,6 +5568,12 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "str-buf" version = "1.0.6" @@ -5455,6 +5822,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + [[package]] name = "time" version = "0.1.45" @@ -5497,6 +5875,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -5854,6 +6241,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if 1.0.0", + "static_assertions", +] + [[package]] name = "typed-builder" version = "0.16.2" diff --git a/Cargo.toml b/Cargo.toml index 81b5805846..edba6f2dc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ num-bigint = "0.4" open = "3" +parquet = { version = "53", features = ["json"] } pathfinding = "3.0" pbjson = "0.7" pbjson-types = "0.7" @@ -215,4 +216,4 @@ librocksdb-sys = { git = "https://github.com/jgraettinger/rust-rocksdb", branch # Recommended by sqlx authors to speed up build-time checks. # https://github.com/launchbadge/sqlx#compile-time-verification [profile.dev.package.sqlx-macros] -opt-level = 3 \ No newline at end of file +opt-level = 3 diff --git a/crates/parser/Cargo.toml b/crates/parser/Cargo.toml index bdca5d3136..e5db7cd521 100644 --- a/crates/parser/Cargo.toml +++ b/crates/parser/Cargo.toml @@ -34,6 +34,7 @@ flate2 = { workspace = true } itertools = { workspace = true } mime = { workspace = true } num-bigint = { workspace = true } +parquet = { workspace = true } protobuf = { workspace = true } protobuf-json-mapping = { workspace = true } protobuf-parse = { workspace = true } diff --git a/crates/parser/src/config/mod.rs b/crates/parser/src/config/mod.rs index 2cff21420a..ffcd2bb380 100644 --- a/crates/parser/src/config/mod.rs +++ b/crates/parser/src/config/mod.rs @@ -319,12 +319,14 @@ pub enum Format { #[schemars(title = "W3C Extended Log")] W3cExtendedLog, + #[serde(rename="parquet")] + #[schemars(title = "Parquet")] + Parquet, + /// Placeholders for files types that are unsupported at this time. #[schemars(skip)] Excel, #[schemars(skip)] - Parquet, - #[schemars(skip)] Xml, #[schemars(skip)] Ods, // Open-document spreadsheet, for LibreOffice, etc. diff --git a/crates/parser/src/format/mod.rs b/crates/parser/src/format/mod.rs index 80b6dbeebf..7a25507562 100644 --- a/crates/parser/src/format/mod.rs +++ b/crates/parser/src/format/mod.rs @@ -3,12 +3,14 @@ pub mod character_separated; pub mod json; pub mod protobuf; pub mod sanitize; +pub mod parquet; use crate::config::ErrorThreshold; use crate::decorate::{AddFieldError, Decorator}; use crate::input::{detect_compression, CompressionError, Input}; use crate::{Compression, Format, JsonPointer, ParseConfig}; +use ::parquet::errors::ParquetError; use serde_json::Value; use std::io::{self, Write}; use std::path::Path; @@ -50,6 +52,12 @@ pub enum ParseError { #[error("failed to sanitize documents: {0}")] SanitizeError(#[from] sanitize::SanitizeError), + + #[error("failed to parse parquet: {0}")] + Parquet(#[from] ParquetError), + + #[error("file is too large, maximum size is 1GB")] + FileTooLarge, } /// Runs format inference if the config does not specify a `format`. The expectation is that more @@ -185,6 +193,7 @@ fn parser_for(format: Format) -> Box { Format::Protobuf(proto_config) => protobuf::new_protobuf_parser(proto_config), Format::W3cExtendedLog => character_separated::new_w3c_extended_log_parser(), Format::Avro => avro::new_parser(), + Format::Parquet => parquet::new_parser(), unsupported => Box::new(UnsupportedParser(unsupported)), } } @@ -285,6 +294,7 @@ fn format_for_content_type(content_type: &str) -> Option { "text/json" => Some(Format::Json), "text/csv" => Some(Format::Csv(Default::default())), "text/tab-separated-values" => Some(Format::Csv(Default::default())), + "application/vnd.apache.parquet" => Some(Format::Parquet), _ => None, } } diff --git a/crates/parser/src/format/parquet.rs b/crates/parser/src/format/parquet.rs new file mode 100644 index 0000000000..929e95d10d --- /dev/null +++ b/crates/parser/src/format/parquet.rs @@ -0,0 +1,103 @@ +//! Parser for the parquet format. This will accept any stream of JSON values separated by whitespace. +//! It allows any amount of whitespace (including newlines) within and in between records. +use super::{Input, Output, ParseError, Parser}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::record::reader::RowIter; +use std::convert::TryFrom; +use serde_json::Value; + +struct ParquetParser; + +pub fn new_parser() -> Box { + Box::new(ParquetParser) +} + +const MAX_FILE_SIZE: i64 = 1024 * 1024 * 1024; + +impl Parser for ParquetParser { + fn parse(&self, content: Input) -> Result { + let file = content.into_file()?; + let file_reader = SerializedFileReader::try_from(file)?; + + let mut total_size: i64 = 0; + for rg in file_reader.metadata().row_groups() { + total_size += rg.total_byte_size() + } + + // Files larger than 1GB are not allowed due to memory constraints + if total_size > MAX_FILE_SIZE { + return Err(ParseError::FileTooLarge) + } + + let iter = file_reader.into_iter(); + + let wrapped = ParquetIter { + inner: Box::new(iter), + }; + Ok(Box::new(wrapped)) + } +} + +struct ParquetIter<'a> { + inner: Box>, +} + +impl Iterator for ParquetIter<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + let next_elem = self.inner.next()?; + match next_elem { + Ok(row) => Some(Ok(row.to_json_value())), + Err(e) => Some(Err(e.into())), + } + } +} + + +#[cfg(test)] +mod test { + use std::fs::File; + + use super::*; + use serde_json::json; + + fn input_for_file(rel_path: impl AsRef) -> Input { + let file = File::open(rel_path).expect("failed to open file"); + Input::File(file) + } + + #[test] + fn parse_sample_file() { + let input = input_for_file("tests/examples/iris.parquet"); + let mut output = ParquetParser + .parse(input) + .expect("must return output iterator"); + + let first = output + .next() + .expect("expected a result") + .expect("must parse object Ok"); + assert_eq!(json!({ + "petal.length": 1.4, + "petal.width": 0.2, + "sepal.length": 5.1, + "sepal.width": 3.5, + "variety": "Setosa" + }), first); + let second = output + .next() + .expect("expected a result") + .expect("must parse object Ok"); + assert_eq!(json!({ + "petal.length": 1.4, + "petal.width": 0.2, + "sepal.length": 4.9, + "sepal.width": 3.0, + "variety": "Setosa" + }), second); + + // 50 total items + assert_eq!(output.count(), 148); + } +} diff --git a/crates/parser/tests/examples/iris.parquet b/crates/parser/tests/examples/iris.parquet new file mode 100644 index 0000000000..9224dead94 Binary files /dev/null and b/crates/parser/tests/examples/iris.parquet differ