Skip to content

Commit 50ab243

Browse files
committed
add parquet writer
1 parent d9d6cfc commit 50ab243

File tree

8 files changed

+369
-12
lines changed

8 files changed

+369
-12
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ arrow-schema = { version = ">=46" }
3636
async-trait = "0.1"
3737
bimap = "0.6"
3838
bitvec = "1.0.1"
39+
bytes = "1.5"
3940
chrono = "0.4"
4041
derive_builder = "0.12.0"
4142
either = "1"
@@ -53,6 +54,7 @@ opendal = "0.44"
5354
ordered-float = "4.0.0"
5455
pretty_assertions = "1.4.0"
5556
port_scanner = "0.1.5"
57+
parquet = { version = ">=46", features = ["async"] }
5658
reqwest = { version = "^0.11", features = ["json"] }
5759
rust_decimal = "1.31.0"
5860
serde = { version = "^1.0", features = ["rc"] }

crates/iceberg/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ arrow-schema = { workspace = true }
3636
async-trait = { workspace = true }
3737
bimap = { workspace = true }
3838
bitvec = { workspace = true }
39+
bytes ={ workspace = true }
3940
chrono = { workspace = true }
4041
derive_builder = { workspace = true }
4142
either = { workspace = true }
@@ -59,9 +60,10 @@ typed-builder = { workspace = true }
5960
url = { workspace = true }
6061
urlencoding = { workspace = true }
6162
uuid = { workspace = true }
63+
parquet ={ workspace = true }
64+
tokio = { workspace = true }
6265

6366
[dev-dependencies]
6467
pretty_assertions = { workspace = true }
6568
tempfile = { workspace = true }
6669
tera = { workspace = true }
67-
tokio = { workspace = true }

crates/iceberg/src/io.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use std::{collections::HashMap, sync::Arc};
5353
use crate::{error::Result, Error, ErrorKind};
5454
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
5555
use once_cell::sync::Lazy;
56-
use opendal::{Operator, Scheme};
56+
use opendal::{Operator, Scheme, Writer};
5757
use url::Url;
5858

5959
/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -245,7 +245,7 @@ pub trait FileWrite: AsyncWrite {}
245245
impl<T> FileWrite for T where T: AsyncWrite {}
246246

247247
/// Output file is used for writing to files.
248-
#[derive(Debug)]
248+
#[derive(Debug, Clone)]
249249
pub struct OutputFile {
250250
op: Operator,
251251
// Absolute path of file.
@@ -268,6 +268,16 @@ impl OutputFile {
268268
.await?)
269269
}
270270

271+
/// Delete file.
272+
pub async fn delete(&self) -> Result<()> {
273+
// #TODO
274+
// Is the existence check below needed? OpenDAL documents `Operator::delete` as succeeding when the path does not exist.
275+
if self.exists().await? {
276+
self.op.delete(&self.path[self.relative_path_pos..]).await?
277+
}
278+
Ok(())
279+
}
280+
271281
/// Converts into [`InputFile`].
272282
pub fn to_input_file(self) -> InputFile {
273283
InputFile {
@@ -278,7 +288,7 @@ impl OutputFile {
278288
}
279289

280290
/// Creates output file for writing.
281-
pub async fn writer(&self) -> Result<impl FileWrite> {
291+
pub async fn writer(&self) -> Result<Writer> {
282292
Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
283293
}
284294
}

crates/iceberg/src/scan.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl FileScanTask {
188188
mod tests {
189189
use crate::io::{FileIO, OutputFile};
190190
use crate::spec::{
191-
DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
191+
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
192192
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
193193
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
194194
};
@@ -351,14 +351,15 @@ mod tests {
351351
ManifestEntry::builder()
352352
.status(ManifestStatus::Added)
353353
.data_file(
354-
DataFile::builder()
354+
DataFileBuilder::default()
355355
.content(DataContentType::Data)
356356
.file_path(format!("{}/1.parquet", &fixture.table_location))
357357
.file_format(DataFileFormat::Parquet)
358358
.file_size_in_bytes(100)
359359
.record_count(1)
360360
.partition(Struct::from_iter([Some(Literal::long(100))]))
361-
.build(),
361+
.build()
362+
.unwrap(),
362363
)
363364
.build(),
364365
ManifestEntry::builder()
@@ -367,14 +368,15 @@ mod tests {
367368
.sequence_number(parent_snapshot.sequence_number())
368369
.file_sequence_number(parent_snapshot.sequence_number())
369370
.data_file(
370-
DataFile::builder()
371+
DataFileBuilder::default()
371372
.content(DataContentType::Data)
372373
.file_path(format!("{}/2.parquet", &fixture.table_location))
373374
.file_format(DataFileFormat::Parquet)
374375
.file_size_in_bytes(100)
375376
.record_count(1)
376377
.partition(Struct::from_iter([Some(Literal::long(200))]))
377-
.build(),
378+
.build()
379+
.unwrap(),
378380
)
379381
.build(),
380382
ManifestEntry::builder()
@@ -383,14 +385,15 @@ mod tests {
383385
.sequence_number(parent_snapshot.sequence_number())
384386
.file_sequence_number(parent_snapshot.sequence_number())
385387
.data_file(
386-
DataFile::builder()
388+
DataFileBuilder::default()
387389
.content(DataContentType::Data)
388390
.file_path(format!("{}/3.parquet", &fixture.table_location))
389391
.file_format(DataFileFormat::Parquet)
390392
.file_size_in_bytes(100)
391393
.record_count(1)
392394
.partition(Struct::from_iter([Some(Literal::long(300))]))
393-
.build(),
395+
.build()
396+
.unwrap(),
394397
)
395398
.build(),
396399
],

crates/iceberg/src/spec/manifest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ impl TryFrom<i32> for ManifestStatus {
924924
}
925925

926926
/// Data file carries data file path, partition tuple, metrics, …
927-
#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
927+
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
928928
pub struct DataFile {
929929
/// field id: 134
930930
///

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ use crate::Result;
2222
use arrow_array::RecordBatch;
2323
use futures::Future;
2424

25+
pub mod parquet_writer;
26+
mod track_writer;
27+
2528
/// File writer builder trait.
2629
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
2730
/// The associated file writer type.

0 commit comments

Comments
 (0)