Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Implement large bucket uploads split across multiple bulk KV API requests #601

Merged
merged 5 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions src/commands/kv/bucket/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,98 @@ use crate::settings::global_user::GlobalUser;
use crate::settings::target::Target;
use crate::terminal::message;

// Maximum allowed size of a single key, in bytes.
const KEY_MAX_SIZE: usize = 512;
// Maximum allowed size of a single value, in bytes (2 MiB).
const VALUE_MAX_SIZE: usize = 2 * 1024 * 1024;
// Maximum number of key-value pairs permitted in one bulk upload request.
const PAIRS_MAX_COUNT: usize = 10000;
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
// Upper bound on the total payload of one bulk upload request, in bytes
// (100 MiB); batches are deliberately kept well under this limit.
const UPLOAD_MAX_SIZE: usize = 100 * 1024 * 1024;
gabbifish marked this conversation as resolved.
Show resolved Hide resolved

pub fn upload(
target: &Target,
user: GlobalUser,
namespace_id: &str,
path: &Path,
) -> Result<(), failure::Error> {
let pairs: Result<Vec<KeyValuePair>, failure::Error> = match &metadata(path) {
let mut pairs: Vec<KeyValuePair> = match &metadata(path) {
Ok(file_type) if file_type.is_dir() => directory_keys_values(path),
Ok(_file_type) => {
// any other file types (files, symlinks)
failure::bail!("wrangler kv:bucket upload takes a directory")
}
Err(e) => failure::bail!("{}", e),
};
}?;

validate_file_uploads(pairs.clone())?;

// Iterate over all key-value pairs and create batches of uploads, each of which are
// maximum 10K key-value pairs in size OR maximum ~50MB in size. Upload each batch
// as it is created.
let mut key_count = 0;
let mut key_pair_bytes = 0;
let mut key_value_batch: Vec<KeyValuePair> = Vec::new();

while !(pairs.is_empty() && key_value_batch.is_empty()) {
if pairs.is_empty() {
// Last batch to upload
call_put_bulk_api(target, user.clone(), namespace_id, &mut key_value_batch)?;
} else {
let pair = pairs.pop().unwrap();
ashleymichal marked this conversation as resolved.
Show resolved Hide resolved
if key_count + pair.key.len() > PAIRS_MAX_COUNT
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
// Keep upload size small to keep KV bulk API happy
|| key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE * 1/2
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
{
call_put_bulk_api(target, user.clone(), namespace_id, &mut key_value_batch)?;
gabbifish marked this conversation as resolved.
Show resolved Hide resolved

// If upload successful, reset counters
key_count = 0;
key_pair_bytes = 0;
}

// Add the popped key-value pair to the running batch of key-value pair uploads
key_count = key_count + pair.key.len();
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
key_pair_bytes = key_pair_bytes + pair.key.len() + pair.value.len();
key_value_batch.push(pair);
}
}

message::success("Success");
Ok(())
}

match put_bulk(target, user, namespace_id, pairs?) {
Ok(_) => message::success("Success"),
Err(e) => print!("{}", e),
/// Uploads the accumulated batch via the bulk KV PUT endpoint, then clears the
/// batch so the caller can start accumulating the next one.
fn call_put_bulk_api(
    target: &Target,
    user: GlobalUser,
    namespace_id: &str,
    key_value_batch: &mut Vec<KeyValuePair>,
) -> Result<(), failure::Error> {
    message::info("Uploading...");
    // If a partial upload fails (e.g. server error), return that error.
    // `user` is owned and not used again, so it is moved rather than cloned.
    put_bulk(target, user, namespace_id, key_value_batch.clone())?;

    // Can clear batch now that we've uploaded it
    key_value_batch.clear();
    Ok(())
}

// Ensure that all key-value pairs being uploaded have valid sizes (this ensures that
// no partial uploads happen). I don't like this function because it duplicates the
// size checking the API already does--but doing a preemptive check like this (before
// calling the API) will prevent partial bucket uploads from happening.
pub fn validate_file_uploads(pairs: Vec<KeyValuePair>) -> Result<(), failure::Error> {
for pair in pairs {
if pair.key.len() > KEY_MAX_SIZE {
failure::bail!(
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
"Path `{}` exceeds the maximum key size limit of {} bytes",
pair.key,
KEY_MAX_SIZE
);
}
if pair.key.len() > KEY_MAX_SIZE {
failure::bail!(
"File `{}` exceeds the maximum value size limit of {} bytes",
pair.key,
VALUE_MAX_SIZE
);
}
}
Ok(())
}
1 change: 1 addition & 0 deletions src/commands/kv/bulk/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub fn put(
Ok(())
}

//todo(gabbi): Let's make sure to split very large payloads into multiple requests.
gabbifish marked this conversation as resolved.
Show resolved Hide resolved
pub fn put_bulk(
target: &Target,
user: GlobalUser,
Expand Down