Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(writer): retry storage.put on temporary network errors #2207

Merged
merged 3 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, CommitInfo, ReaderFeatures, WriterFeatures};
use crate::logstore::LogStore;
use crate::protocol::DeltaOperation;
use crate::storage::ObjectStoreRetryExt;
use crate::table::state::DeltaTableState;

pub use self::protocol::INSTANCE as PROTOCOL;
Expand Down Expand Up @@ -242,13 +243,19 @@ pub async fn commit_with_retries(
attempt_number += 1;
}
Err(err) => {
log_store.object_store().delete(&tmp_commit).await?;
log_store
.object_store()
.delete_with_retries(&tmp_commit, 15)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
}
Err(err) => {
log_store.object_store().delete(&tmp_commit).await?;
log_store
.object_store()
.delete_with_retries(&tmp_commit, 15)
.await?;
return Err(err.into());
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use url::Url;

pub mod file;
pub mod retry_ext;
pub mod utils;

use crate::{DeltaResult, DeltaTableError};
Expand All @@ -22,6 +23,7 @@ pub use object_store::{
DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result as ObjectStoreResult,
};
pub use retry_ext::ObjectStoreRetryExt;
pub use utils::*;

lazy_static! {
Expand Down
82 changes: 82 additions & 0 deletions crates/core/src/storage/retry_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Retry extension for [`ObjectStore`]

use bytes::Bytes;
use object_store::{path::Path, Error, ObjectStore, PutResult, Result};
use tracing::log::*;

/// Retry extension for [`ObjectStore`]
///
/// Read-only operations are retried by [`ObjectStore`] internally. However, PUT/DELETE operations
/// are not retried even thought they are technically idempotent. [`ObjectStore`] does not retry
/// those operations because having preconditions may produce different results for the same
/// request. PUT/DELETE operations without preconditions are idempotent and can be retried.
/// Unfortunately, [`ObjectStore`]'s retry mechanism only works on HTTP request level, thus there
/// is no way to distinguish whether a request has preconditions or not.
///
/// This trait provides additional methods for working with [`ObjectStore`] that automatically retry
/// unconditional operations when they fail.
///
/// See also:
/// - https://github.com/apache/arrow-rs/pull/5278
#[async_trait::async_trait]
pub trait ObjectStoreRetryExt: ObjectStore {
/// Save the provided bytes to the specified location
///
/// The operation is guaranteed to be atomic, it will either successfully write the entirety of
/// bytes to location, or fail. No clients should be able to observe a partially written object
///
/// Note that `put_with_opts` may have precondition semantics, and thus may not be retriable.
async fn put_with_retries(
&self,
location: &Path,
bytes: Bytes,
max_retries: usize,
) -> Result<PutResult> {
let mut attempt_number = 1;
while attempt_number <= max_retries {
match self.put(location, bytes.clone()).await {
Ok(result) => return Ok(result),
Err(err) if attempt_number == max_retries => {
return Err(err);
}
Err(Error::Generic { store, source }) => {
debug!(
"put_with_retries attempt {} failed: {} {}",
attempt_number, store, source
);
attempt_number += 1;
}
Err(err) => {
return Err(err);
}
}
}
unreachable!("loop yields Ok or Err in body when attempt_number = max_retries")
}

/// Delete the object at the specified location
async fn delete_with_retries(&self, location: &Path, max_retries: usize) -> Result<()> {
let mut attempt_number = 1;
while attempt_number <= max_retries {
match self.delete(location).await {
Ok(()) | Err(Error::NotFound { .. }) => return Ok(()),
Err(err) if attempt_number == max_retries => {
return Err(err);
}
Err(Error::Generic { store, source }) => {
debug!(
"delete_with_retries attempt {} failed: {} {}",
attempt_number, store, source
);
attempt_number += 1;
}
Err(err) => {
return Err(err);
}
}
}
unreachable!("loop yields Ok or Err in body when attempt_number = max_retries")
}
}

/// Blanket implementation: every type that implements [`ObjectStore`] gets the retrying
/// helpers of [`ObjectStoreRetryExt`] through the trait's default methods.
impl<Store> ObjectStoreRetryExt for Store where Store: ObjectStore {}
3 changes: 2 additions & 1 deletion crates/core/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::utils::{
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{Add, PartitionsExt, Scalar, StructType};
use crate::storage::ObjectStoreRetryExt;
use crate::table::builder::DeltaTableBuilder;
use crate::writer::utils::ShareableBuffer;
use crate::DeltaTable;
Expand Down Expand Up @@ -360,7 +361,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
self.storage.put_with_retries(&path, obj_bytes, 15).await?;

actions.push(create_add(
&writer.partition_values,
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::utils::{
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType};
use crate::storage::ObjectStoreRetryExt;
use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;

Expand Down Expand Up @@ -215,7 +216,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
self.storage.put_with_retries(&path, obj_bytes, 15).await?;

actions.push(create_add(
&writer.partition_values,
Expand Down
Loading