Skip to content

Commit

Permalink
fix: use uuid as upload_id in obkv object store (#970)
Browse files Browse the repository at this point in the history
## Rationale
There is a possibility for the `upload_id` in the `obkv object store` to
become corrupted after restarting the server. This can result in errors
in the object's data.

## Detailed Changes
* use uuid as `upload_id`
  • Loading branch information
MichaelLeeHZ authored Jun 6, 2023
1 parent eba2293 commit e1b9e58
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 26 deletions.
33 changes: 17 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ table_kv = { workspace = true }
tokio = { workspace = true }
twox-hash = "1.6"
upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] }
uuid = { version = "1.3.3", features = ["v4"] }

[dev-dependencies]
tempfile = { workspace = true }
5 changes: 1 addition & 4 deletions components/object_store/src/obkv/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ pub struct ObkvObjectMeta {
/// The paths of multi upload parts.
#[serde(rename = "parts")]
pub parts: Vec<String>,
/// The version of object, we use the upload_id as version.
/// TODO: Since `upload_id` is used by multiple objects, it may become very
/// large. Should we assign a version number to each object to avoid
/// this issue?
/// The version of object, Now we use the upload_id as version.
#[serde(rename = "version")]
pub version: String,
}
Expand Down
11 changes: 5 additions & 6 deletions components/object_store/src/obkv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use upstream::{
path::{Path, DELIMITER},
Error as StoreError, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
};
use uuid::Uuid;

use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
Expand Down Expand Up @@ -167,7 +168,6 @@ pub struct ObkvObjectStore<T> {
/// The manager to manage object store meta, which persist in obkv
meta_manager: Arc<MetaManager<T>>,
client: Arc<T>,
current_upload_id: AtomicU64,
/// The size of one object part persited in obkv
/// It may cause problem to save huge data in one obkv value, so we
/// need to split data into small parts.
Expand Down Expand Up @@ -212,7 +212,6 @@ impl<T: TableKv> ObkvObjectStore<T> {
shard_manager,
meta_manager: Arc::new(meta_manager),
client,
current_upload_id: AtomicU64::new(0),
part_size,
max_object_size,
max_upload_concurrency,
Expand Down Expand Up @@ -347,13 +346,12 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let instant = Instant::now();

let upload_id = self.current_upload_id.fetch_add(1, Ordering::Relaxed);
let multi_part_id = format!("{upload_id}");
let upload_id = Uuid::new_v4().to_string();
let table_name = self.pick_shard_table(location);

let upload = ObkvMultiPartUpload {
location: location.clone(),
upload_id: multi_part_id.clone(),
upload_id: upload_id.clone(),
table_name: table_name.to_string(),
size: AtomicU64::new(0),
client: Arc::clone(&self.client),
Expand All @@ -367,7 +365,7 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
"ObkvObjectStore put_multipart operation, location:{location}, table_name:{table_name}, cost:{:?}",
instant.elapsed()
);
Ok((multi_part_id, Box::new(multi_part_upload)))
Ok((upload_id, Box::new(multi_part_upload)))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
Expand Down Expand Up @@ -754,6 +752,7 @@ impl<T: TableKv> CloudMultiPartUploadImpl for ObkvMultiPartUpload<T> {
};

// Save meta info to specify obkv table.
// TODO: We should remove the previous object data when update object.
self.meta_manager
.save(meta)
.await
Expand Down

0 comments on commit e1b9e58

Please sign in to comment.