Skip to content

Commit 6d8ad87

Browse files
committed
feat(wip): adress issues with reordered / skipped fields
1 parent e866e59 commit 6d8ad87

File tree

1 file changed

+49
-14
lines changed

1 file changed

+49
-14
lines changed

crates/iceberg/src/arrow/record_batch_evolution_processor.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use arrow_array::{
55
Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
66
Int32Array, Int64Array, NullArray, RecordBatch, StringArray,
77
};
8-
use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
8+
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
99
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
1010

1111
use crate::arrow::schema_to_arrow_schema;
@@ -108,22 +108,27 @@ impl RecordBatchEvolutionProcessor {
108108
) -> Result<Vec<EvolutionOp>> {
109109
let mut ops = vec![];
110110

111-
let mapped_arrow_schema = schema_to_arrow_schema(snapshot_schema)?;
111+
let mapped_unprojected_arrow_schema = schema_to_arrow_schema(snapshot_schema)?;
112+
// need to create a new arrow schema here by selecting fields from mapped_unprojected,
113+
// in the order of the field ids in projected_iceberg_field_ids
112114

113-
let mut arrow_schema_index: usize = 0;
114-
for (projected_field_idx, &field_id) in projected_iceberg_field_ids.iter().enumerate() {
115-
let iceberg_field = snapshot_schema.field_by_id(field_id).ok_or_else(|| {
115+
// right now the below is incorrect if projected_iceberg_field_ids skips any iceberg fields
116+
// or re-orders any
117+
118+
for &projected_field_id in projected_iceberg_field_ids {
119+
let iceberg_field = snapshot_schema.field_by_id(projected_field_id).ok_or_else(|| {
116120
Error::new(
117121
ErrorKind::Unexpected,
118122
"projected field id not found in snapshot schema",
119123
)
120124
})?;
121-
let mapped_arrow_field = mapped_arrow_schema.field(projected_field_idx);
125+
let (mapped_arrow_field, _) = Self::get_arrow_field_with_field_id(&mapped_arrow_schema, projected_field_id)?;
126+
let (orig_arrow_field, orig_arrow_field_idx) = Self::get_arrow_field_with_field_id(&source_schema, projected_field_id)?;
122127

123128
let (arrow_field, add_op_required) =
124-
if arrow_schema_index < source_schema.fields().len() {
125-
let arrow_field = source_schema.field(arrow_schema_index);
126-
let arrow_field_id: i32 = arrow_field
129+
if source_schema_idx < source_schema.fields().len() {
130+
let orig_arrow_field = source_schema.field(source_schema_idx);
131+
let arrow_field_id: i32 = orig_arrow_field
127132
.metadata()
128133
.get(PARQUET_FIELD_ID_META_KEY)
129134
.ok_or_else(|| {
@@ -139,7 +144,7 @@ impl RecordBatchEvolutionProcessor {
139144
format!("field id not parseable as an i32: {}", e),
140145
)
141146
})?;
142-
(Some(arrow_field), arrow_field_id != field_id)
147+
(Some(orig_arrow_field), arrow_field_id != projected_field_id)
143148
} else {
144149
(None, true)
145150
};
@@ -160,7 +165,7 @@ impl RecordBatchEvolutionProcessor {
160165
};
161166

162167
ops.push(EvolutionOp {
163-
index: arrow_schema_index,
168+
index: source_schema_idx,
164169
action: EvolutionAction::Add {
165170
value: default_value,
166171
target_type: mapped_arrow_field.data_type().clone(),
@@ -173,20 +178,50 @@ impl RecordBatchEvolutionProcessor {
173178
.equals_datatype(mapped_arrow_field.data_type())
174179
{
175180
ops.push(EvolutionOp {
176-
index: arrow_schema_index,
181+
index: source_schema_idx,
177182
action: EvolutionAction::Promote {
178183
target_type: mapped_arrow_field.data_type().clone(),
179184
},
180185
})
181186
}
182187

183-
arrow_schema_index += 1;
188+
source_schema_idx += 1;
184189
}
185190
}
186191

187192
Ok(ops)
188193
}
189194

195+
fn get_arrow_field_with_field_id(arrow_schema: &ArrowSchema, field_id: i32) -> Result<(FieldRef, usize)> {
196+
for (field, idx) in arrow_schema.fields().enumerate().iter() {
197+
let this_field_id: i32 = field
198+
.metadata()
199+
.get(PARQUET_FIELD_ID_META_KEY)
200+
.ok_or_else(|| {
201+
Error::new(
202+
ErrorKind::DataInvalid,
203+
"field ID not present in parquet metadata",
204+
)
205+
})?
206+
.parse()
207+
.map_err(|e| {
208+
Error::new(
209+
ErrorKind::DataInvalid,
210+
format!("field id not parseable as an i32: {}", e),
211+
)
212+
})?;
213+
214+
if this_field_id == field_id {
215+
return Ok((field.clone(), idx))
216+
}
217+
}
218+
219+
Err(Error::new(
220+
ErrorKind::Unexpected,
221+
format!("field with id {} not found in parquet schema", field_id)
222+
))
223+
}
224+
190225
fn transform_columns(
191226
&self,
192227
columns: &[Arc<dyn ArrowArray>],
@@ -201,7 +236,7 @@ impl RecordBatchEvolutionProcessor {
201236
let mut col_idx = 0;
202237
let mut op_idx = 0;
203238
while op_idx < self.operations.len() || col_idx < columns.len() {
204-
if self.operations[op_idx].index == col_idx {
239+
if op_idx < self.operations.len() && self.operations[op_idx].index == col_idx {
205240
match &self.operations[op_idx].action {
206241
EvolutionAction::Add { target_type, value } => {
207242
result.push(Self::create_column(target_type, value, num_rows)?);

0 commit comments

Comments
 (0)