Skip to content

Commit af5d27f

Browse files
committed
fulfill partition spec id
1 parent d82c2f3 commit af5d27f

15 files changed

+41
-12
lines changed

crates/iceberg/src/arrow/record_batch_partition_spliter.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
2727

2828
use super::record_batch_projector::RecordBatchProjector;
2929
use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
30-
use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
30+
use crate::spec::{Literal, PartitionSpec, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
3131
use crate::transform::{create_transform_function, BoxedTransformFunction};
3232
use crate::{Error, ErrorKind, Result};
3333

@@ -186,6 +186,10 @@ impl RecordBatchPartitionSpliter {
186186
})
187187
}
188188

189+
pub(crate) fn partition_spec(&self) -> &PartitionSpec {
190+
self.partition_spec.as_ref()
191+
}
192+
189193
/// Split the record batch into multiple record batches by the partition spec.
190194
pub(crate) fn split(&self, batch: &RecordBatch) -> Result<Vec<(OwnedRow, RecordBatch)>> {
191195
// get array using partition spec

crates/iceberg/src/io/object_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ mod tests {
277277
.status(ManifestStatus::Added)
278278
.data_file(
279279
DataFileBuilder::default()
280+
.partition_spec_id(0)
280281
.content(DataContentType::Data)
281282
.file_path(format!("{}/1.parquet", &self.table_location))
282283
.file_format(DataFileFormat::Parquet)

crates/iceberg/src/scan.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,7 @@ pub mod tests {
13081308
.status(ManifestStatus::Added)
13091309
.data_file(
13101310
DataFileBuilder::default()
1311+
.partition_spec_id(0)
13111312
.content(DataContentType::Data)
13121313
.file_path(format!("{}/1.parquet", &self.table_location))
13131314
.file_format(DataFileFormat::Parquet)
@@ -1330,6 +1331,7 @@ pub mod tests {
13301331
.file_sequence_number(parent_snapshot.sequence_number())
13311332
.data_file(
13321333
DataFileBuilder::default()
1334+
.partition_spec_id(0)
13331335
.content(DataContentType::Data)
13341336
.file_path(format!("{}/2.parquet", &self.table_location))
13351337
.file_format(DataFileFormat::Parquet)
@@ -1351,6 +1353,7 @@ pub mod tests {
13511353
.file_sequence_number(parent_snapshot.sequence_number())
13521354
.data_file(
13531355
DataFileBuilder::default()
1356+
.partition_spec_id(0)
13541357
.content(DataContentType::Data)
13551358
.file_path(format!("{}/3.parquet", &self.table_location))
13561359
.file_format(DataFileFormat::Parquet)
@@ -1521,6 +1524,7 @@ pub mod tests {
15211524
.status(ManifestStatus::Added)
15221525
.data_file(
15231526
DataFileBuilder::default()
1527+
.partition_spec_id(0)
15241528
.content(DataContentType::Data)
15251529
.file_path(format!("{}/1.parquet", &self.table_location))
15261530
.file_format(DataFileFormat::Parquet)
@@ -1544,6 +1548,7 @@ pub mod tests {
15441548
.file_sequence_number(parent_snapshot.sequence_number())
15451549
.data_file(
15461550
DataFileBuilder::default()
1551+
.partition_spec_id(0)
15471552
.content(DataContentType::Data)
15481553
.file_path(format!("{}/2.parquet", &self.table_location))
15491554
.file_format(DataFileFormat::Parquet)
@@ -1566,6 +1571,7 @@ pub mod tests {
15661571
.file_sequence_number(parent_snapshot.sequence_number())
15671572
.data_file(
15681573
DataFileBuilder::default()
1574+
.partition_spec_id(0)
15691575
.content(DataContentType::Data)
15701576
.file_path(format!("{}/3.parquet", &self.table_location))
15711577
.file_format(DataFileFormat::Parquet)

crates/iceberg/src/spec/manifest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,10 @@ impl DataFile {
14601460
pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
14611461
self.partition = partition;
14621462
}
1463+
1464+
pub(crate) fn rewrite_partition_id(&mut self, partition_spec_id: i32) {
1465+
self.partition_spec_id = partition_spec_id;
1466+
}
14631467
}
14641468

14651469
/// Convert data files to avro bytes and write to writer.

crates/iceberg/src/transaction.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,6 +1364,7 @@ mod tests {
13641364

13651365
// check add data file with incompatible partition value
13661366
let data_file = DataFileBuilder::default()
1367+
.partition_spec_id(0)
13671368
.content(DataContentType::Data)
13681369
.file_path("test/3.parquet".to_string())
13691370
.file_format(DataFileFormat::Parquet)
@@ -1375,6 +1376,7 @@ mod tests {
13751376
assert!(action.add_data_files(vec![data_file.clone()]).is_err());
13761377

13771378
let data_file = DataFileBuilder::default()
1379+
.partition_spec_id(0)
13781380
.content(DataContentType::Data)
13791381
.file_path("test/3.parquet".to_string())
13801382
.file_format(DataFileFormat::Parquet)

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ use crate::Result;
3030
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
3131
inner: B,
3232
partition_value: Option<Struct>,
33+
partition_spec_id: i32,
3334
}
3435

3536
impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
3637
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
37-
pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
38+
pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: i32) -> Self {
3839
Self {
3940
inner,
4041
partition_value,
42+
partition_spec_id,
4143
}
4244
}
4345
}
@@ -50,6 +52,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
5052
Ok(DataFileWriter {
5153
inner_writer: Some(self.inner.clone().build().await?),
5254
partition_value: self.partition_value.unwrap_or(Struct::empty()),
55+
partition_spec_id: self.partition_spec_id,
5356
})
5457
}
5558
}
@@ -59,6 +62,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
5962
pub struct DataFileWriter<B: FileWriterBuilder> {
6063
inner_writer: Option<B::R>,
6164
partition_value: Struct,
65+
partition_spec_id: i32,
6266
}
6367

6468
#[async_trait::async_trait]
@@ -76,6 +80,7 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
7680
.map(|mut res| {
7781
res.content(DataContentType::Data);
7882
res.partition(self.partition_value.clone());
83+
res.partition_spec_id(self.partition_spec_id);
7984
res.build().expect("Guaranteed to be valid")
8085
})
8186
.collect_vec())
@@ -146,7 +151,10 @@ mod test {
146151
file_name_gen,
147152
);
148153

149-
let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
154+
let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0)
155+
.build()
156+
.await
157+
.unwrap();
150158

151159
let arrow_schema = arrow_schema::Schema::new(vec![
152160
Field::new("foo", DataType::Int32, false),
@@ -215,7 +223,7 @@ mod test {
215223
);
216224

217225
let mut data_file_writer =
218-
DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()))
226+
DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()), 0)
219227
.build()
220228
.await?;
221229

crates/iceberg/src/writer/function_writer/equality_delta_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ mod test {
318318
location_gen.clone(),
319319
file_name_gen.clone(),
320320
);
321-
DataFileWriterBuilder::new(pw.clone(), None)
321+
DataFileWriterBuilder::new(pw.clone(), None, 0)
322322
};
323323
let position_delete_writer_builder = {
324324
let pw = ParquetWriterBuilder::new(

crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ impl<B: IcebergWriterBuilder> IcebergWriter for FanoutPartitionWriter<B> {
134134
let mut data_files = writer.close().await?;
135135
for data_file in data_files.iter_mut() {
136136
data_file.rewrite_partition(partition_value.clone());
137+
data_file.rewrite_partition_id(self.partition_splitter.partition_spec().spec_id());
137138
}
138139
result.append(&mut data_files);
139140
}
@@ -211,7 +212,7 @@ mod test {
211212
location_gen,
212213
file_name_gen,
213214
);
214-
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None);
215+
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None, 0);
215216
let mut fanout_partition_writer = FanoutPartitionWriterBuilder::new(
216217
data_file_writer_builder,
217218
Arc::new(partition_spec),

crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ mod test {
207207
location_gen,
208208
file_name_gen,
209209
);
210-
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None);
210+
let data_file_writer_builder = DataFileWriterBuilder::new(pw, None, 0);
211211
let mut precompute_partition_writer = PrecomputePartitionWriterBuilder::new(
212212
data_file_writer_builder,
213213
Arc::new(partition_spec),

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async fn test_append_data_file() {
7575
location_generator.clone(),
7676
file_name_generator.clone(),
7777
);
78-
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
78+
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
7979
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
8080
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
8181
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);

0 commit comments

Comments
 (0)