Skip to content

Commit 5bf80d6

Browse files
authored
Minor: Add repartition_file.slt end to end test for repartitioning files, and supporting tweaks (#8505)
* Minor: Add repartition_file.slt end to end test for repartitioning files, and supporting tweaks * Sort files, update tests
1 parent 9a322c8 commit 5bf80d6

File tree

5 files changed

+286
-1
lines changed

5 files changed

+286
-1
lines changed

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100;
141141

142142
/// Partition the list of files into `n` groups
143143
pub fn split_files(
144-
partitioned_files: Vec<PartitionedFile>,
144+
mut partitioned_files: Vec<PartitionedFile>,
145145
n: usize,
146146
) -> Vec<Vec<PartitionedFile>> {
147147
if partitioned_files.is_empty() {
148148
return vec![];
149149
}
150+
151+
// ObjectStore::list does not guarantee any consistent order and for some
152+
// implementations such as LocalFileSystem, it may be inconsistent. Thus,
153+
// sort files by path to ensure consistent plans when run more than once.
154+
partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
155+
150156
// effectively this is div with rounding up instead of truncating
151157
let chunk_size = (partitioned_files.len() + n - 1) / n;
152158
partitioned_files

datafusion/core/src/datasource/listing/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ impl PartitionedFile {
109109
let size = std::fs::metadata(path.clone())?.len();
110110
Ok(Self::new(path, size))
111111
}
112+
113+
/// Return the path of this partitioned file
114+
pub fn path(&self) -> &Path {
115+
&self.object_meta.location
116+
}
112117
}
113118

114119
impl From<ObjectMeta> for PartitionedFile {

datafusion/core/src/datasource/listing/url.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ impl ListingTableUrl {
131131
if is_directory {
132132
fs::create_dir_all(path)?;
133133
} else {
134+
// ensure parent directory exists
135+
if let Some(parent) = path.parent() {
136+
fs::create_dir_all(parent)?;
137+
}
134138
fs::File::create(path)?;
135139
}
136140
}

datafusion/sqllogictest/bin/sqllogictests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
159159
relative_path,
160160
} = test_file;
161161
info!("Running with Postgres runner: {}", path.display());
162+
setup_scratch_dir(&relative_path)?;
162163
let mut runner =
163164
sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
164165
runner.with_column_validator(strict_column_validator);
@@ -188,6 +189,7 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> {
188189
info!("Skipping: {}", path.display());
189190
return Ok(());
190191
};
192+
setup_scratch_dir(&relative_path)?;
191193
let mut runner = sqllogictest::Runner::new(|| async {
192194
Ok(DataFusion::new(
193195
test_ctx.session_ctx().clone(),
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
##########
19+
# Tests for automatically reading files in parallel during scan
20+
##########
21+
22+
# Set 4 partitions for deterministic output plans
23+
statement ok
24+
set datafusion.execution.target_partitions = 4;
25+
26+
# automatically partition all files over 1 byte
27+
statement ok
28+
set datafusion.optimizer.repartition_file_min_size = 1;
29+
30+
###################
31+
### Parquet tests
32+
###################
33+
34+
# create a single parquet file
35+
# Note filename 2.parquet to test sorting (on local file systems it is often listed before 1.parquet)
36+
statement ok
37+
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/parquet_table/2.parquet'
38+
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
39+
40+
statement ok
41+
CREATE EXTERNAL TABLE parquet_table(column1 int)
42+
STORED AS PARQUET
43+
LOCATION 'test_files/scratch/repartition_scan/parquet_table/';
44+
45+
query I
46+
select * from parquet_table;
47+
----
48+
1
49+
2
50+
3
51+
4
52+
5
53+
54+
## Expect to see the scan read the file as "4" groups with even sizes (offsets)
55+
query TT
56+
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
57+
----
58+
logical_plan
59+
Filter: parquet_table.column1 != Int32(42)
60+
--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
61+
physical_plan
62+
CoalesceBatchesExec: target_batch_size=8192
63+
--FilterExec: column1@0 != 42
64+
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
65+
66+
# create a second parquet file
67+
statement ok
68+
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet'
69+
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
70+
71+
## Still expect to see the scan read the file as "4" groups with even sizes. One group should read
72+
## parts of both files.
73+
query TT
74+
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1;
75+
----
76+
logical_plan
77+
Sort: parquet_table.column1 ASC NULLS LAST
78+
--Filter: parquet_table.column1 != Int32(42)
79+
----TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
80+
physical_plan
81+
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
82+
--SortExec: expr=[column1@0 ASC NULLS LAST]
83+
----CoalesceBatchesExec: target_batch_size=8192
84+
------FilterExec: column1@0 != 42
85+
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
86+
87+
88+
## Read the files as though they are ordered
89+
90+
statement ok
91+
CREATE EXTERNAL TABLE parquet_table_with_order(column1 int)
92+
STORED AS PARQUET
93+
LOCATION 'test_files/scratch/repartition_scan/parquet_table'
94+
WITH ORDER (column1 ASC);
95+
96+
# output should be ordered
97+
query I
98+
SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1;
99+
----
100+
1
101+
2
102+
3
103+
4
104+
5
105+
100
106+
200
107+
108+
# explain should not have any groups with more than one file
109+
# https://github.com/apache/arrow-datafusion/issues/8451
110+
query TT
111+
EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1;
112+
----
113+
logical_plan
114+
Sort: parquet_table_with_order.column1 ASC NULLS LAST
115+
--Filter: parquet_table_with_order.column1 != Int32(42)
116+
----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)]
117+
physical_plan
118+
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
119+
--CoalesceBatchesExec: target_batch_size=8192
120+
----FilterExec: column1@0 != 42
121+
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
122+
123+
# Cleanup
124+
statement ok
125+
DROP TABLE parquet_table;
126+
127+
statement ok
128+
DROP TABLE parquet_table_with_order;
129+
130+
131+
###################
132+
### CSV tests
133+
###################
134+
135+
# Since parquet and CSV share most of the same implementation, this test checks
136+
# that the basics are connected properly
137+
138+
# create a single csv file
139+
statement ok
140+
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/csv_table/1.csv'
141+
(FORMAT csv, SINGLE_FILE_OUTPUT true, HEADER true);
142+
143+
statement ok
144+
CREATE EXTERNAL TABLE csv_table(column1 int)
145+
STORED AS csv
146+
WITH HEADER ROW
147+
LOCATION 'test_files/scratch/repartition_scan/csv_table/';
148+
149+
query I
150+
select * from csv_table;
151+
----
152+
1
153+
2
154+
3
155+
4
156+
5
157+
158+
## Expect to see the scan read the file as "4" groups with even sizes (offsets)
159+
query TT
160+
EXPLAIN SELECT column1 FROM csv_table WHERE column1 <> 42;
161+
----
162+
logical_plan
163+
Filter: csv_table.column1 != Int32(42)
164+
--TableScan: csv_table projection=[column1], partial_filters=[csv_table.column1 != Int32(42)]
165+
physical_plan
166+
CoalesceBatchesExec: target_batch_size=8192
167+
--FilterExec: column1@0 != 42
168+
----CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:5..10], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:10..15], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:15..18]]}, projection=[column1], has_header=true
169+
170+
# Cleanup
171+
statement ok
172+
DROP TABLE csv_table;
173+
174+
175+
###################
176+
### JSON tests
177+
###################
178+
179+
# Since parquet and json share most of the same implementation, this test checks
180+
# that the basics are connected properly
181+
182+
# create a single json file
183+
statement ok
184+
COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/json_table/1.json'
185+
(FORMAT json, SINGLE_FILE_OUTPUT true);
186+
187+
statement ok
188+
CREATE EXTERNAL TABLE json_table(column1 int)
189+
STORED AS json
190+
LOCATION 'test_files/scratch/repartition_scan/json_table/';
191+
192+
query I
193+
select * from json_table;
194+
----
195+
1
196+
2
197+
3
198+
4
199+
5
200+
201+
## In the future it would be cool to see the file read as "4" groups with even sizes (offsets)
202+
## but for now it is just one group
203+
## https://github.com/apache/arrow-datafusion/issues/8502
204+
query TT
205+
EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42;
206+
----
207+
logical_plan
208+
Filter: json_table.column1 != Int32(42)
209+
--TableScan: json_table projection=[column1], partial_filters=[json_table.column1 != Int32(42)]
210+
physical_plan
211+
CoalesceBatchesExec: target_batch_size=8192
212+
--FilterExec: column1@0 != 42
213+
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
214+
------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]}, projection=[column1]
215+
216+
217+
# Cleanup
218+
statement ok
219+
DROP TABLE json_table;
220+
221+
222+
###################
223+
### Arrow File tests
224+
###################
225+
226+
## Use pre-existing files, as we don't have a way to create arrow files yet
227+
## (https://github.com/apache/arrow-datafusion/issues/8504)
228+
statement ok
229+
CREATE EXTERNAL TABLE arrow_table
230+
STORED AS ARROW
231+
LOCATION '../core/tests/data/example.arrow';
232+
233+
234+
# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
235+
# https://github.com/apache/arrow-datafusion/issues/8503
236+
query TT
237+
EXPLAIN SELECT * FROM arrow_table
238+
----
239+
logical_plan TableScan: arrow_table projection=[f0, f1, f2]
240+
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2]
241+
242+
# Cleanup
243+
statement ok
244+
DROP TABLE arrow_table;
245+
246+
###################
247+
### Avro File tests
248+
###################
249+
250+
## Use pre-existing files, as we don't have a way to create avro files yet
251+
252+
statement ok
253+
CREATE EXTERNAL TABLE avro_table
254+
STORED AS AVRO
255+
WITH HEADER ROW
256+
LOCATION '../../testing/data/avro/simple_enum.avro'
257+
258+
259+
# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
260+
query TT
261+
EXPLAIN SELECT * FROM avro_table
262+
----
263+
logical_plan TableScan: avro_table projection=[f1, f2, f3]
264+
physical_plan AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]
265+
266+
# Cleanup
267+
statement ok
268+
DROP TABLE avro_table;

0 commit comments

Comments
 (0)