[CLN] Flush error propagation and debug #2131

Merged · 1 commit · May 3, 2024
14 changes: 6 additions & 8 deletions rust/worker/src/blockstore/arrow/flusher.rs
@@ -1,11 +1,10 @@
-use crate::errors::ChromaError;
-
 use super::{
     provider::{BlockManager, SparseIndexManager},
     sparse_index::SparseIndex,
     types::{ArrowWriteableKey, ArrowWriteableValue},
 };
-use std::collections::{HashMap, HashSet};
+use crate::errors::ChromaError;
+use std::collections::HashSet;
 use uuid::Uuid;
 
 pub(crate) struct ArrowBlockfileFlusher {
@@ -37,14 +36,13 @@ impl ArrowBlockfileFlusher {
     pub(crate) async fn flush<K: ArrowWriteableKey, V: ArrowWriteableValue>(
         self,
     ) -> Result<(), Box<dyn ChromaError>> {
+        // TODO: We could flush in parallel
         for delta_id in self.modified_delta_ids {
-            self.block_manager.flush(&delta_id).await;
+            self.block_manager.flush(&delta_id).await?
         }
-        // TODO: catch errors from the flush
-        let res = self
-            .sparse_index_manager
+        self.sparse_index_manager
             .flush::<K>(&self.sparse_index.id)
-            .await;
+            .await?;
         Ok(())
     }

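The flusher change above is the core of the PR: `flush` already returned `Result<(), Box<dyn ChromaError>>`, but the inner block and sparse-index results were dropped; now they propagate with `?`. Below is a minimal, self-contained sketch of that pattern. `ChromaError` and `ErrorCodes` are reduced stand-ins for the definitions in `rust/worker/src/errors.rs`, and the storage call is faked with a bool; only the `thiserror` crate is required.

```rust
// Minimal sketch of the propagation pattern this PR moves to. `ChromaError`
// and `ErrorCodes` are simplified stand-ins, not the worker crate's real types.
use thiserror::Error;

#[derive(Debug, PartialEq)]
pub enum ErrorCodes {
    NotFound,
    Internal,
}

pub trait ChromaError: std::error::Error {
    fn code(&self) -> ErrorCodes;
}

#[derive(Error, Debug)]
pub enum BlockFlushError {
    #[error("Not found")]
    NotFound,
}

impl ChromaError for BlockFlushError {
    fn code(&self) -> ErrorCodes {
        ErrorCodes::NotFound
    }
}

// A leaf flush: returns a boxed ChromaError instead of printing and swallowing it.
fn flush_block(found: bool) -> Result<(), Box<dyn ChromaError>> {
    if found {
        Ok(())
    } else {
        Err(Box::new(BlockFlushError::NotFound))
    }
}

// The caller-side change: `?` propagates the boxed error upward
// rather than ignoring the result of each flush.
fn flush_all(ids_found: &[bool]) -> Result<(), Box<dyn ChromaError>> {
    for &found in ids_found {
        flush_block(found)?;
    }
    Ok(())
}

fn main() {
    assert!(flush_all(&[true, true]).is_ok());
    let err = flush_all(&[true, false]).unwrap_err();
    assert_eq!(err.code(), ErrorCodes::NotFound);
    println!("propagated error: {}", err);
}
```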
101 changes: 56 additions & 45 deletions rust/worker/src/blockstore/arrow/provider.rs
@@ -1,23 +1,24 @@
 use super::{
-    block::{self, delta::BlockDelta, Block},
-    blockfile::{self, ArrowBlockfileReader, ArrowBlockfileWriter},
-    sparse_index::{self, SparseIndex},
+    block::{delta::BlockDelta, Block},
+    blockfile::{ArrowBlockfileReader, ArrowBlockfileWriter},
+    sparse_index::SparseIndex,
     types::{ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue},
 };
 use crate::{
     blockstore::{
         key::KeyWrapper,
         memory::storage::Readable,
-        provider::{BlockfileProvider, CreateError, OpenError},
+        provider::{CreateError, OpenError},
         BlockfileReader, BlockfileWriter, Key, Value,
     },
-    errors::ChromaError,
+    errors::{ChromaError, ErrorCodes},
     storage::Storage,
 };
-use core::panic;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::RwLock;
 use std::{collections::HashMap, sync::Arc};
-use tokio::{io::AsyncReadExt, pin};
+use thiserror::Error;
+use tokio::io::AsyncReadExt;
 use uuid::Uuid;
 
 /// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production).
@@ -192,35 +193,9 @@ impl BlockManager {
                 }
             }
         }
-
-        // match cache.get(id) {
-        //     Some(block) => Some(block.clone()),
-        //     None => {
-        //         let key = format!("block/{}", id);
-        //         let bytes = self.storage.get(&key).await;
-        //         match bytes {
-        //             Ok(mut bytes) => {
-        //                 let mut buf: Vec<u8> = Vec::new();
-        //                 bytes.read_to_end(&mut buf);
-        //                 let block = Block::from_bytes(&buf);
-        //                 match block {
-        //                     Ok(block) => {
-        //                         self.read_cache.write().insert(*id, block.clone());
-        //                         Some(block)
-        //                     }
-        //                     Err(_) => {
-        //                         // TODO: log error
-        //                         None
-        //                     }
-        //                 }
-        //             }
-        //             Err(_) => None,
-        //         }
-        //     }
-        // }
     }
 
-    pub(super) async fn flush(&self, id: &Uuid) {
+    pub(super) async fn flush(&self, id: &Uuid) -> Result<(), Box<dyn ChromaError>> {
         let block = self.get(id).await;
 
         match block {
@@ -230,15 +205,32 @@
                 let res = self.storage.put_bytes(&key, bytes).await;
                 match res {
                     Ok(_) => {
-                        println!("Block written to storage")
+                        println!("Block: {} written to storage", id);
+                        Ok(())
                     }
                     Err(e) => {
                         println!("Error writing block to storage {}", e);
+                        Err(Box::new(e))
                     }
                 }
-                // TODO: error handling
             }
-            None => {}
+            None => {
+                return Err(Box::new(BlockFlushError::NotFound));
+            }
         }
     }
 }
+
+#[derive(Error, Debug)]
+pub enum BlockFlushError {
+    #[error("Not found")]
+    NotFound,
+}
+
+impl ChromaError for BlockFlushError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            BlockFlushError::NotFound => ErrorCodes::NotFound,
+        }
+    }
+}
@@ -332,7 +324,10 @@ impl SparseIndexManager {
         self.cache.write().insert(index.id, index);
     }
 
-    pub async fn flush<'read, K: ArrowWriteableKey + 'read>(&self, id: &Uuid) {
+    pub async fn flush<'read, K: ArrowWriteableKey + 'read>(
+        &self,
+        id: &Uuid,
+    ) -> Result<(), Box<dyn ChromaError>> {
         let index = self.get::<K::ReadableKey<'read>>(id).await;
         match index {
             Some(index) => {
@@ -344,22 +339,24 @@
                         let res = self.storage.put_bytes(&key, bytes).await;
                         match res {
                             Ok(_) => {
-                                println!("Sparse index written to storage")
+                                println!("Sparse index written to storage");
+                                Ok(())
                             }
-                            Err(_) => {
+                            Err(e) => {
                                 println!("Error writing sparse index to storage");
+                                Err(Box::new(e))
                             }
                         }
                     }
-                    Err(_) => {
-                        // TODO: error
-                        panic!("Failed to convert sparse index to block");
+                    Err(e) => {
+                        println!("Failed to convert sparse index to block");
+                        Err(e)
                     }
                 }
             }
             None => {
-                // TODO: error
-                panic!("Tried to flush a sparse index that doesn't exist");
+                println!("Tried to flush a sparse index that doesn't exist");
+                return Err(Box::new(SparseIndexFlushError::NotFound));
             }
         }
     }
@@ -377,3 +374,17 @@
         forked
     }
 }
+
+#[derive(Error, Debug)]
+pub enum SparseIndexFlushError {
+    #[error("Not found")]
+    NotFound,
+}
+
+impl ChromaError for SparseIndexFlushError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            SparseIndexFlushError::NotFound => ErrorCodes::NotFound,
+        }
+    }
+}
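Throughout the provider changes the concrete error is boxed by hand (`Err(Box::new(e))`) inside an explicit `match`, which leaves room for the `println!` debug lines. Where no logging is needed, the same boxing can be written more compactly with `map_err` and `?`. The sketch below shows that equivalent form; `ChromaError`, `ErrorCodes`, `StoragePutError`, and `put_bytes` here are illustrative stand-ins, not the worker crate's real items.

```rust
// Sketch of a more compact way to box a concrete error into
// `Box<dyn ChromaError>` when no extra logging is needed.
use thiserror::Error;

#[derive(Debug)]
pub enum ErrorCodes {
    Internal,
}

pub trait ChromaError: std::error::Error {
    fn code(&self) -> ErrorCodes;
}

#[derive(Error, Debug)]
#[error("put failed: {0}")]
pub struct StoragePutError(String);

impl ChromaError for StoragePutError {
    fn code(&self) -> ErrorCodes {
        ErrorCodes::Internal
    }
}

// Stand-in for a storage write such as `storage.put_bytes(...)`.
fn put_bytes(succeed: bool) -> Result<(), StoragePutError> {
    if succeed {
        Ok(())
    } else {
        Err(StoragePutError("disk full".into()))
    }
}

// The PR uses an explicit `match` so it can log before returning; when no
// logging is needed, `map_err` + `?` expresses the same boxing in one line.
fn flush(succeed: bool) -> Result<(), Box<dyn ChromaError>> {
    put_bytes(succeed).map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
    Ok(())
}

fn main() {
    assert!(flush(true).is_ok());
    assert!(flush(false).is_err());
}
```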
4 changes: 2 additions & 2 deletions rust/worker/src/index/hnsw_provider.rs
@@ -187,8 +187,8 @@ impl HnswIndexProvider {
                     println!("Flushed hnsw index file: {}", file);
                 }
                 Err(e) => {
-                    // TODO: return err
-                    panic!("Failed to flush index: {}", e);
+                    println!("Failed to flush index: {}", e);
+                    return Err(Box::new(e));
                 }
             }
         }
89 changes: 77 additions & 12 deletions rust/worker/src/storage/mod.rs
@@ -1,41 +1,106 @@
 use self::config::StorageConfig;
+use self::s3::S3GetError;
 use crate::config::Configurable;
-use crate::errors::ChromaError;
-use async_trait::async_trait;
-use bytes::Bytes;
+use crate::errors::{ChromaError, ErrorCodes};
 use tokio::io::AsyncBufRead;
 pub(crate) mod config;
 pub(crate) mod local;
 pub(crate) mod s3;
+use thiserror::Error;
 
 #[derive(Clone)]
 pub(crate) enum Storage {
     S3(s3::S3Storage),
     Local(local::LocalStorage),
 }
 
+#[derive(Error, Debug)]
+pub enum GetError {
+    #[error("No such key: {0}")]
+    NoSuchKey(String),
+    #[error("S3 error: {0}")]
+    S3Error(#[from] S3GetError),
+    #[error("Local storage error: {0}")]
+    LocalError(String),
+}
+
+impl ChromaError for GetError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            GetError::NoSuchKey(_) => ErrorCodes::NotFound,
+            GetError::S3Error(_) => ErrorCodes::Internal,
+            GetError::LocalError(_) => ErrorCodes::Internal,
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum PutError {
+    #[error("S3 error: {0}")]
+    S3Error(#[from] s3::S3PutError),
+    #[error("Local storage error: {0}")]
+    LocalError(String),
+}
+
+impl ChromaError for PutError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            PutError::S3Error(_) => ErrorCodes::Internal,
+            PutError::LocalError(_) => ErrorCodes::Internal,
+        }
+    }
+}
+
 impl Storage {
     pub(crate) async fn get(
         &self,
         key: &str,
-    ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, String> {
+    ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, GetError> {
         match self {
-            Storage::S3(s3) => s3.get(key).await,
-            Storage::Local(local) => local.get(key).await,
+            Storage::S3(s3) => {
+                let res = s3.get(key).await;
+                match res {
+                    Ok(res) => Ok(res),
+                    Err(e) => match e {
+                        S3GetError::NoSuchKey(_) => Err(GetError::NoSuchKey(key.to_string())),
+                        _ => Err(GetError::S3Error(e)),
+                    },
+                }
+            }
+            Storage::Local(local) => {
+                let res = local.get(key).await;
+                match res {
+                    Ok(res) => Ok(res),
+                    // TODO: Special case no such key if possible
+                    Err(e) => Err(GetError::LocalError(e)),
+                }
+            }
         }
     }

-    pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), String> {
+    pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), PutError> {
         match self {
-            Storage::S3(s3) => s3.put_file(key, path).await,
-            Storage::Local(local) => local.put_file(key, path).await,
+            Storage::S3(s3) => s3
+                .put_file(key, path)
+                .await
+                .map_err(|e| PutError::S3Error(e)),
+            Storage::Local(local) => local
+                .put_file(key, path)
+                .await
+                .map_err(|e| PutError::LocalError(e)),
         }
     }

-    pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), String> {
+    pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), PutError> {
         match self {
-            Storage::S3(s3) => s3.put_bytes(key, bytes).await,
-            Storage::Local(local) => local.put_bytes(key, &bytes).await,
+            Storage::S3(s3) => s3
+                .put_bytes(key, bytes)
+                .await
+                .map_err(|e| PutError::S3Error(e)),
+            Storage::Local(local) => local
+                .put_bytes(key, &bytes)
+                .await
+                .map_err(|e| PutError::LocalError(e)),
         }
     }
 }
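The `#[from]` attribute on `GetError::S3Error` is what lets the non-special-cased S3 failures be wrapped without a manual constructor: thiserror generates `From<S3GetError> for GetError`, while `NoSuchKey` is still promoted explicitly so callers can match on it. Here is a small sketch of both paths from the caller's side; `S3GetError` is a simplified stand-in for the enum in `storage/s3.rs`, and `fetch` is a hypothetical caller, not code from this PR.

```rust
// Sketch of how the new `GetError` is meant to be consumed, and what `#[from]`
// buys: an automatic `From<S3GetError> for GetError` impl.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum S3GetError {
    #[error("no such key: {0}")]
    NoSuchKey(String),
    #[error("server error: {0}")]
    ServerError(String),
}

#[derive(Error, Debug)]
pub enum GetError {
    #[error("No such key: {0}")]
    NoSuchKey(String),
    #[error("S3 error: {0}")]
    S3Error(#[from] S3GetError),
}

// A hypothetical caller: special-case the missing key, surface anything else.
fn fetch(key: &str, backend: Result<Vec<u8>, S3GetError>) -> Result<Vec<u8>, GetError> {
    match backend {
        Ok(bytes) => Ok(bytes),
        // Mirror the match in `Storage::get`: promote the backend's NoSuchKey
        // into the storage-level variant so callers never see S3GetError here.
        Err(S3GetError::NoSuchKey(_)) => Err(GetError::NoSuchKey(key.to_string())),
        // `#[from]` generated the conversion, so `.into()` (or `?`) wraps the rest.
        Err(e) => Err(e.into()),
    }
}

fn main() {
    let miss = fetch("block/1", Err(S3GetError::NoSuchKey("block/1".into())));
    assert!(matches!(miss, Err(GetError::NoSuchKey(_))));

    let boom = fetch("block/2", Err(S3GetError::ServerError("500".into())));
    assert!(matches!(boom, Err(GetError::S3Error(_))));

    println!("NoSuchKey is special-cased; everything else wraps via From");
}
```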