diff --git a/native/Cargo.lock b/native/Cargo.lock index cb0e315c88..9de887c616 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "addr2line" @@ -123,9 +123,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaf3437355979f1e93ba84ba108c38be5767713051f3c8ffbf07c094e2e61f9f" +checksum = "d2ccdcc8fb14508ca20aaec7076032e5c0b0751b906036d4496786e2f227a37a" dependencies = [ "arrow-arith", "arrow-array", @@ -144,24 +144,23 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31dce77d2985522288edae7206bffd5fc4996491841dda01a13a58415867e681" +checksum = "a1aad8e27f32e411a0fc0bf5a625a35f0bf9b9f871cf4542abe31f7cef4beea2" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", - "half", "num", ] [[package]] name = "arrow-array" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d45fe6d3faed0435b7313e59a02583b14c6c6339fa7729e94c32a20af319a79" +checksum = "bd6ed90c28c6f73a706c55799b8cc3a094e89257238e5b1d65ca7c70bd3ae23f" dependencies = [ "ahash", "arrow-buffer", @@ -176,9 +175,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b02656a35cc103f28084bc80a0159668e0a680d919cef127bd7e0aaccb06ec1" +checksum = "fe4a40bdc1552ea10fbdeae4e5a945d8572c32f66bce457b96c13d9c46b80447" dependencies = [ "bytes", "half", @@ -187,9 +186,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c73c6233c5b5d635a56f6010e6eb1ab9e30e94707db21cea03da317f67d84cf3" +checksum = "430c0a21aa7f81bcf0f97c57216d7127795ea755f494d27bae2bd233be43c2cc" dependencies = [ "arrow-array", "arrow-buffer", @@ -208,28 +207,25 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec222848d70fea5a32af9c3602b08f5d740d5e2d33fbd76bf6fd88759b5b13a7" +checksum = "b4444c8f8c57ac00e6a679ede67d1ae8872c170797dc45b46f75702437a77888" dependencies = [ "arrow-array", - "arrow-buffer", "arrow-cast", - "arrow-data", "arrow-schema", "chrono", "csv", "csv-core", "lazy_static", - "lexical-core", "regex", ] [[package]] name = "arrow-data" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7f2861ffa86f107b8ab577d86cff7c7a490243eabe961ba1e1af4f27542bb79" +checksum = "09af476cfbe9879937e50b1334c73189de6039186e025b1b1ac84b283b87b20e" dependencies = [ "arrow-buffer", "arrow-schema", @@ -239,13 +235,12 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0270dc511f11bb5fa98a25020ad51a99ca5b08d8a8dfbd17503bb9dba0388f0b" +checksum = "136296e8824333a8a4c4a6e508e4aa65d5678b801246d0408825ae7b2523c628" dependencies = [ "arrow-array", "arrow-buffer", - "arrow-cast", "arrow-data", "arrow-schema", "flatbuffers", @@ -254,9 +249,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eff38eeb8a971ad3a4caf62c5d57f0cff8a48b64a55e3207c4fd696a9234aad" +checksum = "e222ad0e419ab8276818c5605a5bb1e35ed86fa8c5e550726433cc63b09c3c78" dependencies = [ "arrow-array", "arrow-buffer", @@ -274,26 +269,23 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f202a879d287099139ff0d121e7f55ae5e0efe634b8cf2106ebc27a8715dee" +checksum = "eddf14c5f03b679ec8ceac4dfac43f63cdc4ed54dab3cc120a4ef46af38481eb" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "half", - "num", ] [[package]] name = "arrow-row" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f936954991c360ba762dff23f5dda16300774fafd722353d9683abd97630ae" +checksum = "e9acdc58da19f383f4ba381fa0e3583534ae2ceb31269aaf4a03f08ff13e8443" dependencies = [ - "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -303,18 +295,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9579b9d8bce47aa41389fe344f2c6758279983b7c0ebb4013e283e3e91bb450e" +checksum = "3a1822a1a952955637e85e8f9d6b0e04dd75d65492b87ec548dd593d3a1f772b" dependencies = [ "bitflags 2.8.0", ] [[package]] name = "arrow-select" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7471ba126d0b0aaa24b50a36bc6c25e4e74869a1fd1a5553357027a0b1c8d1f1" +checksum = "5c4172e9a12dfe15303d3926269f9ead471ea93bdd067d113abc65cb6c48e246" dependencies = [ "ahash", "arrow-array", @@ -326,9 +318,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72993b01cb62507b06f1fb49648d7286c8989ecfabdb7b77a750fcb54410731b" +checksum = "73683040445f4932342781926189901c9521bb1a787c35dbe628a3ce51372d3c" dependencies = [ "arrow-array", "arrow-buffer", @@ -695,9 +687,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" dependencies = [ "libc", ] @@ -828,8 +820,7 @@ dependencies = [ [[package]] name = "datafusion" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "014fc8c384ecacedaabb3bc8359c2a6c6e9d8f7bea65be3434eccacfc37f52d9" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "arrow-array", @@ -856,7 +847,7 @@ dependencies = [ "datafusion-sql", "futures", "glob", - "itertools 0.13.0", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -873,8 +864,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee60d33e210ef96070377ae667ece7caa0e959c8387496773d4a1a72f1a5012e" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow-schema", "async-trait", @@ -974,14 +964,15 @@ dependencies = [ [[package]] name = "datafusion-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b42b7d720fe21ed9cca2ebb635f3f13a12cfab786b41e0fba184fb2e620525b" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", + "arrow-ipc", "arrow-schema", + "base64", "half", "hashbrown 0.14.5", "indexmap", @@ -998,8 +989,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72fbf14d4079f7ce5306393084fe5057dddfdc2113577e0049310afa12e94281" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "log", "tokio", @@ -1008,14 +998,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c278dbd64860ed0bb5240fc1f4cb6aeea437153910aea69bcf7d5a8d6d0454f3" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" [[package]] name = "datafusion-execution" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22cb02af47e756468b3cbfee7a83e3d4f2278d452deb4b033ba933c75169486" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "dashmap", @@ -1033,8 +1021,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62298eadb1d15b525df1315e61a71519ffc563d41d5c3b2a30fda2d70f77b93c" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "chrono", @@ -1053,19 +1040,17 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dda7f73c5fc349251cd3dcb05773c5bf55d2505a698ef9d38dfc712161ea2f55" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "datafusion-common", - "itertools 0.13.0", + "itertools 0.14.0", ] [[package]] name = "datafusion-functions" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd197f3b2975424d3a4898ea46651be855a46721a56727515dbd5c9e2fb597da" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "arrow-buffer", @@ -1081,7 +1066,7 @@ dependencies = [ "datafusion-macros", "hashbrown 0.14.5", "hex", - "itertools 0.13.0", + "itertools 0.14.0", "log", "md-5", "rand", @@ -1094,11 +1079,11 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabbe48fba18f9981b134124381bee9e46f93518b8ad2f9721ee296cef5affb9" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", + "arrow-buffer", "arrow-schema", "datafusion-common", "datafusion-doc", @@ -1116,8 +1101,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a3fefed9c8c11268d446d924baca8cabf52fe32f73fdaa20854bac6473590c" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", @@ -1129,8 +1113,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6360f27464fab857bec698af39b2ae331dc07c8bf008fb4de387a19cdc6815a5" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "arrow-array", @@ -1138,12 +1121,14 @@ dependencies = [ "arrow-ord", "arrow-schema", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-macros", "datafusion-physical-expr-common", - "itertools 0.13.0", + "itertools 0.14.0", "log", "paste", ] @@ -1151,8 +1136,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c35c070eb705c12795dab399c3809f4dfbc290678c624d3989490ca9b8449c1" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "async-trait", @@ -1167,8 +1151,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52229bca26b590b140900752226c829f15fc1a99840e1ca3ce1a9534690b82a8" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "datafusion-common", "datafusion-doc", @@ -1184,8 +1167,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "367befc303b64a668a10ae6988a064a9289e1999e71a7f8e526b6e14d6bdd9d6" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1194,9 +1176,9 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5de3c8f386ea991696553afe241a326ecbc3c98a12c562867e4be754d3a060c" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ + "datafusion-expr", "quote", "syn 2.0.96", ] @@ -1204,8 +1186,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b520413906f755910422b016fb73884ae6e9e1b376de4f9584b6c0e031da75" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "chrono", @@ -1213,7 +1194,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "regex", "regex-syntax", @@ -1222,8 +1203,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd6ddc378f6ad19af95ccd6790dec8f8e1264bc4c70e99ddc1830c1a1c78ccd" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", @@ -1238,47 +1218,50 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "paste", - "petgraph", + "petgraph 0.7.1", ] [[package]] name = "datafusion-physical-expr-common" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e6c05458eccd74b4c77ed6a1fe63d52434240711de7f6960034794dad1caf5" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "itertools 0.13.0", + "itertools 0.14.0", ] [[package]] name = "datafusion-physical-optimizer" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dc3a82190f49c37d377f31317e07ab5d7588b837adadba8ac367baad5dc2351" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", + "arrow-schema", "datafusion-common", "datafusion-execution", + "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools 0.13.0", + "futures", + "itertools 0.14.0", "log", + "url", ] [[package]] name = "datafusion-physical-plan" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6608bc9844b4ddb5ed4e687d173e6c88700b1d0482f43894617d18a1fe75da" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "ahash", "arrow", @@ -1299,7 +1282,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap", - "itertools 0.13.0", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -1309,8 +1292,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "44.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a884061c79b33d0c8e84a6f4f4be8bdc12c0f53f5af28ddf5d6d95ac0b15fdc" +source = "git+https://github.com/findepi/datafusion?branch=findepi/fix-float-and-decimal-coercion-b58e78#bed8a43b2da09fb3fbe0eafd50e3e5f2182026a8" dependencies = [ "arrow", "arrow-array", @@ -1438,6 +1420,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flatbuffers" version = "24.12.23" @@ -1907,6 +1895,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -2361,9 +2358,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.4.0" +version = "54.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8957c0c95a6a1804f3e51a18f69df29be53856a8c5768cc9b6d00fcafcd2917c" +checksum = "3334c50239d9f4951653d84fa6f636da86f53742e5e5849a30fbe852f3ff4383" dependencies = [ "ahash", "arrow-array", @@ -2422,7 +2419,17 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ - "fixedbitset", + "fixedbitset 0.4.2", + "indexmap", +] + +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset 0.5.7", "indexmap", ] @@ -2582,7 +2589,7 @@ dependencies = [ "lazy_static", "log", "multimap", - "petgraph", + "petgraph 0.6.5", "prost 0.9.0", "prost-types", "regex", diff --git a/native/Cargo.toml b/native/Cargo.toml index b16a802d67..8ad9a93d30 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -22,7 +22,7 @@ resolver = "2" [workspace.package] version = "0.6.0" homepage = "https://datafusion.apache.org/comet" -repository = "https://github.com/apache/datafusion-comet" +repository = "https://github.com/findepi/datafusion-comet" authors = ["Apache DataFusion "] description = "Apache DataFusion Comet: High performance accelerator for Apache Spark" readme = "README.md" @@ -30,24 +30,24 @@ license = "Apache-2.0" edition = "2021" # Comet uses the same minimum Rust version as DataFusion -rust-version = "1.79" +rust-version = "1.81" [workspace.dependencies] -arrow = { version = "53.4.0", features = ["prettyprint", "ffi", "chrono-tz"] } -arrow-array = { version = "53.4.0" } -arrow-buffer = { version = "53.4.0" } -arrow-data = { version = "53.4.0" } -arrow-schema = { version = "53.4.0" } -parquet = { version = "53.4.0", default-features = false, features = ["experimental"] } -datafusion = { version = "44.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -datafusion-common = { version = "44.0.0", default-features = false } -datafusion-functions = { version = "44.0.0", default-features = false, features = ["crypto_expressions"] } -datafusion-functions-nested = { version = "44.0.0", default-features = false } -datafusion-expr = { version = "44.0.0", default-features = false } -datafusion-expr-common = { version = "44.0.0", default-features = false } -datafusion-execution = { version = "44.0.0", default-features = false } -datafusion-physical-plan = { version = "44.0.0", default-features = false } -datafusion-physical-expr = { version = "44.0.0", default-features = false } +arrow = { version = "54.0.0", features = ["prettyprint", "ffi", "chrono-tz"] } +arrow-array = { version = "54.0.0" } +arrow-buffer = { version = "54.0.0" } +arrow-data = { version = "54.0.0" } +arrow-schema = { version = "54.0.0" } +parquet = { version = "54.0.0", default-features = false, features = ["experimental"] } +datafusion = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +datafusion-common = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-functions = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false, features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-expr = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-expr-common = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-execution = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-physical-plan = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } +datafusion-physical-expr = { git = "https://github.com/findepi/datafusion", branch = "findepi/fix-float-and-decimal-coercion-b58e78", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" } datafusion-comet-proto = { path = "proto", version = "0.6.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index 0312f869e7..c8ca240773 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -32,25 +32,28 @@ use arrow::record_batch::RecordBatch; use arrow_array::{make_array, Array, ArrayRef, BooleanArray, RecordBatchOptions}; use arrow_data::transform::MutableArrayData; use arrow_schema::ArrowError; +use datafusion::physical_plan::common::can_project; +use datafusion::physical_plan::execution_plan::CardinalityEffect; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; -use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; +use datafusion_common::{internal_err, plan_err, project_schema, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; +use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, + analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, + PhysicalExpr, }; - use futures::stream::{Stream, StreamExt}; use log::trace; /// This is a copy of DataFusion's FilterExec with one modification to ensure that input /// batches are never passed through unchanged. The changes are between the comments /// `BEGIN Comet change` and `END Comet change`. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FilterExec { /// The expression to filter on. This expression must evaluate to a boolean value. predicate: Arc, @@ -62,6 +65,8 @@ pub struct FilterExec { default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, + /// The projection indices of the columns in the output schema of join + projection: Option>, } impl FilterExec { @@ -73,13 +78,15 @@ impl FilterExec { match predicate.data_type(input.schema().as_ref())? { DataType::Boolean => { let default_selectivity = 20; - let cache = Self::compute_properties(&input, &predicate, default_selectivity)?; + let cache = + Self::compute_properties(&input, &predicate, default_selectivity, None)?; Ok(Self { predicate, input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache, + projection: None, }) } other => { @@ -101,6 +108,35 @@ impl FilterExec { Ok(self) } + /// Return new instance of [FilterExec] with the given projection. + pub fn with_projection(&self, projection: Option>) -> Result { + // Check if the projection is valid + can_project(&self.schema(), projection.as_ref())?; + + let projection = match projection { + Some(projection) => match &self.projection { + Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), + None => Some(projection), + }, + None => None, + }; + + let cache = Self::compute_properties( + &self.input, + &self.predicate, + self.default_selectivity, + projection.as_ref(), + )?; + Ok(Self { + predicate: Arc::clone(&self.predicate), + input: Arc::clone(&self.input), + metrics: self.metrics.clone(), + default_selectivity: self.default_selectivity, + cache, + projection, + }) + } + /// The expression to filter on. This expression must evaluate to a boolean value. pub fn predicate(&self) -> &Arc { &self.predicate @@ -116,6 +152,11 @@ impl FilterExec { self.default_selectivity } + /// Projection + pub fn projection(&self) -> Option<&Vec> { + self.projection.as_ref() + } + /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. fn statistics_helper( input: &Arc, @@ -168,11 +209,21 @@ impl FilterExec { if binary.op() == &Operator::Eq { // Filter evaluates to single value for all partitions if input_eqs.is_expr_constant(binary.left()) { - res_constants - .push(ConstExpr::from(binary.right()).with_across_partitions(true)) + let (expr, across_parts) = ( + binary.right(), + input_eqs.get_expr_constant_value(binary.right()), + ); + res_constants.push( + ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts), + ); } else if input_eqs.is_expr_constant(binary.right()) { - res_constants - .push(ConstExpr::from(binary.left()).with_across_partitions(true)) + let (expr, across_parts) = ( + binary.left(), + input_eqs.get_expr_constant_value(binary.left()), + ); + res_constants.push( + ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts), + ); } } } @@ -184,6 +235,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, + projection: Option<&Vec>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -199,17 +251,32 @@ impl FilterExec { .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| { + let value = stats.column_statistics[column.index()] + .min_value + .get_value(); let expr = Arc::new(column) as _; - ConstExpr::new(expr).with_across_partitions(true) + ConstExpr::new(expr) + .with_across_partitions(AcrossPartitions::Uniform(value.cloned())) }); - // this is for statistics + // This is for statistics eq_properties = eq_properties.with_constants(constants); - // this is for logical constant (for example: a = '1', then a could be marked as a constant) + // This is for logical constant (for example: a = '1', then a could be marked as a constant) // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) eq_properties = eq_properties.with_constants(Self::extend_constants(input, predicate)); + + let mut output_partitioning = input.output_partitioning().clone(); + // If contains projection, update the PlanProperties. + if let Some(projection) = projection { + let schema = eq_properties.schema(); + let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; + let out_schema = project_schema(schema, Some(projection))?; + output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); + eq_properties = eq_properties.project(&projection_mapping, out_schema); + } + Ok(PlanProperties::new( eq_properties, - input.output_partitioning().clone(), // Output Partitioning + output_partitioning, input.pipeline_behavior(), input.boundedness(), )) @@ -220,7 +287,27 @@ impl DisplayAs for FilterExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FilterExec: {}", self.predicate) + let display_projections = if let Some(projection) = self.projection.as_ref() { + format!( + ", projection=[{}]", + projection + .iter() + .map(|index| format!( + "{}@{}", + self.input.schema().fields().get(*index).unwrap().name(), + index + )) + .collect::>() + .join(", ") + ) + } else { + "".to_string() + }; + write!( + f, + "CometFilterExec: {}{}", + self.predicate, display_projections + ) } } } @@ -245,7 +332,7 @@ impl ExecutionPlan for FilterExec { } fn maintains_input_order(&self) -> Vec { - // tell optimizer this operator doesn't reorder its input + // Tell optimizer this operator doesn't reorder its input vec![true] } @@ -258,6 +345,7 @@ impl ExecutionPlan for FilterExec { let selectivity = e.default_selectivity(); e.with_default_selectivity(selectivity) }) + .and_then(|e| e.with_projection(self.projection().cloned())) .map(|e| Arc::new(e) as _) } @@ -274,10 +362,11 @@ impl ExecutionPlan for FilterExec { ); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { - schema: self.input.schema(), + schema: self.schema(), predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, + projection: self.projection.clone(), })) } @@ -288,7 +377,13 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn statistics(&self) -> Result { - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) + let stats = + Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?; + Ok(stats.project(self.projection.as_ref())) + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::LowerEqual } } @@ -332,28 +427,41 @@ fn collect_new_statistics( /// The FilterExec streams wraps the input iterator and applies the predicate expression to /// determine which rows to include in its output batches struct FilterExecStream { - /// Output schema, which is the same as the input schema for this operator + /// Output schema after the projection schema: SchemaRef, /// The expression to filter on. This expression must evaluate to a boolean value. predicate: Arc, /// The input partition to filter. input: SendableRecordBatchStream, - /// runtime metrics recording + /// Runtime metrics recording baseline_metrics: BaselineMetrics, + /// The projection indices of the columns in the input schema + projection: Option>, } -pub(crate) fn batch_filter( +fn filter_and_project( batch: &RecordBatch, predicate: &Arc, + projection: Option<&Vec>, + output_schema: &SchemaRef, ) -> Result { predicate .evaluate(batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match as_boolean_array(&array) { - // apply filter array to record batch - Ok(filter_array) => comet_filter_record_batch(batch, filter_array)?, - Err(_) => { + Ok(match (as_boolean_array(&array), projection) { + // Apply filter array to record batch + (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?, + (Ok(filter_array), Some(projection)) => { + let projected_columns = projection + .iter() + .map(|i| Arc::clone(batch.column(*i))) + .collect(); + let projected_batch = + RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?; + comet_filter_record_batch(&projected_batch, filter_array)? + } + (Err(_), _) => { return internal_err!("Cannot create filter_array from non-boolean predicates"); } }) @@ -397,9 +505,14 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = batch_filter(&batch, &self.predicate)?; + let filtered_batch = filter_and_project( + &batch, + &self.predicate, + self.projection.as_ref(), + &self.schema, + )?; timer.done(); - // skip entirely filtered batches + // Skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } @@ -416,7 +529,7 @@ impl Stream for FilterExecStream { } fn size_hint(&self) -> (usize, Option) { - // same number of record batches + // Same number of record batches self.input.size_hint() } } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 888cd2fdb5..81dfccefb6 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -304,11 +304,7 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef { .map(|(idx, c)| { let datatype = ScanExec::unpack_dictionary_type(c.data_type()); // We don't use the field name. Put a placeholder. - if matches!(datatype, DataType::Dictionary(_, _)) { - Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false) - } else { - Field::new(format!("col_{}", idx), datatype, true) - } + Field::new(format!("col_{}", idx), datatype, true) }) .collect::>() } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 95926bfee9..75b64bbab4 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -53,7 +53,6 @@ use datafusion::{ }, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, - physical_optimizer::join_selection::swap_hash_join, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec}, @@ -105,8 +104,9 @@ use datafusion_common::{ JoinType as DFJoinType, ScalarValue, }; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression; use datafusion_expr::{ - AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, + AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_nested::array_has::ArrayHas; @@ -584,15 +584,13 @@ impl PhysicalPlanner { let else_phy_expr = match &case_when.else_expr { None => None, - Some(_) => { - Some(self.create_expr(case_when.else_expr.as_ref().unwrap(), input_schema)?) - } + Some(_) => Some(self.create_expr( + case_when.else_expr.as_ref().unwrap(), + Arc::clone(&input_schema), + )?), }; - Ok(Arc::new(CaseExpr::try_new( - None, - when_then_pairs, - else_phy_expr, - )?)) + + create_case_expr(when_then_pairs, else_phy_expr, &input_schema) } ExprStruct::In(expr) => { let value = @@ -710,12 +708,11 @@ impl PhysicalPlanner { let null_literal_expr: Arc = Arc::new(Literal::new(ScalarValue::Null)); - let case_expr = CaseExpr::try_new( - None, + create_case_expr( vec![(is_null_expr, null_literal_expr)], Some(array_append_expr), - )?; - Ok(Arc::new(case_expr)) + &input_schema, + ) } ExprStruct::ArrayInsert(expr) => { let src_array_expr = self.create_expr( @@ -768,13 +765,11 @@ impl PhysicalPlanner { let null_literal_expr: Arc = Arc::new(Literal::new(ScalarValue::Null)); - let case_expr = CaseExpr::try_new( - None, + create_case_expr( vec![(is_null_expr, null_literal_expr)], Some(array_remove_expr), - )?; - - Ok(Arc::new(case_expr)) + &input_schema, + ) } ExprStruct::ArrayIntersect(expr) => { let left_expr = @@ -1494,7 +1489,7 @@ impl PhysicalPlanner { )) } else { let swapped_hash_join = - swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?; + hash_join.as_ref().swap_inputs(PartitionMode::Partitioned)?; let mut additional_native_plans = vec![]; if swapped_hash_join.as_any().is::() { @@ -1682,7 +1677,7 @@ impl PhysicalPlanner { Some(JoinFilter::new( rewritten_physical_expr, column_indices, - filter_schema, + filter_schema.into(), )) } else { None @@ -2297,16 +2292,31 @@ impl PhysicalPlanner { .coerce_types(&input_expr_types) .unwrap_or_else(|_| input_expr_types.clone()); - let data_type = match fun_name { - // workaround for https://github.com/apache/datafusion/issues/13716 - "datepart" => DataType::Int32, - _ => { - // TODO need to call `return_type_from_exprs` instead - #[allow(deprecated)] - func.inner().return_type(&coerced_types)? - } + // TODO this should try and find scalar + let arguments = args + .iter() + .map(|e| { + e.as_ref() + .as_any() + .downcast_ref::() + .map(|lit| lit.value()) + }) + .collect::>(); + + let nullables = arguments.iter().map(|_| true).collect::>(); + + let args = ReturnTypeArgs { + arg_types: &coerced_types, + scalar_arguments: &arguments, + nullables: &nullables, }; + let data_type = func + .inner() + .return_type_from_args(args)? + .return_type() + .clone(); + (data_type, coerced_types) } }; @@ -2541,6 +2551,57 @@ fn convert_spark_types_to_arrow_schema( arrow_schema } +/// Create CASE WHEN expression and add casting as needed +fn create_case_expr( + when_then_pairs: Vec<(Arc, Arc)>, + else_expr: Option>, + input_schema: &Schema, +) -> Result, ExecutionError> { + let then_types: Vec = when_then_pairs + .iter() + .map(|x| x.1.data_type(input_schema)) + .collect::, _>>()?; + + let else_type: Option = else_expr + .as_ref() + .map(|x| Arc::clone(x).data_type(input_schema)) + .transpose()? + .or(Some(DataType::Null)); + + if let Some(coerce_type) = get_coerce_type_for_case_expression(&then_types, else_type.as_ref()) + { + let cast_options = SparkCastOptions::new_without_timezone(EvalMode::Legacy, false); + + let when_then_pairs = when_then_pairs + .iter() + .map(|x| { + let t: Arc = Arc::new(Cast::new( + Arc::clone(&x.1), + coerce_type.clone(), + cast_options.clone(), + )); + (Arc::clone(&x.0), t) + }) + .collect::, Arc)>>(); + + let else_phy_expr: Option> = else_expr.clone().map(|x| { + Arc::new(Cast::new(x, coerce_type.clone(), cast_options.clone())) + as Arc + }); + Ok(Arc::new(CaseExpr::try_new( + None, + when_then_pairs, + else_phy_expr, + )?)) + } else { + Ok(Arc::new(CaseExpr::try_new( + None, + when_then_pairs, + else_expr.clone(), + )?)) + } +} + #[cfg(test)] mod tests { use std::{sync::Arc, task::Poll}; diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 54a9bb31fb..f9ecf47900 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3435,24 +3435,10 @@ fn builder_to_array( } fn make_batch(arrays: Vec, row_count: usize) -> Result { - let mut dict_id = 0; let fields = arrays .iter() .enumerate() - .map(|(i, array)| match array.data_type() { - DataType::Dictionary(_, _) => { - let field = Field::new_dict( - format!("c{}", i), - array.data_type().clone(), - true, - dict_id, - false, - ); - dict_id += 1; - field - } - _ => Field::new(format!("c{}", i), array.data_type().clone(), true), - }) + .map(|(i, array)| Field::new(format!("c{}", i), array.data_type().clone(), true)) .collect::>(); let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index 0f59761325..22b09f6106 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -988,6 +988,9 @@ fn is_datafusion_spark_compatible( return true; } match from_type { + DataType::Null => { + matches!(to_type, DataType::List(_)) + } DataType::Boolean => matches!( to_type, DataType::Int8