Skip to content

Commit

Permalink
Merge branch 'main' into credential-caching
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Nov 12, 2024
2 parents 64d7eca + 8c7b019 commit 40fb89e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ bytes = { version = "1" }
chrono = { version = ">0.4.34", default-features = false, features = ["clock"] }
tracing = { version = "0.1", features = ["log"] }
regex = { version = "1" }
thiserror = { version = "1" }
thiserror = { version = "2" }
url = { version = "2" }
urlencoding = "2.1.3"
uuid = { version = "1" }
Expand Down
18 changes: 13 additions & 5 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
Expand Down Expand Up @@ -217,11 +218,18 @@ impl DeltaWriter {
/// This will flush all remaining data.
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
let writers = std::mem::take(&mut self.partition_writers);
let mut actions = Vec::new();
for (_, writer) in writers {
let writer_actions = writer.close().await?;
actions.extend(writer_actions);
}
let actions = futures::stream::iter(writers)
.map(|(_, writer)| async move {
let writer_actions = writer.close().await?;
Ok::<_, DeltaTableError>(writer_actions)
})
.buffered(num_cpus::get())
.try_fold(Vec::new(), |mut acc, actions| {
acc.extend(actions);
futures::future::ready(Ok(acc))
})
.await?;

Ok(actions)
}
}
Expand Down

0 comments on commit 40fb89e

Please sign in to comment.