feat: upload multiple asset chunks in the same call #3947

Merged · 8 commits · Oct 11, 2024
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,20 @@ Valid settings are:
 - `--log-viewer` flag with `dfx canister create`
 - `canisters[].initialization_values.log_visibility.allowed_viewers` in `dfx.json`
 
+### feat: batch upload assets
+
+The frontend canister sync now tries to batch multiple small content chunks into a single call using the `create_chunks` method added earlier.
+This should lead to significantly faster upload times for frontends with many small files.
+
+## Dependencies
+
+### Frontend canister
+
+Bumped `api_version` to `2` for the previous addition of `create_chunks` since the improved file sync relies on it.
+
+- Module hash: 9e4485d4358dd910aebcc025843547d05604cf28c6dc7c2cc2f8c76d083112e8
+- https://github.com/dfinity/sdk/pull/3947
+
 # 0.24.1
 
 ### feat: More PocketIC flags supported
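For illustration, the packing idea behind this feature can be sketched as follows. This is a minimal sketch, not the implementation from this PR (the real logic appears in the `plumbing.rs` diff below); `MAX_BATCH_SIZE` is a hypothetical stand-in for the canister's per-call chunk size limit.

```rust
/// Greedy batch packing: sort queued chunks by size, then fill each batch
/// from the largest chunk down, so each create_chunks call carries close
/// to the per-call limit.
const MAX_BATCH_SIZE: usize = 1_900_000; // hypothetical stand-in value

fn pack_one_batch(queue: &mut Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    queue.sort_unstable_by_key(|chunk| chunk.len());
    let mut batch = Vec::new();
    let mut batch_size = 0;
    // Walk from the largest queued chunk down, taking whatever still fits.
    for chunk in std::mem::take(queue).into_iter().rev() {
        if chunk.len() <= MAX_BATCH_SIZE - batch_size {
            batch_size += chunk.len();
            batch.push(chunk);
        } else {
            queue.push(chunk); // does not fit; retain for a later batch
        }
    }
    batch
}
```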
27 changes: 20 additions & 7 deletions src/canisters/frontend/ic-asset/src/batch_upload/operations.rs
@@ -10,9 +10,12 @@ use crate::canister_api::types::batch_upload::v1::{BatchOperationKind, CommitBat
 use candid::Nat;
 use std::collections::HashMap;
 
+use super::plumbing::ChunkUploader;
+
 pub(crate) const BATCH_UPLOAD_API_VERSION: u16 = 1;
 
-pub(crate) fn assemble_batch_operations(
+pub(crate) async fn assemble_batch_operations(
+    chunk_uploader: Option<&ChunkUploader<'_>>,
     project_assets: &HashMap<String, ProjectAsset>,
     canister_assets: HashMap<String, AssetDetails>,
     asset_deletion_reason: AssetDeletionReason,
@@ -30,25 +33,28 @@ pub(crate) fn assemble_batch_operations(
     );
     create_new_assets(&mut operations, project_assets, &canister_assets);
     unset_obsolete_encodings(&mut operations, project_assets, &canister_assets);
-    set_encodings(&mut operations, project_assets);
+    set_encodings(&mut operations, chunk_uploader, project_assets).await;
     update_properties(&mut operations, project_assets, &canister_asset_properties);
 
     operations
 }
 
-pub(crate) fn assemble_commit_batch_arguments(
+pub(crate) async fn assemble_commit_batch_arguments(
+    chunk_uploader: &ChunkUploader<'_>,
     project_assets: HashMap<String, ProjectAsset>,
     canister_assets: HashMap<String, AssetDetails>,
     asset_deletion_reason: AssetDeletionReason,
     canister_asset_properties: HashMap<String, AssetProperties>,
     batch_id: Nat,
 ) -> CommitBatchArguments {
     let operations = assemble_batch_operations(
+        Some(chunk_uploader),
         &project_assets,
         canister_assets,
        asset_deletion_reason,
         canister_asset_properties,
-    );
+    )
+    .await;
     CommitBatchArguments {
         operations,
         batch_id,
@@ -153,21 +159,28 @@ pub(crate) fn unset_obsolete_encodings(
     }
 }
 
-pub(crate) fn set_encodings(
+pub(crate) async fn set_encodings(
     operations: &mut Vec<BatchOperationKind>,
+    chunk_uploader: Option<&ChunkUploader<'_>>,
     project_assets: &HashMap<String, ProjectAsset>,
 ) {
     for (key, project_asset) in project_assets {
         for (content_encoding, v) in &project_asset.encodings {
             if v.already_in_place {
                 continue;
             }
-
+            let chunk_ids = if let Some(uploader) = chunk_uploader {
+                uploader
+                    .uploader_ids_to_canister_chunk_ids(&v.uploader_chunk_ids)
+                    .await
+            } else {
+                vec![]
+            };
             operations.push(BatchOperationKind::SetAssetContent(
                 SetAssetContentArguments {
                     key: key.clone(),
                     content_encoding: content_encoding.clone(),
-                    chunk_ids: v.chunk_ids.clone(),
+                    chunk_ids,
                     sha256: Some(v.sha256.clone()),
                 },
             ));
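The reworked `set_encodings` above resolves uploader-local chunk ids into canister-assigned chunk ids only at commit-assembly time. A self-contained sketch of that deferred-resolution pattern (using `u128` in place of candid's `Nat` so the example compiles on its own; the values are hypothetical):

```rust
use std::collections::BTreeMap;

// During upload only uploader-local ids exist; once every chunk has been
// uploaded, they are resolved to the ids the canister assigned.
fn resolve_ids(mapping: &BTreeMap<usize, u128>, uploader_ids: &[usize]) -> Vec<u128> {
    uploader_ids
        .iter()
        .map(|id| *mapping.get(id).expect("all chunks must be uploaded first"))
        .collect()
}

fn main() {
    let mut mapping = BTreeMap::new();
    // Hypothetical: chunks 0 and 1 were batched into one create_chunks call
    // that returned canister chunk ids 7 and 8.
    mapping.insert(0, 7u128);
    mapping.insert(1, 8u128);
    assert_eq!(resolve_ids(&mapping, &[0, 1]), vec![7, 8]);
}
```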
138 changes: 130 additions & 8 deletions src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs
@@ -3,6 +3,7 @@ use crate::asset::content::Content;
 use crate::asset::content_encoder::ContentEncoder;
 use crate::batch_upload::semaphores::Semaphores;
 use crate::canister_api::methods::chunk::create_chunk;
+use crate::canister_api::methods::chunk::create_chunks;
 use crate::canister_api::types::asset::AssetDetails;
 use crate::error::CreateChunkError;
 use crate::error::CreateEncodingError;
@@ -14,10 +15,12 @@ use futures::TryFutureExt;
 use ic_utils::Canister;
 use mime::Mime;
 use slog::{debug, info, Logger};
+use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use tokio::sync::Mutex;
 
 const CONTENT_ENCODING_IDENTITY: &str = "identity";
 
@@ -35,7 +38,7 @@ pub(crate) struct AssetDescriptor {
 }
 
 pub(crate) struct ProjectAssetEncoding {
-    pub(crate) chunk_ids: Vec<Nat>,
+    pub(crate) uploader_chunk_ids: Vec<usize>,
     pub(crate) sha256: Vec<u8>,
     pub(crate) already_in_place: bool,
 }
@@ -46,30 +49,68 @@ pub(crate) struct ProjectAsset {
     pub(crate) encodings: HashMap<String, ProjectAssetEncoding>,
 }
 
+type IdMapping = BTreeMap<usize, Nat>;
+type UploadQueue = Vec<(usize, Vec<u8>)>;
 pub(crate) struct ChunkUploader<'agent> {
     canister: Canister<'agent>,
     batch_id: Nat,
+    api_version: u16,
     chunks: Arc<AtomicUsize>,
     bytes: Arc<AtomicUsize>,
+    // maps uploader_chunk_id to canister_chunk_id
+    id_mapping: Arc<Mutex<IdMapping>>,
+    upload_queue: Arc<Mutex<UploadQueue>>,
 }
 
 impl<'agent> ChunkUploader<'agent> {
-    pub(crate) fn new(canister: Canister<'agent>, batch_id: Nat) -> Self {
+    pub(crate) fn new(canister: Canister<'agent>, api_version: u16, batch_id: Nat) -> Self {
         Self {
             canister,
             batch_id,
+            api_version,
             chunks: Arc::new(AtomicUsize::new(0)),
             bytes: Arc::new(AtomicUsize::new(0)),
+            id_mapping: Arc::new(Mutex::new(BTreeMap::new())),
+            upload_queue: Arc::new(Mutex::new(vec![])),
         }
     }
 
+    /// Returns an uploader_chunk_id, which is different from the chunk id on the asset canister.
+    /// uploader_chunk_id can be mapped to canister_chunk_id using `uploader_ids_to_canister_chunk_ids`
+    /// once `finalize_upload` has completed.
     pub(crate) async fn create_chunk(
         &self,
         contents: &[u8],
         semaphores: &Semaphores,
-    ) -> Result<Nat, CreateChunkError> {
-        self.chunks.fetch_add(1, Ordering::SeqCst);
+    ) -> Result<usize, CreateChunkError> {
+        let uploader_chunk_id = self.chunks.fetch_add(1, Ordering::SeqCst);
         self.bytes.fetch_add(contents.len(), Ordering::SeqCst);
-        create_chunk(&self.canister, &self.batch_id, contents, semaphores).await
+        if contents.len() == MAX_CHUNK_SIZE || self.api_version < 2 {
+            let canister_chunk_id =
+                create_chunk(&self.canister, &self.batch_id, contents, semaphores).await?;
+            let mut map = self.id_mapping.lock().await;
+            map.insert(uploader_chunk_id, canister_chunk_id);
+            Ok(uploader_chunk_id)
+        } else {
+            self.add_to_upload_queue(uploader_chunk_id, contents).await;
+            // Larger `max_retained_bytes` leads to batches that are filled closer to the max size.
+            // `4 * MAX_CHUNK_SIZE` leads to a pretty small memory footprint but still offers solid fill rates.
+            // Mini experiment:
+            // - Tested with: `for i in $(seq 1 50); do dd if=/dev/urandom of="src/hello_frontend/assets/file_$i.bin" bs=$(shuf -i 1-2000000 -n 1) count=1; done && dfx deploy hello_frontend`
+            // - Result: Roughly 15% of batches under 90% full.
+            // With other byte ranges (e.g. `shuf -i 1-3000000 -n 1`) stats improve significantly
+            self.upload_chunks(4 * MAX_CHUNK_SIZE, usize::MAX, semaphores)
+                .await?;
+            Ok(uploader_chunk_id)
+        }
+    }
+
+    pub(crate) async fn finalize_upload(
+        &self,
+        semaphores: &Semaphores,
+    ) -> Result<(), CreateChunkError> {
+        self.upload_chunks(0, 0, semaphores).await?;
+        Ok(())
+    }
 
     pub(crate) fn bytes(&self) -> usize {
@@ -78,6 +119,80 @@ impl<'agent> ChunkUploader<'agent> {
     pub(crate) fn chunks(&self) -> usize {
         self.chunks.load(Ordering::SeqCst)
     }
+
+    /// Call only after `finalize_upload` has completed
+    pub(crate) async fn uploader_ids_to_canister_chunk_ids(
+        &self,
+        uploader_ids: &[usize],
+    ) -> Vec<Nat> {
+        let mapping = self.id_mapping.lock().await;
+        uploader_ids
+            .iter()
+            .map(|id| {
+                mapping
+                    .get(id)
+                    .expect("Chunk uploader did not upload all chunks. This is a bug.")
+                    .clone()
+            })
+            .collect()
+    }
+
+    async fn add_to_upload_queue(&self, uploader_chunk_id: usize, contents: &[u8]) {
+        let mut queue = self.upload_queue.lock().await;
+        queue.push((uploader_chunk_id, contents.into()));
+    }
+
+    /// Calls `create_chunks` with batches of chunks from `self.upload_queue` until at most `max_retained_bytes`
+    /// bytes and at most `max_retained_chunks` chunks remain in the upload queue. Larger values
+    /// will lead to better batch fill rates but also leave a larger memory footprint.
+    async fn upload_chunks(
+        &self,
+        max_retained_bytes: usize,
+        max_retained_chunks: usize,
+        semaphores: &Semaphores,
+    ) -> Result<(), CreateChunkError> {
+        let mut queue = self.upload_queue.lock().await;
+
+        let mut batches = vec![];
+        while queue
+            .iter()
+            .map(|(_, content)| content.len())
+            .sum::<usize>()
+            > max_retained_bytes
+            || queue.len() > max_retained_chunks
+        {
+            // Greedily fills batch with the largest chunk that fits
+            queue.sort_unstable_by_key(|(_, content)| content.len());
+            let mut batch = vec![];
+            let mut batch_size = 0;
+            for (uploader_chunk_id, content) in std::mem::take(&mut *queue).into_iter().rev() {
+                if content.len() <= MAX_CHUNK_SIZE - batch_size {
+                    batch_size += content.len();
+                    batch.push((uploader_chunk_id, content));
+                } else {
+                    queue.push((uploader_chunk_id, content));
+                }
+            }
+            batches.push(batch);
+        }
+
+        try_join_all(batches.into_iter().map(|chunks| async move {
+            let (uploader_chunk_ids, chunks): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
+            let canister_chunk_ids =
+                create_chunks(&self.canister, &self.batch_id, chunks, semaphores).await?;
+            let mut map = self.id_mapping.lock().await;
+            for (uploader_id, canister_id) in uploader_chunk_ids
+                .into_iter()
+                .zip(canister_chunk_ids.into_iter())
+            {
+                map.insert(uploader_id, canister_id);
+            }
+            Ok(())
+        }))
+        .await?;
+
+        Ok(())
+    }
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -110,7 +225,7 @@ async fn make_project_asset_encoding(
         false
     };
 
-    let chunk_ids = if already_in_place {
+    let uploader_chunk_ids = if already_in_place {
         info!(
             logger,
             " {}{} ({} bytes) sha {} is already installed",
@@ -144,7 +259,7 @@ };
     };
 
     Ok(ProjectAssetEncoding {
-        chunk_ids,
+        uploader_chunk_ids,
         sha256,
         already_in_place,
     })
@@ -305,6 +420,13 @@ pub(crate) async fn make_project_assets(
         })
         .collect();
     let project_assets = try_join_all(project_asset_futures).await?;
+    if let Some(uploader) = chunk_upload_target {
+        uploader.finalize_upload(&semaphores).await.map_err(|err| {
+            CreateProjectAssetError::CreateEncodingError(CreateEncodingError::CreateChunkFailed(
+                err,
+            ))
+        })?;
+    }
 
     let mut hm = HashMap::new();
     for project_asset in project_assets {
@@ -321,7 +443,7 @@ async fn upload_content_chunks(
     content_encoding: &str,
     semaphores: &Semaphores,
     logger: &Logger,
-) -> Result<Vec<Nat>, CreateChunkError> {
+) -> Result<Vec<usize>, CreateChunkError> {
     if content.data.is_empty() {
         let empty = vec![];
         let chunk_id = chunk_uploader.create_chunk(&empty, semaphores).await?;
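Taken together, the new `ChunkUploader` methods imply a three-phase calling sequence. Below is a sketch of crate-internal usage, assuming the items from the diff above are in scope and eliding the surrounding sync machinery (`uploader`, `semaphores`, and the chunk data come from the caller):

```rust
// Sketch of the ChunkUploader lifecycle (not code from this PR).
async fn upload_and_resolve(
    uploader: &ChunkUploader<'_>,
    semaphores: &Semaphores,
    contents: &[Vec<u8>],
) -> Result<Vec<Nat>, CreateChunkError> {
    // Phase 1: create_chunk() queues small chunks (API version >= 2) or
    // uploads immediately, returning uploader-local ids either way.
    let mut uploader_ids = Vec::new();
    for chunk in contents {
        uploader_ids.push(uploader.create_chunk(chunk, semaphores).await?);
    }

    // Phase 2: flush whatever is still queued, however little it fills a batch.
    uploader.finalize_upload(semaphores).await?;

    // Phase 3: only now is the uploader-id -> canister-id mapping complete.
    Ok(uploader
        .uploader_ids_to_canister_chunk_ids(&uploader_ids)
        .await)
}
```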
15 changes: 8 additions & 7 deletions src/canisters/frontend/ic-asset/src/batch_upload/semaphores.rs
@@ -6,30 +6,31 @@ const MAX_SIMULTANEOUS_LOADED_MB: usize = 50;
 // How many simultaneous chunks being created at once
 const MAX_SIMULTANEOUS_CREATE_CHUNK: usize = 50;
 
-// How many simultaneous Agent.call() to create_chunk
+// How many simultaneous Agent.call() to create_chunk(s)
 const MAX_SIMULTANEOUS_CREATE_CHUNK_CALLS: usize = 25;
 
-// How many simultaneous Agent.wait() on create_chunk result
+// How many simultaneous Agent.wait() on create_chunk(s) result
 const MAX_SIMULTANEOUS_CREATE_CHUNK_WAITS: usize = 25;
 
+#[derive(Debug)]
 pub(crate) struct Semaphores {
     // The "file" semaphore limits how much file data to load at once. A given loaded file's data
     // may be simultaneously encoded (gzip and so forth).
     pub file: SharedSemaphore,
 
-    // The create_chunk semaphore limits the number of chunks that can be in the process
-    // of being created at one time. Since each chunk creation can involve retries,
-    // this focuses those retries on a smaller number of chunks.
+    // The create_chunk semaphore limits the number of chunk creation calls
+    // that can be in progress at one time. Since each chunk creation can involve retries,
+    // this focuses those retries on a smaller number of calls.
     // Without this semaphore, every chunk would make its first attempt, before
     // any chunk made its second attempt.
    pub create_chunk: SharedSemaphore,
 
     // The create_chunk_call semaphore limits the number of simultaneous
-    // agent.call()s to create_chunk.
+    // agent.call()s to create_chunk(s).
     pub create_chunk_call: SharedSemaphore,
 
     // The create_chunk_wait semaphore limits the number of simultaneous
-    // agent.wait() calls for outstanding create_chunk requests.
+    // agent.wait() calls for outstanding create_chunk(s) requests.
     pub create_chunk_wait: SharedSemaphore,
 }
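`SharedSemaphore` itself is not part of this diff; the throttling pattern these comments describe can be sketched with tokio's `Semaphore` directly. A minimal sketch under that assumption (the permit count mirrors `MAX_SIMULTANEOUS_CREATE_CHUNK_CALLS`; the call body is a placeholder):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Each in-flight create_chunk(s) call must hold a permit, so at most 25
// calls are outstanding at once and retries stay focused on a small set
// of chunks instead of all chunks attempting simultaneously.
async fn throttled_call(limiter: Arc<Semaphore>) {
    let _permit = limiter.acquire().await.expect("semaphore closed");
    // agent.call() to create_chunk(s) would happen here; the permit is
    // released when _permit is dropped at the end of this scope.
}

fn make_limiter() -> Arc<Semaphore> {
    Arc::new(Semaphore::new(25))
}
```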