
Commit 59af94f

fullfill partition spec id
1 parent: d82c2f3

18 files changed, +82 -29 lines

crates/iceberg/src/arrow/record_batch_partition_spliter.rs

Lines changed: 5 additions & 1 deletion
@@ -27,7 +27,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use super::record_batch_projector::RecordBatchProjector;
 use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
-use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
+use crate::spec::{Literal, PartitionSpec, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
 use crate::transform::{create_transform_function, BoxedTransformFunction};
 use crate::{Error, ErrorKind, Result};
 
@@ -186,6 +186,10 @@ impl RecordBatchPartitionSpliter {
         })
     }
 
+    pub(crate) fn partition_spec(&self) -> &PartitionSpec {
+        self.partition_spec.as_ref()
+    }
+
     /// Split the record batch into multiple record batches by the partition spec.
     pub(crate) fn split(&self, batch: &RecordBatch) -> Result<Vec<(OwnedRow, RecordBatch)>> {
        // get array using partition spec
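
The new partition_spec() accessor lets crate-internal callers read back the spec a spliter was built with, for example to forward its id to a writer. A minimal sketch, assuming a spliter: RecordBatchPartitionSpliter and a parquet_writer_builder already in scope (both bindings are illustrative, and spec_id() is assumed to be the id getter on PartitionSpec):

    // Hypothetical crate-internal usage: carry the spliter's spec id into the
    // data file writer so the emitted data files record which spec split them.
    let spec_id = spliter.partition_spec().spec_id();
    let writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, Some(spec_id));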

crates/iceberg/src/io/object_cache.rs

Lines changed: 1 addition & 0 deletions
@@ -277,6 +277,7 @@ mod tests {
             .status(ManifestStatus::Added)
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/1.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
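
Every DataFileBuilder call site in this commit now sets partition_spec_id explicitly, so the test fixtures here (and in scan.rs and transaction.rs below) pin it to 0, the id Iceberg assigns to a table's initial partition spec. A trimmed sketch of the updated fixture pattern, with the other required builder fields elided as in the hunk above:

    // Sketch only: the spec id is set alongside the other required DataFile fields.
    let data_file = DataFileBuilder::default()
        .partition_spec_id(0) // id of the spec the file was written under
        .content(DataContentType::Data)
        .file_path(format!("{}/1.parquet", &self.table_location))
        .file_format(DataFileFormat::Parquet)
        // ...partition, record_count, file_size_in_bytes, etc. as before
        .build()
        .unwrap();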

crates/iceberg/src/scan.rs

Lines changed: 6 additions & 0 deletions
@@ -1308,6 +1308,7 @@ pub mod tests {
             .status(ManifestStatus::Added)
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/1.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
@@ -1330,6 +1331,7 @@ pub mod tests {
             .file_sequence_number(parent_snapshot.sequence_number())
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/2.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
@@ -1351,6 +1353,7 @@ pub mod tests {
             .file_sequence_number(parent_snapshot.sequence_number())
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/3.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
@@ -1521,6 +1524,7 @@ pub mod tests {
             .status(ManifestStatus::Added)
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/1.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
@@ -1544,6 +1548,7 @@ pub mod tests {
             .file_sequence_number(parent_snapshot.sequence_number())
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/2.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)
@@ -1566,6 +1571,7 @@ pub mod tests {
             .file_sequence_number(parent_snapshot.sequence_number())
             .data_file(
                 DataFileBuilder::default()
+                    .partition_spec_id(0)
                     .content(DataContentType::Data)
                     .file_path(format!("{}/3.parquet", &self.table_location))
                     .file_format(DataFileFormat::Parquet)

crates/iceberg/src/spec/manifest.rs

Lines changed: 4 additions & 0 deletions
@@ -1460,6 +1460,10 @@ impl DataFile {
     pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
         self.partition = partition;
     }
+
+    pub(crate) fn rewrite_partition_id(&mut self, partition_spec_id: i32) {
+        self.partition_spec_id = partition_spec_id;
+    }
 }
 
 /// Convert data files to avro bytes and write to writer.
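
rewrite_partition_id mirrors the existing rewrite_partition hook: both are crate-internal mutators for patching a DataFile after it has been built, for example when a file is re-stamped under a spec other than the one it was written with. A minimal sketch of the pair used together; the data_file and target_spec bindings are illustrative:

    // Hypothetical crate-internal fix-up: re-stamp a DataFile with the
    // partition value and spec id of the spec it is being committed under.
    data_file.rewrite_partition(new_partition_value);
    data_file.rewrite_partition_id(target_spec.spec_id());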

crates/iceberg/src/transaction.rs

Lines changed: 2 additions & 0 deletions
@@ -1364,6 +1364,7 @@ mod tests {
 
         // check add data file with incompatible partition value
         let data_file = DataFileBuilder::default()
+            .partition_spec_id(0)
             .content(DataContentType::Data)
             .file_path("test/3.parquet".to_string())
             .file_format(DataFileFormat::Parquet)
@@ -1375,6 +1376,7 @@
         assert!(action.add_data_files(vec![data_file.clone()]).is_err());
 
         let data_file = DataFileBuilder::default()
+            .partition_spec_id(0)
             .content(DataContentType::Data)
             .file_path("test/3.parquet".to_string())
             .file_format(DataFileFormat::Parquet)

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 11 additions & 3 deletions
@@ -30,14 +30,16 @@ use crate::Result;
 pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
     partition_value: Option<Struct>,
+    partition_spec_id: Option<i32>,
 }
 
 impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
+    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: Option<i32>) -> Self {
         Self {
             inner,
             partition_value,
+            partition_spec_id,
         }
     }
 }
@@ -50,6 +52,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
         Ok(DataFileWriter {
             inner_writer: Some(self.inner.clone().build().await?),
             partition_value: self.partition_value.unwrap_or(Struct::empty()),
+            partition_spec_id: self.partition_spec_id.unwrap_or(0),
         })
     }
 }
@@ -59,6 +62,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
 pub struct DataFileWriter<B: FileWriterBuilder> {
     inner_writer: Option<B::R>,
     partition_value: Struct,
+    partition_spec_id: i32,
 }
 
 #[async_trait::async_trait]
@@ -76,6 +80,7 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
             .map(|mut res| {
                 res.content(DataContentType::Data);
                 res.partition(self.partition_value.clone());
+                res.partition_spec_id(self.partition_spec_id);
                 res.build().expect("Guaranteed to be valid")
             })
             .collect_vec())
@@ -146,7 +151,10 @@ mod test {
             file_name_gen,
         );
 
-        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();
+        let mut data_file_writer = DataFileWriterBuilder::new(pw, None, None)
+            .build()
+            .await
+            .unwrap();
 
         let arrow_schema = arrow_schema::Schema::new(vec![
             Field::new("foo", DataType::Int32, false),
@@ -215,7 +223,7 @@
         );
 
         let mut data_file_writer =
-            DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()))
+            DataFileWriterBuilder::new(parquet_writer_builder, Some(partition_value.clone()), None)
                 .build()
                 .await?;
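
DataFileWriterBuilder::new now takes the spec id as a third Option<i32> argument and falls back to 0 when None is passed. A sketch of the updated call for a partitioned write, assuming parquet_writer_builder, partition_value, and partition_spec are already in scope (illustrative names; spec_id() is assumed to be the id getter on the spec):

    // Pass the partition value together with the id of the spec that produced it;
    // passing None falls back to the default spec id 0.
    let mut data_file_writer = DataFileWriterBuilder::new(
        parquet_writer_builder,
        Some(partition_value.clone()),
        Some(partition_spec.spec_id()),
    )
    .build()
    .await?;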

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 19 additions & 12 deletions
@@ -53,6 +53,7 @@ pub struct EqualityDeleteWriterConfig {
     // Projector used to project the data chunk into specific fields.
     projector: RecordBatchProjector,
     partition_value: Struct,
+    partition_spec_id: i32,
 }
 
 impl EqualityDeleteWriterConfig {
@@ -61,6 +62,7 @@ impl EqualityDeleteWriterConfig {
         equality_ids: Vec<i32>,
         original_schema: SchemaRef,
         partition_value: Option<Struct>,
+        partition_spec_id: Option<i32>,
     ) -> Result<Self> {
         let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
         let projector = RecordBatchProjector::new(
@@ -97,6 +99,7 @@
             equality_ids,
             projector,
             partition_value: partition_value.unwrap_or(Struct::empty()),
+            partition_spec_id: partition_spec_id.unwrap_or(0),
         })
     }
 
@@ -116,6 +119,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuil
             projector: self.config.projector,
             equality_ids: self.config.equality_ids,
             partition_value: self.config.partition_value,
+            partition_spec_id: self.config.partition_spec_id,
         })
     }
 }
@@ -127,6 +131,7 @@ pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
     projector: RecordBatchProjector,
     equality_ids: Vec<i32>,
     partition_value: Struct,
+    partition_spec_id: i32,
 }
 
 #[async_trait::async_trait]
@@ -153,6 +158,7 @@ impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
                 res.content(crate::spec::DataContentType::EqualityDeletes);
                 res.equality_ids(self.equality_ids.iter().copied().collect_vec());
                 res.partition(self.partition_value.clone());
+                res.partition_spec_id(self.partition_spec_id);
                 res.build().expect("msg")
             })
             .collect_vec())
@@ -387,7 +393,7 @@ mod test {
 
         let equality_ids = vec![0_i32, 8];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, None).unwrap();
         let delete_schema =
             arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
         let projector = equality_config.projector.clone();
@@ -484,19 +490,19 @@
             .unwrap(),
         );
         // Float and Double are not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None, None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, None).is_err());
         // Struct is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None, None).is_err());
         // Nested field of struct is allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
+        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None, None).is_ok());
         // Nested field of map is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None, None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None, None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None, None).is_err());
         // Nested field of list is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None, None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None, None).is_err());
 
         Ok(())
     }
@@ -549,7 +555,8 @@
             .unwrap(),
        );
        let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
-        let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap();
+        let config =
+            EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None, None).unwrap();
         let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
         let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();
 
@@ -740,7 +747,7 @@ mod test {
         let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
         let equality_ids = vec![0_i32, 2, 5];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None, None).unwrap();
         let projector = equality_config.projector.clone();
 
         // check
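
EqualityDeleteWriterConfig::new gains the same optional spec id, so the equality-delete files it produces carry the spec they were written under. A sketch of a call site outside these tests; the equality ids, table_schema, and partition_spec bindings are illustrative:

    // The fourth argument is the partition spec id; None defaults to 0.
    let config = EqualityDeleteWriterConfig::new(
        vec![1, 2],                     // equality field ids
        table_schema.clone(),           // SchemaRef of the table schema
        None,                           // unpartitioned, so no partition value
        Some(partition_spec.spec_id()),
    )?;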

crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs

Lines changed: 12 additions & 2 deletions
@@ -35,15 +35,22 @@ pub struct SortPositionDeleteWriterBuilder<B: FileWriterBuilder> {
     inner: B,
     cache_num: usize,
     partition_value: Option<Struct>,
+    partition_spec_id: Option<i32>,
 }
 
 impl<B: FileWriterBuilder> SortPositionDeleteWriterBuilder<B> {
     /// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, cache_num: usize, partition_value: Option<Struct>) -> Self {
+    pub fn new(
+        inner: B,
+        cache_num: usize,
+        partition_value: Option<Struct>,
+        partition_spec_id: Option<i32>,
+    ) -> Self {
         Self {
             inner,
             cache_num,
             partition_value,
+            partition_spec_id,
         }
     }
 }
@@ -86,6 +93,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput, Vec<DataFil
             cache: BTreeMap::new(),
             data_files: Vec::new(),
             partition_value: self.partition_value.unwrap_or(Struct::empty()),
+            partition_spec_id: self.partition_spec_id.unwrap_or(0),
         })
     }
 }
@@ -106,6 +114,7 @@ pub struct SortPositionDeleteWriter<B: FileWriterBuilder> {
     cache: BTreeMap<String, Vec<i64>>,
     data_files: Vec<DataFile>,
     partition_value: Struct,
+    partition_spec_id: i32,
 }
 
 impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
@@ -140,6 +149,7 @@ impl<B: FileWriterBuilder> SortPositionDeleteWriter<B> {
             .extend(writer.close().await?.into_iter().map(|mut res| {
                 res.content(crate::spec::DataContentType::PositionDeletes);
                 res.partition(self.partition_value.clone());
+                res.partition_spec_id(self.partition_spec_id);
                 res.build().expect("Guaranteed to be valid")
             }));
         Ok(())
@@ -204,7 +214,7 @@ mod test {
             location_gen,
             file_name_gen,
         );
-        let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None)
+        let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None, None)
            .build()
            .await?;
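
SortPositionDeleteWriterBuilder::new follows the same pattern, with the spec id appended after the cache size and partition value. A sketch mirroring the test above (pw and the cache size of 10 come from that test; the spec_id binding is illustrative):

    // cache_num = 10 as in the test above; the spec id defaults to 0 when None.
    let mut position_delete_writer =
        SortPositionDeleteWriterBuilder::new(pw, 10, None, Some(spec_id))
            .build()
            .await?;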

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 8 additions & 1 deletion
@@ -492,6 +492,7 @@ impl ParquetWriter {
             .file_path(file_path)
             .file_format(DataFileFormat::Parquet)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .record_count(metadata.file_metadata().num_rows() as u64)
             .file_size_in_bytes(written_size as u64)
             .column_sizes(column_sizes)
@@ -840,6 +841,7 @@ mod tests {
             // Put dummy field for build successfully.
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
 
@@ -1035,6 +1037,7 @@ mod tests {
             // Put dummy field for build successfully.
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
 
@@ -1225,6 +1228,7 @@ mod tests {
             // Put dummy field for build successfully.
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
 
@@ -1373,6 +1377,7 @@ mod tests {
             .unwrap()
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
         assert_eq!(
@@ -1425,6 +1430,7 @@ mod tests {
             .unwrap()
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
         assert_eq!(
@@ -1483,6 +1489,7 @@ mod tests {
             .unwrap()
             .content(crate::spec::DataContentType::Data)
             .partition(Struct::empty())
+            .partition_spec_id(0)
             .build()
             .unwrap();
         assert_eq!(
@@ -1537,7 +1544,7 @@ mod tests {
         // .next()
         // .unwrap()
         // .content(crate::spec::DataContentType::Data)
-        // .partition(Struct::empty())
+        // .partition(Struct::empty()).partition_spec_id(0)
         // .build()
        // .unwrap();
        // assert_eq!(
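
ParquetWriter now stamps the DataFileBuilder it fills in with spec id 0, the default spec. Since rewrite_partition_id (added in manifest.rs above) is crate-internal, code that writes under a non-default spec could re-stamp the result afterwards; a sketch under that assumption, with current_spec and data_files as illustrative bindings:

    // Files come out of the parquet writer with partition_spec_id = 0;
    // rewrite the id when the table's current spec differs.
    for data_file in &mut data_files {
        data_file.rewrite_partition_id(current_spec.spec_id());
    }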
