Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add BoxError trait to simplify the way to create generic error #627

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use log::info;
use snafu::{OptionExt, ResultExt};
use table_engine::{
Expand Down Expand Up @@ -58,11 +59,7 @@ impl TableEngine for TableEngineImpl {
info!("Try to close table engine");

// Close the instance.
self.instance
.close()
.await
.map_err(|e| Box::new(e) as _)
.context(Close)?;
self.instance.close().await.box_err().context(Close)?;

info!("Table engine closed");

Expand Down Expand Up @@ -99,7 +96,7 @@ impl TableEngine for TableEngineImpl {
ANALYTIC_ENGINE_TYPE.to_string(),
space_table,
)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(Unexpected)?,
),
};
Expand Down Expand Up @@ -151,7 +148,7 @@ impl TableEngine for TableEngineImpl {
ANALYTIC_ENGINE_TYPE.to_string(),
space_table,
)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(Unexpected)?,
),
};
Expand Down
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::{collections::HashMap, sync::Arc};

use common_util::error::BoxError;
use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
Expand Down Expand Up @@ -146,7 +147,7 @@ impl Instance {
.wal_manager
.write(&write_ctx, &log_batch)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WriteWal {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down Expand Up @@ -277,7 +278,7 @@ impl Instance {
);
let mut table_opts =
table_options::merge_table_options_for_alter(&options, &current_table_options)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidOptions {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down Expand Up @@ -322,7 +323,7 @@ impl Instance {
.wal_manager
.write(&write_ctx, &log_batch)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WriteWal {
space_id: table_data.space_id,
table: &table_data.name,
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::sync::Arc;

use common_util::error::BoxError;
use log::info;
use snafu::ResultExt;
use table_engine::engine::CreateTableRequest;
Expand Down Expand Up @@ -32,7 +33,7 @@ impl Instance {

let mut table_opts =
table_options::merge_table_options_for_create(&request.options, &self.table_opts)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidOptions {
space_id: space.id,
table: &request.table_name,
Expand Down
14 changes: 6 additions & 8 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::sync::Arc;

use common_types::schema::Version;
use common_util::define_result;
use common_util::{define_result, error::GenericError};
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
Expand Down Expand Up @@ -36,7 +36,7 @@ pub enum Error {
#[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))]
ReadMetaUpdate {
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand Down Expand Up @@ -106,7 +106,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand All @@ -120,7 +120,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand All @@ -134,7 +134,7 @@ pub enum Error {
space_id: SpaceId,
table: String,
table_id: TableId,
source: Box<dyn std::error::Error + Send + Sync>,
source: GenericError,
},

#[snafu(display(
Expand Down Expand Up @@ -187,9 +187,7 @@ pub enum Error {
AlterDroppedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit {
source: Box<dyn std::error::Error + Send + Sync>,
},
StoreVersionEdit { source: GenericError },

#[snafu(display(
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
Expand Down
42 changes: 17 additions & 25 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ use common_types::{
time::TimeRange,
SequenceNumber,
};
use common_util::{config::ReadableDuration, define_result, runtime::Runtime, time};
use common_util::{
config::ReadableDuration,
define_result,
error::{BoxError, GenericError},
runtime::Runtime,
time,
};
use futures::{
channel::{mpsc, mpsc::channel},
future::try_join_all,
Expand Down Expand Up @@ -62,9 +68,7 @@ const DEFAULT_CHANNEL_SIZE: usize = 5;
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display("Failed to store version edit, err:{}", source))]
StoreVersionEdit {
source: Box<dyn std::error::Error + Send + Sync>,
},
StoreVersionEdit { source: GenericError },

#[snafu(display(
"Failed to purge wal, wal_location:{:?}, sequence:{}",
Expand All @@ -78,9 +82,7 @@ pub enum Error {
},

#[snafu(display("Failed to build mem table iterator, source:{}", source))]
InvalidMemIter {
source: Box<dyn std::error::Error + Send + Sync>,
},
InvalidMemIter { source: GenericError },

#[snafu(display(
"Failed to create sst writer, storage_format_hint:{:?}, err:{}",
Expand All @@ -93,10 +95,7 @@ pub enum Error {
},

#[snafu(display("Failed to write sst, file_path:{}, source:{}", path, source))]
WriteSst {
path: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
WriteSst { path: String, source: GenericError },

#[snafu(display("Background flush failed, cannot schedule flush task, err:{}", source))]
BackgroundFlushFailed {
Expand Down Expand Up @@ -125,9 +124,7 @@ pub enum Error {
},

#[snafu(display("Failed to split record batch, source:{}", source))]
SplitRecordBatch {
source: Box<dyn std::error::Error + Send + Sync>,
},
SplitRecordBatch { source: GenericError },

#[snafu(display("Failed to read sst meta, source:{}", source))]
ReadSstMeta {
Expand Down Expand Up @@ -686,7 +683,7 @@ impl Instance {

for data in iter {
for (idx, record_batch) in split_record_batch_with_time_ranges(
data.map_err(|e| Box::new(e) as _).context(InvalidMemIter)?,
data.box_err().context(InvalidMemIter)?,
&time_ranges,
timestamp_idx,
)?
Expand Down Expand Up @@ -777,7 +774,7 @@ impl Instance {
let sst_info = writer
.write(request_id, &sst_meta, record_batch_stream)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.with_context(|| WriteSst {
path: sst_file_path.to_string(),
})?;
Expand Down Expand Up @@ -1004,7 +1001,7 @@ impl SpaceStore {
let sst_info = sst_writer
.write(request_id, &sst_meta, record_batch_stream)
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.with_context(|| WriteSst {
path: sst_file_path.to_string(),
})?;
Expand Down Expand Up @@ -1107,7 +1104,7 @@ fn split_record_batch_with_time_ranges(
};
builders[idx]
.append_row_view(&view)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(SplitRecordBatch)?;
} else {
panic!(
Expand All @@ -1118,12 +1115,7 @@ fn split_record_batch_with_time_ranges(
}
let mut ret = Vec::with_capacity(builders.len());
for mut builder in builders {
ret.push(
builder
.build()
.map_err(|e| Box::new(e) as _)
.context(SplitRecordBatch)?,
);
ret.push(builder.build().box_err().context(SplitRecordBatch)?);
}
Ok(ret)
}
Expand All @@ -1140,7 +1132,7 @@ fn build_mem_table_iter(memtable: MemTableRef, table_data: &TableData) -> Result
};
memtable
.scan(scan_ctx, scan_req)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(InvalidMemIter)
}

Expand Down
13 changes: 5 additions & 8 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
time::TimeRange,
};
use common_util::{define_result, runtime::Runtime};
use common_util::{define_result, error::BoxError, runtime::Runtime};
use futures::stream::Stream;
use log::{debug, error, trace};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -333,20 +333,17 @@ where
runtime.spawn(async move {
for mut iter in collection {
while let Some(record_batch) = iter.next_batch().await.transpose() {
let record_batch =
record_batch
.map_err(|e| Box::new(e) as _)
.context(ErrWithSource {
msg: "read record batch",
});
let record_batch = record_batch.box_err().context(ErrWithSource {
msg: "read record batch",
});

// Apply the projection to RecordBatchWithKey and gets the final RecordBatch.
let record_batch = record_batch.and_then(|batch_with_key| {
// TODO(yingwen): Try to use projector to do this, which precompute row
// indexes to project.
batch_with_key
.try_project(&projected_schema)
.map_err(|e| Box::new(e) as _)
.box_err()
.context(ErrWithSource {
msg: "project record batch",
})
Expand Down
17 changes: 7 additions & 10 deletions analytic_engine/src/instance/write_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{

use common_util::{
define_result,
error::{BoxError, GenericError},
runtime::{JoinHandle, Runtime},
time::InstantExt,
};
Expand Down Expand Up @@ -43,9 +44,7 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to wait flush completed, channel disconnected, err:{}", source))]
WaitFlush {
source: Box<dyn std::error::Error + Send + Sync>,
},
WaitFlush { source: GenericError },

#[snafu(display(
"Background flush failed, cannot write more data, err:{}.\nBacktrace:\n{}",
Expand All @@ -67,9 +66,7 @@ pub enum Error {
},

#[snafu(display("Channel error, err:{}", source))]
Channel {
source: Box<dyn std::error::Error + Send + Sync>,
},
Channel { source: GenericError },

#[snafu(display(
"Disallowed to manipulate table data, table does not belong to the worker, table:{}, worker:{}.\nBacktrace:\n{}",
Expand Down Expand Up @@ -232,7 +229,7 @@ impl WorkerLocal {
self.background_rx
.changed()
.await
.map_err(|e| Box::new(e) as _)
.box_err()
.context(WaitFlush)?;
}
assert!(!self.data.is_flushing());
Expand Down Expand Up @@ -534,7 +531,7 @@ pub async fn process_command_in_write_worker<T, E: std::error::Error + Send + Sy

// Receive alter options result.
match rx.await {
Ok(res) => res.map_err(|e| Box::new(e) as _).context(Channel),
Ok(res) => res.box_err().context(Channel),
Err(_) => ReceiveFromWorker {
table: &table_data.name,
worker_id: table_data.write_handle.worker_id(),
Expand All @@ -553,7 +550,7 @@ pub async fn join_all<T, E: std::error::Error + Send + Sync + 'static>(
let table_data = &table_vec[pos];
match res {
Ok(res) => {
res.map_err(|e| Box::new(e) as _).context(Channel)?;
res.box_err().context(Channel)?;
}
Err(_) => {
return ReceiveFromWorker {
Expand Down Expand Up @@ -910,7 +907,7 @@ impl WriteWorker {
TableAlterSchemaPolicy::Unknown,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.map_err(|e| Box::new(e) as GenericError)
.context(Channel);
if let Err(res) = tx.send(alter_res) {
error!(
Expand Down
5 changes: 3 additions & 2 deletions analytic_engine/src/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct LoadRequest {
pub cluster_version: u64,
pub shard_id: ShardId,
}
use common_util::error::GenericError;

/// Manifest holds meta data of all tables.
#[async_trait]
Expand All @@ -31,7 +32,7 @@ pub trait Manifest: Send + Sync + fmt::Debug {
async fn store_update(
&self,
request: MetaUpdateRequest,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;
) -> std::result::Result<(), GenericError>;

/// Load table meta data from manifest.
///
Expand All @@ -40,7 +41,7 @@ pub trait Manifest: Send + Sync + fmt::Debug {
async fn load_data(
&self,
load_request: &LoadRequest,
) -> Result<Option<TableManifestData>, Box<dyn std::error::Error + Send + Sync>>;
) -> Result<Option<TableManifestData>, GenericError>;
}

pub type ManifestRef = Arc<dyn Manifest>;
Loading