diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/setup-builder/action.yaml new file mode 100644 index 00000000..13a3008b --- /dev/null +++ b/.github/actions/setup-builder/action.yaml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Prepare Rust Builder +description: 'Prepare Rust Build Environment' +inputs: + rust-version: + description: 'version of rust to install (e.g. stable)' + required: true + default: 'stable' +runs: + using: "composite" + steps: + - name: Install Build Dependencies + shell: bash + run: | + apt-get update + apt-get install -y protobuf-compiler + - name: Setup Rust toolchain + shell: bash + run: | + echo "Installing ${{ inputs.rust-version }}" + rustup toolchain install ${{ inputs.rust-version }} + rustup default ${{ inputs.rust-version }} + rustup component add rustfmt diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 868fd895..0c56e4a9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -60,11 +60,6 @@ jobs: with: toolchain: stable - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install maturin==0.14.2 - - run: rm LICENSE.txt - name: Download LICENSE.txt uses: actions/download-artifact@v3 @@ -73,7 +68,7 @@ jobs: path: . - name: Build Python package - run: maturin build --release --strip --locked + run: maturin build --release --strip - name: List Windows wheels if: matrix.os == 'windows-latest' @@ -103,12 +98,12 @@ jobs: path: . - run: cat LICENSE.txt - name: Build wheels - run: | - export RUSTFLAGS='-C target-cpu=skylake' - docker run --rm -v $(pwd):/io \ - --workdir /io \ - ghcr.io/pyo3/maturin:v0.14.2 \ - build --release --manylinux 2014 --locked + uses: PyO3/maturin-action@v1 + with: + rust-toolchain: nightly + target: x86_64 + manylinux: auto + args: --release --manylinux 2014 - name: Archive wheels uses: actions/upload-artifact@v3 with: @@ -129,12 +124,11 @@ jobs: path: . - run: cat LICENSE.txt - name: Build sdist - run: | - docker run --rm -v $(pwd):/io \ - --workdir /io \ - ghcr.io/pyo3/maturin:v0.14.2 \ - build --release --sdist --locked - find target/wheels/ + uses: PyO3/maturin-action@v1 + with: + rust-toolchain: stable + manylinux: auto + args: --release --sdist --out dist - name: Archive wheels uses: actions/upload-artifact@v2 with: diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 327b1cf4..ffa98325 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -53,6 +53,19 @@ jobs: toolchain: ${{ matrix.toolchain }} override: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-linux-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + export PROTOC=$HOME/d/protoc/bin + sudo chown -R $(whoami) $HOME/d/protoc + protoc --version + - name: Setup Python uses: actions/setup-python@v4 with: @@ -99,9 +112,22 @@ jobs: flake8 --exclude venv --ignore=E501,W503 black --line-length 79 --diff --check . + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + command: build + args: --release --out dist + - name: Run tests run: | git submodule update --init - source venv/bin/activate - maturin develop --locked + export PATH=$PATH:$HOME/d/protoc/bin + export PROTOC=$HOME/d/protoc/bin + sudo chown -R $(whoami) $HOME/d/protoc + ls -l $HOME/d/protoc/ + ls -l $HOME/d/protoc/bin + pip install datafusion-python --no-index --find-links dist --force-reinstall + pip install pytest + cargo clean + maturin develop RUST_BACKTRACE=1 pytest -v . diff --git a/Cargo.lock b/Cargo.lock index ba432f99..918631ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" + [[package]] name = "apache-avro" version = "0.14.0" @@ -101,11 +107,12 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fe17dc0113da7e2eaeaedbd304d347aa8ea64916d225b79a5c3f3b6b5d8da4c" +checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945" dependencies = [ "ahash", + "arrow-arith", "arrow-array", "arrow-buffer", "arrow-cast", @@ -114,25 +121,35 @@ dependencies = [ "arrow-ipc", "arrow-json", "arrow-ord", + "arrow-row", "arrow-schema", "arrow-select", "arrow-string", "bitflags", - "chrono", "comfy-table", + "pyo3", +] + +[[package]] +name = "arrow-arith" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", "half", - "hashbrown 0.13.2", - "multiversion", "num", - "pyo3", - "regex", ] [[package]] name = "arrow-array" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9452131e027aec3276e43449162af084db611c42ef875e54d231e6580bc6254" +checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1" dependencies = [ "ahash", "arrow-buffer", @@ -146,9 +163,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a301001e8ed7da638a12fa579ac5f3f154c44c0655f2ca6ed0f8586b418a779" +checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748" dependencies = [ "half", "num", @@ -156,9 +173,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048c91d067f2eb8cc327f086773e5b0f0d7714780807fc4db09366584e23bac8" +checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04" dependencies = [ "arrow-array", "arrow-buffer", @@ -172,9 +189,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed914cd0006a3bb9cac8136b3098ac7796ad26b82362f00d4f2e7c1a54684b86" +checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448" dependencies = [ "arrow-array", "arrow-buffer", @@ -183,6 +200,7 @@ dependencies = [ "arrow-schema", "chrono", "csv", + "csv-core", "lazy_static", "lexical-core", "regex", @@ -190,9 +208,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59619d9d102e4e6b22087b2bd60c07df76fcb68683620841718f6bc8e8f02cb" +checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477" dependencies = [ "arrow-buffer", "arrow-schema", @@ -202,9 +220,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7ad6d2fa06a1cebdaa213c59fc953b9230e560d8374aba133b572b864ec55e" +checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190" dependencies = [ "arrow-array", "arrow-buffer", @@ -216,9 +234,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e22efab3ad70336057660c5e5f2b72e2417e3444c27cb42dc477d678ddd6979" +checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a" dependencies = [ "arrow-array", "arrow-buffer", @@ -234,9 +252,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23b623332804a65ad11e7732c351896dcb132c19f8e25d99fdb13b00aae5206" +checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3" dependencies = [ "arrow-array", "arrow-buffer", @@ -246,17 +264,32 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-row" +version = "31.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", + "hashbrown 0.13.2", +] + [[package]] name = "arrow-schema" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69ef17c144f1253b9864f5a3e8f4c6f1e436bdd52394855d5942f132f776b64e" +checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723" [[package]] name = "arrow-select" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2accaf218ff107e3df0ee8f1e09b092249a1cc741c4377858a1470fd27d7096" +checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c" dependencies = [ "arrow-array", "arrow-buffer", @@ -267,9 +300,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a0954f9e1f45b04815ddacbde72899bf3c03a08fa6c0375f42178c4a01a510" +checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8" dependencies = [ "arrow-array", "arrow-buffer", @@ -296,6 +329,17 @@ dependencies = [ "xz2", ] +[[package]] +name = "async-recursion" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b015a331cc64ebd1774ba119538573603427eaace0a1950c423ab971f903796" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.61" @@ -313,12 +357,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.20.0" @@ -640,8 +678,7 @@ dependencies = [ [[package]] name = "datafusion" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f166d67281ee90d0b35d93a9d13b3b32c26f35348da9e5101a12e9de6093bd06" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "ahash", "apache-avro", @@ -690,8 +727,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cb2144c73ca974b00bf735a6e8692efe22c65731097bc49018a0edfbd1d0120" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "apache-avro", "arrow", @@ -706,8 +742,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6b988765372fdee77d805dda00cb7ffb28dfda831cc0b79aff9e09527b70402" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "ahash", "arrow", @@ -719,8 +754,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b44aa5128e3e6f76d7ae8efc3a595133abd74aa60c3f50f39f93ea00ef302c" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "arrow", "async-trait", @@ -736,8 +770,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fd7e12ce4d61eb698a83fde3bf14417fe67a23d7131b1f00c28dc70183aeed7" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "ahash", "arrow", @@ -773,6 +806,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-optimizer", + "datafusion-substrait", "futures", "mimalloc", "object_store", @@ -787,8 +821,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e34f6c09f1458190bb90305d70c2075bf2dd4cf8c51a65d5635e5217a3bb8bff" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "arrow", "datafusion-common", @@ -799,8 +832,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "16.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42f29fd2c98e0e0030db4638f971c91145ef5e67ab139f0426b2891e14b9bf5" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" dependencies = [ "arrow-schema", "datafusion-common", @@ -809,6 +841,20 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "datafusion-substrait" +version = "16.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=6dce728a3c7130ca3590a16f413c7c6ccb7209b7#6dce728a3c7130ca3590a16f413c7c6ccb7209b7" +dependencies = [ + "async-recursion", + "datafusion", + "prost 0.11.6", + "prost-build 0.9.0", + "prost-types 0.11.6", + "substrait", + "tokio", +] + [[package]] name = "digest" version = "0.10.6" @@ -826,6 +872,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dyn-clone" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9b0705efd4599c15a38151f4721f7bc388306f61084d3bfd50bd07fbca5cb60" + [[package]] name = "either" version = "1.8.0" @@ -850,6 +902,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flatbuffers" version = "22.9.29" @@ -1045,6 +1103,15 @@ dependencies = [ "ahash", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.0" @@ -1060,6 +1127,15 @@ dependencies = [ "libc", ] +[[package]] +name = "home" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "747309b4b440c06d57b0b25f2aee03ee9b5e5397d288c60e21fc709bb98a7408" +dependencies = [ + "winapi", +] + [[package]] name = "http" version = "0.2.8" @@ -1473,24 +1549,10 @@ dependencies = [ ] [[package]] -name = "multiversion" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "025c962a3dd3cc5e0e520aa9c612201d127dcdf28616974961a649dca64f5373" -dependencies = [ - "multiversion-macros", -] - -[[package]] -name = "multiversion-macros" -version = "0.6.1" +name = "multimap" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "num" @@ -1519,9 +1581,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ae39348c8bc5fbd7f40c727a9925f03517afd2ab27d46702108b6a7e5414c19" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" dependencies = [ "num-traits", ] @@ -1647,9 +1709,9 @@ dependencies = [ [[package]] name = "parquet" -version = "29.0.0" +version = "31.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d906343fd18ace6b998d5074697743e8e9358efa8c3c796a1381b98cba813338" +checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45" dependencies = [ "ahash", "arrow-array", @@ -1659,7 +1721,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.13.1", + "base64 0.21.0", "brotli", "bytes", "chrono", @@ -1690,6 +1752,26 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pest" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4257b4a04d91f7e9e6290be5d3da4804dd5784fafde3a497d73eb2b4a158c30a" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1729,6 +1811,112 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" +dependencies = [ + "bytes", + "prost-derive 0.9.0", +] + +[[package]] +name = "prost" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" +dependencies = [ + "bytes", + "prost-derive 0.11.6", +] + +[[package]] +name = "prost-build" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" +dependencies = [ + "bytes", + "heck 0.3.3", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost 0.9.0", + "prost-types 0.9.0", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-build" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" +dependencies = [ + "bytes", + "heck 0.4.0", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost 0.11.6", + "prost-types 0.11.6", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-derive" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" +dependencies = [ + "bytes", + "prost 0.9.0", +] + +[[package]] +name = "prost-types" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" +dependencies = [ + "bytes", + "prost 0.11.6", +] + [[package]] name = "pyo3" version = "0.17.3" @@ -1876,6 +2064,15 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "regress" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a92ff21fe8026ce3f2627faaf43606f0b67b014dbc9ccf027181a804f75d92e" +dependencies = [ + "memchr", +] + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1887,11 +2084,11 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" +checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9" dependencies = [ - "base64 0.13.1", + "base64 0.21.0", "bytes", "encoding_rs", "futures-core", @@ -1920,6 +2117,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "winreg", @@ -1946,6 +2144,19 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" +[[package]] +name = "rustfmt-wrapper" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed729e3bee08ec2befd593c27e90ca9fdd25efdc83c94c3b82eaef16e4f7406e" +dependencies = [ + "serde", + "tempfile", + "thiserror", + "toml", + "toolchain_find", +] + [[package]] name = "rustls" version = "0.20.8" @@ -1988,6 +2199,30 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schemars" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a5fb6c61f29e723026dc8e923d94c694313212abbecbbe5f55a7748eec5b307" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f188d036977451159430f3b8dc82ec76364a42b7e289c2b18a9a18f4470058e9" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2010,6 +2245,24 @@ dependencies = [ "untrusted", ] +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + [[package]] name = "seq-macro" version = "0.3.2" @@ -2036,6 +2289,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_derive_internals" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_json" version = "1.0.91" @@ -2047,6 +2311,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_tokenstream" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "274f512d6748a01e67cbcde5b4307ab2c9d52a98a2b870a980ef0793a351deff" +dependencies = [ + "proc-macro2", + "serde", + "syn", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2059,6 +2334,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834" +dependencies = [ + "indexmap", + "itoa 1.0.5", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha2" version = "0.10.6" @@ -2101,7 +2389,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "syn", @@ -2168,13 +2456,31 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "rustversion", "syn", ] +[[package]] +name = "substrait" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2feb96a6a106e21161551af32dc4e0fdab3aceb926b940d7e92a086b640fc7c" +dependencies = [ + "heck 0.4.0", + "prost 0.11.6", + "prost-build 0.11.6", + "prost-types 0.11.6", + "schemars", + "serde", + "serde_json", + "serde_yaml", + "typify", + "walkdir", +] + [[package]] name = "subtle" version = "2.4.1" @@ -2342,6 +2648,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f" +dependencies = [ + "serde", +] + +[[package]] +name = "toolchain_find" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e85654a10e7a07a47c6f19d93818f3f343e22927f2fa280c84f7c8042743413" +dependencies = [ + "home", + "lazy_static", + "regex", + "semver", + "walkdir", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -2413,6 +2741,57 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "typify" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8486352f3c946e69f983558cfc09b295250b01e01b381ec67a05a812d01d63" +dependencies = [ + "typify-impl", + "typify-macro", +] + +[[package]] +name = "typify-impl" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7624d0b911df6e2bbf34a236f76281f93b294cdde1d4df1dbdb748e5a7fefa5" +dependencies = [ + "heck 0.4.0", + "log", + "proc-macro2", + "quote", + "regress", + "rustfmt-wrapper", + "schemars", + "serde_json", + "syn", + "thiserror", + "unicode-ident", +] + +[[package]] +name = "typify-macro" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c42802aa033cee7650a4e1509ba7d5848a56f84be7c4b31e4385ee12445e942" +dependencies = [ + "proc-macro2", + "quote", + "schemars", + "serde", + "serde_json", + "serde_tokenstream", + "syn", + "typify-impl", +] + +[[package]] +name = "ucd-trie" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81" + [[package]] name = "unicode-bidi" version = "0.3.9" @@ -2452,6 +2831,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" +[[package]] +name = "unsafe-libyaml" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" + [[package]] name = "untrusted" version = "0.7.1" @@ -2587,6 +2972,19 @@ version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.60" @@ -2616,6 +3014,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "which" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b" +dependencies = [ + "either", + "libc", + "once_cell", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 5ab6d79f..0ab8e98d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,10 +34,11 @@ default = ["mimalloc"] tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.8" pyo3 = { version = "~0.17.3", features = ["extension-module", "abi3", "abi3-py37"] } -datafusion = { version = "16.0.0", features = ["pyarrow", "avro"] } -datafusion-expr = { version = "16.0.0" } -datafusion-optimizer = { version = "16.0.0" } -datafusion-common = { version = "16.0.0", features = ["pyarrow"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6dce728a3c7130ca3590a16f413c7c6ccb7209b7", features = ["pyarrow", "avro"] } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "6dce728a3c7130ca3590a16f413c7c6ccb7209b7" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion", rev = "6dce728a3c7130ca3590a16f413c7c6ccb7209b7" } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion", rev = "6dce728a3c7130ca3590a16f413c7c6ccb7209b7", features = ["pyarrow"] } +datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion", rev = "6dce728a3c7130ca3590a16f413c7c6ccb7209b7" } uuid = { version = "0.8", features = ["v4"] } mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" @@ -55,4 +56,4 @@ name = "datafusion._internal" [profile.release] lto = true -codegen-units = 1 \ No newline at end of file +codegen-units = 1 diff --git a/README.md b/README.md index 17e872af..ab89ff6d 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,43 @@ This produces the following chart: ![Chart](examples/chart.png) +## Substrait Support + +`arrow-datafusion-python` has bindings which allow for serializing a SQL query to substrait protobuf format and deserializing substrait protobuf bytes to a DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then be executed. + +### Example of Serializing/Deserializing Substrait Plans + +```python +from datafusion import SessionContext +from datafusion import substrait as ss + +# Create a DataFusion context +ctx = SessionContext() + +# Register table with context +ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv') + +substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx) +# type(substrait_plan) -> + +# Alternative serialization approaches +# type(substrait_bytes) -> , at this point the bytes can be distributed to file, network, etc safely +# where they could subsequently be deserialized on the receiving end. +substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx) + +# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused +# type(substrait_plan) -> +substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes) + +# type(df_logical_plan) -> +df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan) + +# Back to Substrait Plan just for demonstration purposes +# type(substrait_plan) -> +substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan) + +``` + ## How to install (from pip) ### Pip diff --git a/datafusion/substrait.py b/datafusion/substrait.py new file mode 100644 index 00000000..eff809a0 --- /dev/null +++ b/datafusion/substrait.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from ._internal import substrait + + +def __getattr__(name): + return getattr(substrait, name) diff --git a/datafusion/tests/test_substrait.py b/datafusion/tests/test_substrait.py new file mode 100644 index 00000000..7f31c5a9 --- /dev/null +++ b/datafusion/tests/test_substrait.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyarrow as pa +import pyarrow.dataset as ds + +from datafusion import column, literal, SessionContext +from datafusion import substrait as ss +import pytest + + +@pytest.fixture +def ctx(): + return SessionContext() + + +def test_substrait_serialization(ctx): + batch = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3]), pa.array([4, 5, 6])], + names=["a", "b"], + ) + + ctx.register_record_batches("t", [[batch]]) + + assert ctx.tables() == {"t"} + + # For now just make sure the method calls blow up + substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM t", ctx) + substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM t", ctx) + substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes) + df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan) + substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan) diff --git a/src/context.rs b/src/context.rs index 2ffa3da3..71f99f55 100644 --- a/src/context.rs +++ b/src/context.rs @@ -48,8 +48,9 @@ use datafusion_common::ScalarValue; /// It has a powerful optimizer, a physical planner for local execution, and a /// multi-threaded execution engine to perform the execution. #[pyclass(name = "SessionContext", module = "datafusion", subclass, unsendable)] +#[derive(Clone)] pub(crate) struct PySessionContext { - ctx: SessionContext, + pub(crate) ctx: SessionContext, } #[pymethods] @@ -481,3 +482,15 @@ fn convert_table_partition_cols( }) .collect::, _>>() } + +impl From for SessionContext { + fn from(ctx: PySessionContext) -> SessionContext { + ctx.ctx + } +} + +impl From for PySessionContext { + fn from(ctx: SessionContext) -> PySessionContext { + PySessionContext { ctx } + } +} diff --git a/src/errors.rs b/src/errors.rs index 29d3e8f2..69a476d8 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -17,6 +17,7 @@ use core::fmt; use std::error::Error; +use std::fmt::Debug; use datafusion::arrow::error::ArrowError; use datafusion::error::DataFusionError as InnerDataFusionError; @@ -69,3 +70,15 @@ impl From for PyErr { } impl Error for DataFusionError {} + +pub fn py_type_err(e: impl Debug) -> PyErr { + PyErr::new::(format!("{:?}", e)) +} + +pub fn py_runtime_err(e: impl Debug) -> PyErr { + PyErr::new::(format!("{:?}", e)) +} + +pub fn py_datafusion_err(e: impl Debug) -> PyErr { + PyErr::new::(format!("{:?}", e)) +} diff --git a/src/lib.rs b/src/lib.rs index 28544f5d..21b47f44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,8 +34,10 @@ pub mod errors; mod expression; #[allow(clippy::borrow_deref_ref)] mod functions; +pub mod logical; mod pyarrow_filter_expression; pub mod store; +pub mod substrait; #[allow(clippy::borrow_deref_ref)] mod udaf; #[allow(clippy::borrow_deref_ref)] @@ -62,6 +64,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Register the functions as a submodule let funcs = PyModule::new(py, "functions")?; @@ -72,5 +75,10 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { store::init_module(store)?; m.add_submodule(store)?; + // Register substrait as a submodule + let substrait = PyModule::new(py, "substrait")?; + substrait::init_module(substrait)?; + m.add_submodule(substrait)?; + Ok(()) } diff --git a/src/logical.rs b/src/logical.rs new file mode 100644 index 00000000..8c3acf71 --- /dev/null +++ b/src/logical.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_expr::LogicalPlan; +use pyo3::prelude::*; + +#[pyclass(name = "LogicalPlan", module = "substrait", subclass)] +#[derive(Debug, Clone)] +pub struct PyLogicalPlan { + pub(crate) plan: Arc, +} + +impl PyLogicalPlan { + /// creates a new PyLogicalPlan + pub fn new(plan: LogicalPlan) -> Self { + Self { + plan: Arc::new(plan), + } + } +} + +impl From for LogicalPlan { + fn from(logical_plan: PyLogicalPlan) -> LogicalPlan { + logical_plan.plan.as_ref().clone() + } +} + +impl From for PyLogicalPlan { + fn from(logical_plan: LogicalPlan) -> PyLogicalPlan { + PyLogicalPlan { + plan: Arc::new(logical_plan), + } + } +} diff --git a/src/substrait.rs b/src/substrait.rs new file mode 100644 index 00000000..0c42e210 --- /dev/null +++ b/src/substrait.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; + +use crate::context::PySessionContext; +use crate::errors::py_datafusion_err; +use crate::{logical::PyLogicalPlan, utils::wait_for_future}; + +use crate::errors::DataFusionError; +use datafusion_substrait::{consumer, producer, serializer, substrait::proto::Plan}; + +#[pyclass(name = "plan", module = "datafusion.substrait", subclass, unsendable)] +#[derive(Debug, Clone)] +pub(crate) struct PyPlan { + pub(crate) plan: Plan, +} + +impl From for Plan { + fn from(plan: PyPlan) -> Plan { + plan.plan + } +} + +impl From for PyPlan { + fn from(plan: Plan) -> PyPlan { + PyPlan { plan } + } +} + +/// A PySubstraitSerializer is a representation of a Serializer that is capable of both serializing +/// a `LogicalPlan` instance to Substrait Protobuf bytes and also deserialize Substrait Protobuf bytes +/// to a valid `LogicalPlan` instance. +#[pyclass(name = "serde", module = "datafusion.substrait", subclass, unsendable)] +#[derive(Debug, Clone)] +pub(crate) struct PySubstraitSerializer; + +#[pymethods] +impl PySubstraitSerializer { + #[staticmethod] + pub fn serialize(sql: &str, ctx: PySessionContext, path: &str, py: Python) -> PyResult<()> { + wait_for_future(py, serializer::serialize(sql, &ctx.ctx, path)) + .map_err(DataFusionError::from)?; + Ok(()) + } + + #[staticmethod] + pub fn serialize_to_plan(sql: &str, ctx: PySessionContext, py: Python) -> PyResult { + match PySubstraitSerializer::serialize_bytes(sql, ctx, py) { + Ok(proto_bytes) => PySubstraitSerializer::deserialize_bytes(proto_bytes, py), + Err(e) => Err(py_datafusion_err(e)), + } + } + + #[staticmethod] + pub fn serialize_bytes(sql: &str, ctx: PySessionContext, py: Python) -> PyResult> { + let proto_bytes: Vec = wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx)) + .map_err(DataFusionError::from)?; + Ok(proto_bytes) + } + + #[staticmethod] + pub fn deserialize(path: &str, py: Python) -> PyResult { + let plan = + wait_for_future(py, serializer::deserialize(path)).map_err(DataFusionError::from)?; + Ok(PyPlan { plan: *plan }) + } + + #[staticmethod] + pub fn deserialize_bytes(proto_bytes: Vec, py: Python) -> PyResult { + let plan = wait_for_future(py, serializer::deserialize_bytes(proto_bytes)) + .map_err(DataFusionError::from)?; + Ok(PyPlan { plan: *plan }) + } +} + +#[pyclass( + name = "producer", + module = "datafusion.substrait", + subclass, + unsendable +)] +#[derive(Debug, Clone)] +pub(crate) struct PySubstraitProducer; + +#[pymethods] +impl PySubstraitProducer { + /// Convert DataFusion LogicalPlan to Substrait Plan + #[staticmethod] + pub fn to_substrait_plan(plan: PyLogicalPlan) -> PyResult { + match producer::to_substrait_plan(&plan.plan) { + Ok(plan) => Ok(PyPlan { plan: *plan }), + Err(e) => Err(py_datafusion_err(e)), + } + } +} + +#[pyclass( + name = "consumer", + module = "datafusion.substrait", + subclass, + unsendable +)] +#[derive(Debug, Clone)] +pub(crate) struct PySubstraitConsumer; + +#[pymethods] +impl PySubstraitConsumer { + /// Convert Substrait Plan to DataFusion DataFrame + #[staticmethod] + pub fn from_substrait_plan( + ctx: &mut PySessionContext, + plan: PyPlan, + py: Python, + ) -> PyResult { + let result = consumer::from_substrait_plan(&mut ctx.ctx, &plan.plan); + let logical_plan = wait_for_future(py, result).map_err(DataFusionError::from)?; + Ok(PyLogicalPlan::new(logical_plan)) + } +} + +pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + Ok(()) +}