Skip to content

Commit 85b655a

Browse files
Merge branch 'main' into aa/string_builder_char
2 parents b7e6d28 + 6844e56 commit 85b655a

File tree

276 files changed

+6105
-2409
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

276 files changed

+6105
-2409
lines changed

Cargo.lock

Lines changed: 21 additions & 31 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -168,7 +168,7 @@ rstest = "0.25.0"
168168
serde_json = "1"
169169
sqlparser = { version = "0.55.0", features = ["visitor"] }
170170
tempfile = "3"
171-
tokio = { version = "1.44", features = ["macros", "rt", "sync"] }
171+
tokio = { version = "1.45", features = ["macros", "rt", "sync"] }
172172
url = "2.5.4"
173173

174174
[profile.release]

benchmarks/src/tpch/run.rs

Lines changed: 19 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -109,29 +109,32 @@ impl RunOpt {
109109
};
110110

111111
let mut benchmark_run = BenchmarkRun::new();
112-
for query_id in query_range {
113-
benchmark_run.start_new_case(&format!("Query {query_id}"));
114-
let query_run = self.benchmark_query(query_id).await?;
115-
for iter in query_run {
116-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
117-
}
118-
}
119-
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
120-
Ok(())
121-
}
122-
123-
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
124112
let mut config = self
125113
.common
126114
.config()?
127115
.with_collect_statistics(!self.disable_statistics);
128116
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
129117
let rt_builder = self.common.runtime_env_builder()?;
130118
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
131-
132119
// register tables
133120
self.register_tables(&ctx).await?;
134121

122+
for query_id in query_range {
123+
benchmark_run.start_new_case(&format!("Query {query_id}"));
124+
let query_run = self.benchmark_query(query_id, &ctx).await?;
125+
for iter in query_run {
126+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
127+
}
128+
}
129+
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
130+
Ok(())
131+
}
132+
133+
async fn benchmark_query(
134+
&self,
135+
query_id: usize,
136+
ctx: &SessionContext,
137+
) -> Result<Vec<QueryResult>> {
135138
let mut millis = vec![];
136139
// run benchmark
137140
let mut query_results = vec![];
@@ -146,14 +149,14 @@ impl RunOpt {
146149
if query_id == 15 {
147150
for (n, query) in sql.iter().enumerate() {
148151
if n == 1 {
149-
result = self.execute_query(&ctx, query).await?;
152+
result = self.execute_query(ctx, query).await?;
150153
} else {
151-
self.execute_query(&ctx, query).await?;
154+
self.execute_query(ctx, query).await?;
152155
}
153156
}
154157
} else {
155158
for query in sql {
156-
result = self.execute_query(&ctx, query).await?;
159+
result = self.execute_query(ctx, query).await?;
157160
}
158161
}
159162

benchmarks/src/util/options.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ use std::{num::NonZeroUsize, sync::Arc};
1919

2020
use datafusion::{
2121
execution::{
22-
disk_manager::DiskManagerConfig,
22+
disk_manager::DiskManagerBuilder,
2323
memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool},
2424
runtime_env::RuntimeEnvBuilder,
2525
},
@@ -110,7 +110,7 @@ impl CommonOpt {
110110
};
111111
rt_builder = rt_builder
112112
.with_memory_pool(pool)
113-
.with_disk_manager(DiskManagerConfig::NewOs);
113+
.with_disk_manager_builder(DiskManagerBuilder::default());
114114
}
115115
Ok(rt_builder)
116116
}

datafusion-cli/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ arrow = { workspace = true }
3939
async-trait = { workspace = true }
4040
aws-config = "1.6.2"
4141
aws-credential-types = "1.2.0"
42-
clap = { version = "4.5.37", features = ["derive", "cargo"] }
42+
clap = { version = "4.5.39", features = ["derive", "cargo"] }
4343
datafusion = { workspace = true, features = [
4444
"avro",
4545
"crypto_expressions",
@@ -60,7 +60,7 @@ object_store = { workspace = true, features = ["aws", "gcp", "http"] }
6060
parking_lot = { workspace = true }
6161
parquet = { workspace = true, default-features = false }
6262
regex = { workspace = true }
63-
rustyline = "15.0"
63+
rustyline = "16.0"
6464
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] }
6565
url = { workspace = true }
6666

datafusion-cli/src/main.rs

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ use datafusion::execution::memory_pool::{
2828
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
2929
};
3030
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
31-
use datafusion::execution::DiskManager;
3231
use datafusion::prelude::SessionContext;
3332
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3433
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -43,7 +42,7 @@ use datafusion_cli::{
4342
use clap::Parser;
4443
use datafusion::common::config_err;
4544
use datafusion::config::ConfigOptions;
46-
use datafusion::execution::disk_manager::DiskManagerConfig;
45+
use datafusion::execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
4746
use mimalloc::MiMalloc;
4847

4948
#[global_allocator]
@@ -200,15 +199,10 @@ async fn main_inner() -> Result<()> {
200199

201200
// set disk limit
202201
if let Some(disk_limit) = args.disk_limit {
203-
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
204-
205-
DiskManager::set_arc_max_temp_directory_size(
206-
&mut disk_manager,
207-
disk_limit.try_into().unwrap(),
208-
)?;
209-
210-
let disk_config = DiskManagerConfig::new_existing(disk_manager);
211-
rt_builder = rt_builder.with_disk_manager(disk_config);
202+
let builder = DiskManagerBuilder::default()
203+
.with_mode(DiskManagerMode::OsTmpDirectory)
204+
.with_max_temp_directory_size(disk_limit.try_into().unwrap());
205+
rt_builder = rt_builder.with_disk_manager_builder(builder);
212206
}
213207

214208
let runtime_env = rt_builder.build_arc()?;

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ tonic = "0.12.1"
7777
tracing = { version = "0.1" }
7878
tracing-subscriber = { version = "0.3" }
7979
url = { workspace = true }
80-
uuid = "1.16"
80+
uuid = "1.17"
8181

8282
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
8383
nix = { version = "0.30.1", features = ["fs"] }

datafusion-examples/examples/advanced_udaf.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ use arrow::array::{
2525
};
2626
use arrow::datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type};
2727
use arrow::record_batch::RecordBatch;
28+
use arrow_schema::FieldRef;
2829
use datafusion::common::{cast::as_float64_array, ScalarValue};
2930
use datafusion::error::Result;
3031
use datafusion::logical_expr::{
@@ -92,10 +93,10 @@ impl AggregateUDFImpl for GeoMeanUdaf {
9293
}
9394

9495
/// This is the description of the state. accumulator's state() must match the types here.
95-
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
96+
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
9697
Ok(vec![
97-
Field::new("prod", args.return_type().clone(), true),
98-
Field::new("n", DataType::UInt32, true),
98+
Field::new("prod", args.return_type().clone(), true).into(),
99+
Field::new("n", DataType::UInt32, true).into(),
99100
])
100101
}
101102

@@ -401,7 +402,7 @@ impl AggregateUDFImpl for SimplifiedGeoMeanUdaf {
401402
unimplemented!("should not be invoked")
402403
}
403404

404-
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
405+
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
405406
unimplemented!("should not be invoked")
406407
}
407408

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ use arrow::{
2323
array::{ArrayRef, AsArray, Float64Array},
2424
datatypes::Float64Type,
2525
};
26+
use arrow_schema::FieldRef;
2627
use datafusion::common::ScalarValue;
2728
use datafusion::error::Result;
2829
use datafusion::functions_aggregate::average::avg_udaf;
@@ -87,8 +88,8 @@ impl WindowUDFImpl for SmoothItUdf {
8788
Ok(Box::new(MyPartitionEvaluator::new()))
8889
}
8990

90-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
91-
Ok(Field::new(field_args.name(), DataType::Float64, true))
91+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
92+
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
9293
}
9394
}
9495

@@ -205,8 +206,8 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
205206
Some(Box::new(simplify))
206207
}
207208

208-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
209-
Ok(Field::new(field_args.name(), DataType::Float64, true))
209+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
210+
Ok(Field::new(field_args.name(), DataType::Float64, true).into())
210211
}
211212
}
212213

0 commit comments

Comments
 (0)