Skip to content

Commit

Permalink
refactor: refactor the error handling logic in compact
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Aug 4, 2023
1 parent 31bc5c8 commit 356fb9d
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 98 deletions.
78 changes: 20 additions & 58 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,38 +437,26 @@ mod test {
..Default::default()
};

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} is higher than current revision {}",
range_request_with_future_rev.revision, current_revision
))
.to_string();
let message = tonic::Status::from(
let expected_tonic_status = tonic::Status::from(
range_request_with_future_rev
.check_revision(compacted_revision, current_revision)
.unwrap_err(),
)
.to_string();
assert_eq!(message, expected_err_message);
);
assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);

let range_request_with_compacted_rev = RangeRequest {
key: b"foo".to_vec(),
revision: 2,
..Default::default()
};

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} has been compacted, compacted revision is {}",
range_request_with_compacted_rev.revision, compacted_revision
))
.to_string();

let message = tonic::Status::from(
let expected_tonic_status = tonic::Status::from(
range_request_with_compacted_rev
.check_revision(compacted_revision, current_revision)
.unwrap_err(),
)
.to_string();
assert_eq!(message, expected_err_message);
);

assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);
}

#[tokio::test]
Expand All @@ -487,20 +475,13 @@ mod test {
failure: vec![],
};

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} is higher than current revision {}",
20, current_revision
))
.to_string();

let message = tonic::Status::from(
let expected_tonic_status = tonic::Status::from(
txn_request_with_future_revision
.check_revision(compacted_revision, current_revision)
.unwrap_err(),
)
.to_string();
);

assert_eq!(message, expected_err_message);
assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);

let txn_request_with_compacted_revision = TxnRequest {
compare: vec![],
Expand All @@ -514,20 +495,13 @@ mod test {
failure: vec![],
};

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} has been compacted, compacted revision is {}",
3, compacted_revision
))
.to_string();

let message = tonic::Status::from(
let expected_tonic_status = tonic::Status::from(
txn_request_with_compacted_revision
.check_revision(compacted_revision, current_revision)
.unwrap_err(),
)
.to_string();
);

assert_eq!(message, expected_err_message);
assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);
}

#[tokio::test]
Expand All @@ -537,24 +511,12 @@ mod test {
..Default::default()
};

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} is higher than current revision {}",
compact_request.revision, 8
))
.to_string();

let message =
tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()).to_string();
assert_eq!(message, expected_err_message);

let expected_err_message = tonic::Status::out_of_range(format!(
"required revision {} has been compacted, compacted revision is {}",
compact_request.revision, 13
))
.to_string();

let message =
tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()).to_string();
assert_eq!(message, expected_err_message);
let expected_tonic_status =
tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err());
assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);

let expected_tonic_status =
tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err());
assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange);
}
}
32 changes: 12 additions & 20 deletions xline/src/server/watch_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,17 @@ where
/// Handle watch event
async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) {
let watch_id = watch_event.watch_id();
let response = if watch_event.compacted() {
WatchResponse {
header: Some(ResponseHeader {
revision: watch_event.revision(),
..ResponseHeader::default()
}),
watch_id,
compact_revision: self.kv_watcher.compacted_revision(),
canceled: true,
..WatchResponse::default()
}
let mut response = WatchResponse {
header: Some(ResponseHeader {
revision: watch_event.revision(),
..ResponseHeader::default()
}),
watch_id,
..WatchResponse::default()
};
if watch_event.compacted() {
response.compact_revision = self.kv_watcher.compacted_revision();
response.canceled = true;
} else {
let mut events = watch_event.take_events();
if events.is_empty() {
Expand All @@ -318,15 +318,7 @@ where
}
}
}
WatchResponse {
header: Some(ResponseHeader {
revision: watch_event.revision(),
..ResponseHeader::default()
}),
watch_id,
events,
..WatchResponse::default()
}
response.events = events;
};

if self.response_tx.send(Ok(response)).await.is_err() {
Expand Down
25 changes: 19 additions & 6 deletions xline/src/storage/compact/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use curp::{client::Client, cmd::ProposeId, error::ProposeError};
use curp::{
client::Client,
cmd::ProposeId,
error::CommandProposeError::{AfterSync, Execute},
};
use event_listener::Event;
use periodic_compactor::PeriodicCompactor;
use revision_compactor::RevisionCompactor;
Expand All @@ -12,7 +16,7 @@ use uuid::Uuid;
use super::{
index::{Index, IndexOperate},
storage_api::StorageApi,
KvStore,
ExecuteError, KvStore,
};
use crate::{
revision_number::RevisionNumberGenerator,
Expand Down Expand Up @@ -45,21 +49,30 @@ pub(crate) trait Compactor: std::fmt::Debug + Send + Sync {
#[async_trait]
pub(crate) trait Compactable: std::fmt::Debug + Send + Sync {
/// do compact
async fn compact(&self, revision: i64) -> Result<(), ProposeError>;
async fn compact(&self, revision: i64) -> Result<(), ExecuteError>;
}

#[async_trait]
impl Compactable for Client<Command> {
async fn compact(&self, revision: i64) -> Result<(), ProposeError> {
async fn compact(&self, revision: i64) -> Result<(), ExecuteError> {
let request = CompactionRequest {
revision,
physical: false,
};
let request_wrapper = RequestWithToken::new_with_token(request.into(), None);
let propose_id = ProposeId::new(format!("auto-compactor-{}", Uuid::new_v4()));
let cmd = Command::new(vec![], request_wrapper, propose_id);
let _cmd_res = self.propose(cmd).await?;
Ok(())
if let Err(e) = self.propose(cmd).await {
#[allow(clippy::wildcard_enum_match_arm)]
match e {
Execute(e) | AfterSync(e) => Err(e),
_ => {
unreachable!("Compaction should not receive any errors other than ExecuteError, but it receives {e:?}");
}
}
} else {
Ok(())
}
}
}

Expand Down
25 changes: 18 additions & 7 deletions xline/src/storage/compact/periodic_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use event_listener::Event;
use tracing::{info, warn};

use super::{Compactable, Compactor};
use crate::revision_number::RevisionNumberGenerator;
use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError};

/// `RevisionWindow` is a ring buffer used to store periodically sampled revision.
struct RevisionWindow {
Expand Down Expand Up @@ -106,13 +106,24 @@ impl<C: Compactable> PeriodicCompactor<C> {
"starting auto periodic compaction, revision = {}, period = {:?}",
revision, self.period
);
// TODO: add more error processing logic

if let Err(e) = self.client.compact(revision).await {
warn!(
"failed auto revision compaction, revision = {}, period = {:?}, error: {:?}",
revision, self.period, e
);
None
if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e {
info!(
"required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}",
revision,
compacted_rev,
self.period,
now.elapsed().as_secs()
);
Some(compacted_rev)
} else {
warn!(
"failed auto revision compaction, revision = {}, period = {:?}, error: {:?}",
revision, self.period, e
);
None
}
} else {
info!(
"completed auto revision compaction, revision = {}, period = {:?}, took {:?}",
Expand Down
22 changes: 16 additions & 6 deletions xline/src/storage/compact/revision_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use event_listener::Event;
use tracing::{info, warn};

use super::{Compactable, Compactor};
use crate::revision_number::RevisionNumberGenerator;
use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError};

/// check for the need of compaction every 5 minutes
const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);
Expand Down Expand Up @@ -67,11 +67,21 @@ impl<C: Compactable> RevisionCompactor<C> {
);
// TODO: add more error processing logic
if let Err(e) = self.client.compact(target_revision).await {
warn!(
"failed auto revision compaction, revision = {}, retention = {}, error: {:?}",
target_revision, self.retention, e
);
None
if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e {
info!(
"required revision {} has been compacted, the current compacted revision is {}, retention = {:?}",
target_revision,
compacted_rev,
self.retention,
);
Some(compacted_rev)
} else {
warn!(
"failed auto revision compaction, revision = {}, retention = {}, error: {:?}",
target_revision, self.retention, e
);
None
}
} else {
info!(
"completed auto revision compaction, revision = {}, retention = {}, took {:?}",
Expand Down
1 change: 0 additions & 1 deletion xline/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub(crate) mod snapshot_allocator;
pub(crate) mod storage_api;

pub use self::execute_error::ExecuteError;

pub(crate) use self::{
auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, revision::Revision,
};

0 comments on commit 356fb9d

Please sign in to comment.