Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
Implement batching of bucket uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
gabbifish committed Sep 17, 2019
1 parent 77a8ded commit c4bc47e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 6 deletions.
86 changes: 85 additions & 1 deletion src/commands/kv/bucket/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ use crate::commands::kv::bucket::directory_keys_values;
use crate::commands::kv::bulk::put::put_bulk;
use crate::settings::global_user::GlobalUser;
use crate::settings::target::Target;
use crate::terminal::message;

// Size limits applied to a bucket upload in this module.

// Maximum length of a single key, in bytes.
const KEY_MAX_SIZE: usize = 512;
// Maximum length of a single value, in bytes (2 MiB).
const VALUE_MAX_SIZE: usize = 2 * 1024 * 1024;
// Maximum number of key-value pairs sent in one bulk upload request.
const PAIRS_MAX_COUNT: usize = 10000;
// Maximum total size of one bulk upload request, in bytes (100 MiB).
const UPLOAD_MAX_SIZE: usize = 100 * 1024 * 1024;

pub fn upload(
target: &Target,
Expand All @@ -23,5 +29,83 @@ pub fn upload(
Err(e) => failure::bail!("{}", e),
};

put_bulk(target, user, namespace_id, pairs?)
let mut pairs = pairs?;
validate_file_uploads(pairs.clone())?;

// Upload the pairs in batches. A batch is flushed before it would hold more than
// PAIRS_MAX_COUNT key-value pairs, or before its summed key+value bytes would exceed
// 80% of UPLOAD_MAX_SIZE (presumably headroom for request encoding overhead -- TODO confirm).
let mut key_count = 0;
let mut key_pair_bytes = 0;
let mut key_value_batch: Vec<KeyValuePair> = Vec::new();

// `pop` hands out pairs from the back of the vector, so they are uploaded in reverse order.
while let Some(pair) = pairs.pop() {
    let pair_bytes = pair.key.len() + pair.value.len();

    // Count *pairs* (not key bytes) against PAIRS_MAX_COUNT. Never flush an empty batch:
    // a single pair that is over the byte threshold on its own must not trigger an
    // upload request with no pairs in it.
    let batch_is_full = key_count + 1 > PAIRS_MAX_COUNT
        || key_pair_bytes + pair_bytes > UPLOAD_MAX_SIZE * 80 / 100;
    if batch_is_full && !key_value_batch.is_empty() {
        call_put_bulk_api(target, user.clone(), namespace_id, &mut key_value_batch)?;

        // The upload succeeded (a failure returned early above), so start a new batch.
        key_count = 0;
        key_pair_bytes = 0;
    }

    // Add the popped key-value pair to the running batch.
    key_count += 1;
    key_pair_bytes += pair_bytes;
    key_value_batch.push(pair);
}

// Upload whatever is left over as the last batch.
if !key_value_batch.is_empty() {
    call_put_bulk_api(target, user.clone(), namespace_id, &mut key_value_batch)?;
}

message::success("Success");
Ok(())
}

/// Uploads one batch of key-value pairs through `put_bulk` and leaves the batch empty.
///
/// `put_bulk` is called with `print_result` set to `false`, so a failed upload comes
/// back as an `Err` that is propagated to the caller.
///
/// The pairs are moved out of `key_value_batch` rather than cloned, so the batch is
/// empty after this call whether or not the upload succeeded.
fn call_put_bulk_api(
    target: &Target,
    user: GlobalUser,
    namespace_id: &str,
    key_value_batch: &mut Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
    message::info("Uploading...");

    // Take ownership of the batch contents without deep-copying them; the caller's
    // vector is left empty and ready to be refilled.
    let batch = std::mem::replace(key_value_batch, Vec::new());

    // If a partial upload fails (e.g. server error), return that error.
    put_bulk(target, user, namespace_id, batch, false)
}

// Ensure that all key-value pairs being uploaded have valid sizes (this ensures that
// no partial uploads happen). I don't like this function because it duplicates the
// size checking the API already does--but doing a preemptive check like this (before
// calling the API) will prevent partial bucket uploads from happening.
pub fn validate_file_uploads(pairs: Vec<KeyValuePair>) -> Result<(), failure::Error> {
for pair in pairs {
if pair.key.len() > KEY_MAX_SIZE {
failure::bail!(
"Path `{}` exceeds the maximum key size limit of {} bytes",
pair.key,
KEY_MAX_SIZE
);
}
if pair.key.len() > KEY_MAX_SIZE {
failure::bail!(
"File `{}` exceeds the maximum value size limit of {} bytes",
pair.key,
VALUE_MAX_SIZE
);
}
}
Ok(())
}
29 changes: 24 additions & 5 deletions src/commands/kv/bulk/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ pub fn put(
Err(e) => failure::bail!("{}", e),
};

put_bulk(target, user, namespace_id, pairs?)
put_bulk(target, user, namespace_id, pairs?, true)
}

//todo(gabbi): Let's make sure to split very large payloads into multiple requests.
pub fn put_bulk(
target: &Target,
user: GlobalUser,
namespace_id: &str,
pairs: Vec<KeyValuePair>,
print_result: bool,
) -> Result<(), failure::Error> {
let client = kv::api_client(user)?;

Expand All @@ -61,9 +63,26 @@ pub fn put_bulk(
});

match response {
Ok(_) => message::success("Success"),
Err(e) => kv::print_error(e),
Ok(_) => {
// todo(gabbi): I DO NOT LIKE THIS print_result LOGIC!
// In a future refactor, we should not print out error messages here,
// but return a Result that is handled by shared logic above this module.
// We can print out success in the caller to put_bulk, and as for errors,
// we should treat them like usual failure::Error and use
// our default error handing instead of printing out in kv::print_error.
// (We can use print_error for formatting though!)
if print_result {
message::success("Success");
}
Ok(())
}
Err(e) => {
if print_result {
kv::print_error(e);
Ok(())
} else {
failure::bail!(e)
}
}
}

Ok(())
}

0 comments on commit c4bc47e

Please sign in to comment.