Skip to content

Commit

Permalink
refactor: Bump arrow to 52 (#15943)
Browse files Browse the repository at this point in the history
* refactor: Bump arrow to 52

Signed-off-by: Xuanwo <github@xuanwo.io>

* Remove not needed deps

Signed-off-by: Xuanwo <github@xuanwo.io>

* Format cargo

Signed-off-by: Xuanwo <github@xuanwo.io>

* Bump arrow

Signed-off-by: Xuanwo <github@xuanwo.io>

* Try fix delta

Signed-off-by: Xuanwo <github@xuanwo.io>

* trigger ci

Signed-off-by: Xuanwo <github@xuanwo.io>

* try fix delta

Signed-off-by: Xuanwo <github@xuanwo.io>

* Format taplo

Signed-off-by: Xuanwo <github@xuanwo.io>

* Use tagged opendal

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix object_store

Signed-off-by: Xuanwo <github@xuanwo.io>

* Allow slower CI runner

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
Co-authored-by: Bohu <overred.shuttler@gmail.com>
  • Loading branch information
Xuanwo and BohuTANG authored Jul 2, 2024
1 parent d62710d commit 28dc42e
Show file tree
Hide file tree
Showing 33 changed files with 382 additions and 707 deletions.
398 changes: 277 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

35 changes: 18 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,17 @@ databend-storages-common-txn = { path = "src/query/storages/common/txn" }
# Crates.io dependencies
anyerror = { version = "=0.1.10" }
anyhow = { version = "1.0.65" }
arrow = { version = "51" }
arrow-array = { version = "51" }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51", features = ["prettyprint"] }
arrow-data = { version = "51" }
arrow-flight = { version = "51", features = ["flight-sql-experimental", "tls"] }
arrow = { version = "52" }
arrow-array = { version = "52" }
arrow-buffer = { version = "52" }
arrow-cast = { version = "52", features = ["prettyprint"] }
arrow-data = { version = "52" }
arrow-flight = { version = "52", features = ["flight-sql-experimental", "tls"] }
arrow-format = { version = "0.8.1", features = ["flight-data", "flight-service", "ipc"] }
arrow-ipc = { version = "51" }
arrow-ord = { version = "51" }
arrow-schema = { version = "51", features = ["serde"] }
arrow-select = { version = "51" }
arrow-ipc = { version = "52" }
arrow-ord = { version = "52" }
arrow-schema = { version = "52", features = ["serde"] }
arrow-select = { version = "52" }
arrow-udf-js = "0.3.1"
arrow-udf-python = "0.2.1"
arrow-udf-wasm = "0.2.2"
Expand All @@ -233,13 +233,13 @@ clap = { version = "4.4.2", features = ["derive"] }
criterion = "0.5"
dashmap = "5.4.0"
deepsize = { version = "0.2.0" }
deltalake = "0.17"
deltalake = "0.18"
derive-visitor = { version = "0.4.0", features = ["std-types-drive"] }
derive_more = "0.99.17"
enumflags2 = { version = "0.7.7", features = ["serde"] }
ethnum = { version = "1.5.0" }
feature-set = { version = "0.1.1" }
flatbuffers = "23" # Must use the same version with arrow-ipc
flatbuffers = "24" # Must use the same version as arrow-ipc
futures = "0.3.24"
futures-async-stream = { version = "0.2.7" }
futures-util = "0.3.24"
Expand All @@ -254,7 +254,7 @@ match-template = "0.0.1"
mysql_async = { version = "0.34", default-features = false, features = ["rustls-tls"] }
once_cell = "1.15.0"
openai_api_rust = "0.1"
opendal = { version = "0.47.0", features = [
opendal = { version = "0.47.1", features = [
"layers-minitrace",
"layers-prometheus-client",
"layers-async-backtrace",
Expand Down Expand Up @@ -283,8 +283,8 @@ orc-rust = "0.3"
ordered-float = { version = "4.1.0", default-features = false }
ordq = "0.2.0"
parking_lot = "0.12.1"
parquet = { version = "51", features = ["async"] }
parquet_rs = { package = "parquet", version = "51" }
parquet = { version = "52", features = ["async"] }
paste = "1.0.15"
poem = { version = "3.0", features = ["rustls", "multipart", "compression"] }
prometheus-client = "0.22"
prost = { version = "0.12.1" }
Expand Down Expand Up @@ -393,11 +393,12 @@ arrow-udf-wasm = { git = "https://github.com/datafuse-extras/arrow-udf", rev = "
async-backtrace = { git = "https://github.com/zhang2014/async-backtrace.git", rev = "dea4553" }
async-recursion = { git = "https://github.com/zhang2014/async-recursion.git", rev = "a353334" }
backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "6145fe6b" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "81593e9" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
ethnum = { git = "https://github.com/ariesdevil/ethnum-rs", rev = "4cb05f1" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "be8b2c2" }
object_store_opendal = { package = "object_store_opendal", git = "https://github.com/Xuanwo/opendal", rev = "d6baf68" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
orc-rust = { git = "https://github.com/youngsofun/datafusion-orc", branch = "pub" }
orc-rust = { git = "https://github.com/youngsofun/datafusion-orc", rev = "51733de" }
recursive = { git = "https://github.com/zhang2014/recursive.git", rev = "6af35a1" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }
2 changes: 1 addition & 1 deletion src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ lz4 = { version = "1.24" }
num = { version = "0.4", default-features = false, features = ["std"] }
num-traits = "0.2"
opendal = { workspace = true }
ordered-float = "3.7.0"
ordered-float = { workspace = true }
parquet2 = { package = "databend-common-parquet2", path = "../parquet2", default_features = false, features = [
"serde_types",
"async",
Expand Down
2 changes: 1 addition & 1 deletion src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ geozero = { workspace = true }
http = { workspace = true }
opendal = { workspace = true }
parquet = { workspace = true }
paste = "1.0.9"
paste = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/meta/api/src/txn_backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ mod tests {
}

let elapsed = now.elapsed().as_secs_f64();
println!("elapsed: {elapsed}");
assert!(
(0.041..0.070).contains(&elapsed),
(0.041..0.080).contains(&elapsed),
"{} is expected to be 2 + 5 + 10 + 14 + 20 milliseconds",
elapsed
);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ maplit = "1.0.2"
num-derive = "0.3.3"
num-traits = "0.2.15"
opendal = { workspace = true }
paste = "1.0.9"
paste = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha1 = "0.10.5"
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ databend-storages-common-txn = { workspace = true }
dyn-clone = "1.0.9"
log = { workspace = true }
parking_lot = { workspace = true }
parquet_rs = { workspace = true }
parquet = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
26 changes: 13 additions & 13 deletions src/query/catalog/src/plan/datasource/datasource_info/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use databend_common_meta_app::schema::TableInfo;
use databend_common_storage::StageFileInfo;
use databend_common_storage::StageFilesInfo;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use parquet_rs::file::metadata::ParquetMetaData;
use parquet_rs::format::SchemaElement;
use parquet_rs::schema::types;
use parquet_rs::schema::types::SchemaDescPtr;
use parquet_rs::schema::types::SchemaDescriptor;
use parquet_rs::thrift::TSerializable;
use parquet::file::metadata::ParquetMetaData;
use parquet::format::SchemaElement;
use parquet::schema::types;
use parquet::schema::types::SchemaDescPtr;
use parquet::schema::types::SchemaDescriptor;
use parquet::thrift::TSerializable;
use serde::Deserialize;
use thrift::protocol::TCompactInputProtocol;
use thrift::protocol::TCompactOutputProtocol;
Expand Down Expand Up @@ -139,13 +139,13 @@ mod tests {
use arrow_schema::Schema as ArrowSchema;
use databend_common_base::base::tokio::sync::Mutex;
use databend_common_storage::StageFilesInfo;
use parquet_rs::basic::ConvertedType;
use parquet_rs::basic::Repetition;
use parquet_rs::basic::Type as PhysicalType;
use parquet_rs::errors::ParquetError;
use parquet_rs::schema::types::SchemaDescPtr;
use parquet_rs::schema::types::SchemaDescriptor;
use parquet_rs::schema::types::Type;
use parquet::basic::ConvertedType;
use parquet::basic::Repetition;
use parquet::basic::Type as PhysicalType;
use parquet::errors::ParquetError;
use parquet::schema::types::SchemaDescPtr;
use parquet::schema::types::SchemaDescriptor;
use parquet::schema::types::Type;

use super::ParquetTableInfo;

Expand Down
4 changes: 2 additions & 2 deletions src/query/catalog/src/plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use databend_common_storage::parquet_rs::build_parquet_schema_tree;
use databend_common_storage::parquet_rs::traverse_parquet_schema_tree;
use databend_common_storage::ColumnNode;
use databend_common_storage::ColumnNodes;
use parquet_rs::arrow::ProjectionMask;
use parquet_rs::schema::types::SchemaDescriptor;
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub enum Projection {
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/tests/it/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use databend_common_expression::types::NumberDataType;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use parquet_rs::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_to_parquet_schema;

#[test]
fn test_to_projection_mask() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
paste = "1.0.9"
paste = { workspace = true }
petgraph = { version = "0.6.2", features = ["serde-1"] }
pin-project-lite = "0.2.9"
poem = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use arrow_flight::sql::CommandPreparedStatementUpdate;
use arrow_flight::sql::CommandStatementQuery;
use arrow_flight::sql::CommandStatementSubstraitPlan;
use arrow_flight::sql::CommandStatementUpdate;
use arrow_flight::sql::DoPutPreparedStatementResult;
use arrow_flight::sql::DoPutUpdateResult;
use arrow_flight::sql::ProstMessageExt;
use arrow_flight::sql::SqlInfo;
Expand Down Expand Up @@ -525,7 +526,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
query: CommandPreparedStatementQuery,
request: Request<PeekableFlightDataStream>,
) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
let session = self.get_session(&request)?;
let handle = Uuid::from_slice(query.prepared_statement_handle.as_ref())
.map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
Expand All @@ -541,8 +542,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
let result = PutResult {
app_metadata: result.as_any().encode_to_vec().into(),
};
let result = futures::stream::iter(vec![Ok(result)]);
return Ok(Response::new(Box::pin(result)));

Ok(DoPutPreparedStatementResult {
prepared_statement_handle: Some(result.encode_to_vec().into()),
})
}

// called by JDBC
Expand Down
2 changes: 1 addition & 1 deletion src/query/sharing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub async fn parse_error(er: Response<Buffer>) -> Error {
_ => (ErrorKind::Unexpected, false),
};

let mut err = Error::new(kind, &String::from_utf8_lossy(&message));
let mut err = Error::new(kind, String::from_utf8_lossy(&message));

if retryable {
err = err.set_temporary();
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/common/blocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ test = true
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
parquet_rs = { workspace = true }
parquet = { workspace = true }

[build-dependencies]

Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/common/blocks/src/parquet_rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::table::TableCompression;
use parquet_rs::arrow::ArrowWriter;
use parquet_rs::basic::Encoding;
use parquet_rs::file::properties::EnabledStatistics;
use parquet_rs::file::properties::WriterProperties;
use parquet_rs::format::FileMetaData;
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::properties::EnabledStatistics;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;

/// Serialize data blocks to parquet format.
pub fn blocks_to_parquet(
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/common/table_meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
databend-common-io = { workspace = true }
enum-as-inner = "0.5"
parquet_rs = { workspace = true }
parquet = { workspace = true }
rmp-serde = "1.1.1"
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

use databend_common_arrow::native;
use databend_common_arrow::parquet;
use databend_common_arrow::parquet as databend_parquet;
use databend_common_exception::ErrorCode;
use parquet_rs::basic::Compression as ParquetCompression;
use parquet_rs::basic::GzipLevel;
use parquet_rs::basic::ZstdLevel;
use parquet::basic::Compression as ParquetCompression;
use parquet::basic::GzipLevel;
use parquet::basic::ZstdLevel;

use crate::meta;

Expand Down Expand Up @@ -50,13 +50,15 @@ impl TryFrom<&str> for TableCompression {
}

/// Convert to parquet CompressionOptions.
impl From<TableCompression> for parquet::compression::CompressionOptions {
impl From<TableCompression> for databend_parquet::compression::CompressionOptions {
fn from(value: TableCompression) -> Self {
match value {
TableCompression::None => parquet::compression::CompressionOptions::Uncompressed,
TableCompression::LZ4 => parquet::compression::CompressionOptions::Lz4Raw,
TableCompression::Snappy => parquet::compression::CompressionOptions::Snappy,
TableCompression::Zstd => parquet::compression::CompressionOptions::Zstd(None),
TableCompression::None => {
databend_parquet::compression::CompressionOptions::Uncompressed
}
TableCompression::LZ4 => databend_parquet::compression::CompressionOptions::Lz4Raw,
TableCompression::Snappy => databend_parquet::compression::CompressionOptions::Snappy,
TableCompression::Zstd => databend_parquet::compression::CompressionOptions::Zstd(None),
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/query/storages/delta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ publish = false
arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
databend-common-base = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-exception = { workspace = true }
Expand All @@ -21,12 +20,9 @@ databend-common-storage = { workspace = true }
databend-common-storages-parquet = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
deltalake = { workspace = true }
flagset = "0.4"
futures = "0.3"
match-template = "0.0.1"
minitrace = { workspace = true }
object_store = "0.9"
opendal = { workspace = true }
object_store_opendal = "0.44"
ordered-float = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
Expand Down
Loading

0 comments on commit 28dc42e

Please sign in to comment.