log-backup: make initialize failure fatal error, release memory while task stopped. #16071

Merged
merged 3 commits into from Nov 30, 2023

Changes from all commits
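In brief: task-initialization failures (such as failing to fetch a task's ranges) now go through the same fatal-error path as runtime failures, which pauses the task, rewinds the service safepoint, records the last error in the meta store, and re-enqueues the report after 5 seconds if recording fails; separately, StreamTaskInfo gains a Drop impl so temp files and their pool memory are released once a task stops. Below is a self-contained sketch of the report-with-retry idea only; MetaStore and every name in it are illustrative stand-ins, not the TiKV API.

use std::time::Duration;

#[derive(Default)]
struct MetaStore {
    paused: bool,
    last_error: Option<String>,
    fail_next_report: bool,
}

impl MetaStore {
    fn pause(&mut self) {
        self.paused = true;
    }

    fn report_last_error(&mut self, msg: &str) -> Result<(), ()> {
        if std::mem::take(&mut self.fail_next_report) {
            return Err(()); // simulate a flaky meta store
        }
        self.last_error = Some(msg.to_owned());
        Ok(())
    }
}

fn on_fatal_error(store: &mut MetaStore, msg: &str) {
    // Pause first, then keep trying to record the error.
    store.pause();
    while store.report_last_error(msg).is_err() {
        // The real code re-enqueues a Task::FatalError after 5 seconds;
        // this sketch just sleeps briefly and retries inline.
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let mut store = MetaStore { fail_next_report: true, ..Default::default() };
    on_fatal_error(&mut store, "failed to fetch task ranges");
    assert!(store.paused);
    assert_eq!(store.last_error.as_deref(), Some("failed to fetch task ranges"));
}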
100 changes: 53 additions & 47 deletions components/backup-stream/src/endpoint.rs
@@ -211,6 +211,53 @@ where
self.meta_client.clone()
}

fn on_fatal_error_of_task(&self, task: &str, err: &Error) -> future![()] {
metrics::update_task_status(TaskStatus::Error, task);
let meta_cli = self.get_meta_client();
let pdc = self.pd_client.clone();
let store_id = self.store_id;
let sched = self.scheduler.clone();
let safepoint_name = self.pause_guard_id_for_task(task);
let safepoint_ttl = self.pause_guard_duration();
let code = err.error_code().code.to_owned();
let msg = err.to_string();
let task = task.to_owned();
async move {
let err_fut = async {
let safepoint = meta_cli.global_progress_of_task(&task).await?;
pdc.update_service_safe_point(
safepoint_name,
TimeStamp::new(safepoint.saturating_sub(1)),
safepoint_ttl,
)
.await?;
meta_cli.pause(&task).await?;
let mut last_error = StreamBackupError::new();
last_error.set_error_code(code);
last_error.set_error_message(msg.clone());
last_error.set_store_id(store_id);
last_error.set_happen_at(TimeStamp::physical_now());
meta_cli.report_last_error(&task, last_error).await?;
Result::Ok(())
};
if let Err(err_report) = err_fut.await {
err_report.report(format_args!("failed to upload error {}", err_report));
let name = task.to_owned();
// Let's retry reporting after 5s.
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
try_send!(
sched,
Task::FatalError(
TaskSelector::ByName(name),
Box::new(annotate!(err_report, "origin error: {}", msg))
)
);
});
}
}
}

fn on_fatal_error(&self, select: TaskSelector, err: Box<Error>) {
err.report_fatal();
let tasks = self
@@ -220,49 +267,7 @@
for task in tasks {
// Let's pause the task first.
self.unload_task(&task);
metrics::update_task_status(TaskStatus::Error, &task);

let meta_cli = self.get_meta_client();
let pdc = self.pd_client.clone();
let store_id = self.store_id;
let sched = self.scheduler.clone();
let safepoint_name = self.pause_guard_id_for_task(&task);
let safepoint_ttl = self.pause_guard_duration();
let code = err.error_code().code.to_owned();
let msg = err.to_string();
self.pool.block_on(async move {
let err_fut = async {
let safepoint = meta_cli.global_progress_of_task(&task).await?;
pdc.update_service_safe_point(
safepoint_name,
TimeStamp::new(safepoint.saturating_sub(1)),
safepoint_ttl,
)
.await?;
meta_cli.pause(&task).await?;
let mut last_error = StreamBackupError::new();
last_error.set_error_code(code);
last_error.set_error_message(msg.clone());
last_error.set_store_id(store_id);
last_error.set_happen_at(TimeStamp::physical_now());
meta_cli.report_last_error(&task, last_error).await?;
Result::Ok(())
};
if let Err(err_report) = err_fut.await {
err_report.report(format_args!("failed to upload error {}", err_report));
// Let's retry reporting after 5s.
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
try_send!(
sched,
Task::FatalError(
TaskSelector::ByName(task.to_owned()),
Box::new(annotate!(err_report, "origin error: {}", msg))
)
);
});
}
});
self.pool.block_on(self.on_fatal_error_of_task(&task, &err));
}
}

@@ -637,6 +642,9 @@ where
let run = async move {
let task_name = task.info.get_name();
let ranges = cli.ranges_of_task(task_name).await?;
fail::fail_point!("load_task::error_when_fetching_ranges", |_| {
Err(Error::Other("what range? no such thing, go away.".into()))
});
info!(
"register backup stream ranges";
"task" => ?task,
@@ -664,10 +672,8 @@
Result::Ok(())
};
if let Err(e) = run.await {
e.report(format!(
"failed to register backup stream task {} to router: ranges not found",
task_clone.info.get_name()
));
self.on_fatal_error_of_task(&task_clone.info.name, &Box::new(e))
.await;
}
});
metrics::update_task_status(TaskStatus::Running, &task_name);
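A note on the extraction above: on_fatal_error_of_task returns a future rather than being an async fn taking &self, and it clones everything it needs (meta client, PD client, scheduler, task name) before entering the async move block, so the resulting future is 'static and can be driven by pool.block_on or awaited from the registration path. A minimal, self-contained sketch of that clone-up-front pattern (all names illustrative; uses the futures crate only for block_on):

use std::future::Future;

struct Endpoint {
    store_id: u64,
    task: String,
}

impl Endpoint {
    // Clone what the future needs up front; the returned future then owns
    // its data and may outlive `&self` (e.g. be spawned onto a runtime).
    fn report_failure(&self, msg: &str) -> impl Future<Output = ()> + 'static {
        let store_id = self.store_id;
        let task = self.task.clone();
        let msg = msg.to_owned();
        async move {
            println!("task {} on store {} failed: {}", task, store_id, msg);
        }
    }
}

fn main() {
    let ep = Endpoint { store_id: 1, task: "demo".to_owned() };
    futures::executor::block_on(ep.report_failure("failed to fetch ranges"));
}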
14 changes: 13 additions & 1 deletion components/backup-stream/src/metadata/client.rs
@@ -286,7 +286,19 @@ impl<Store: MetaStore> MetadataClient<Store> {
Ok(())
}

pub async fn get_last_error(
pub async fn get_last_error(&self, name: &str) -> Result<Option<StreamBackupError>> {
let key = MetaKey::last_errors_of(name);

let r = self.meta_store.get_latest(Keys::Prefix(key)).await?.inner;
if r.is_empty() {
return Ok(None);
}
let r = &r[0];
let err = protobuf::parse_from_bytes(r.value())?;
Ok(Some(err))
}

pub async fn get_last_error_of(
&self,
name: &str,
store_id: u64,
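On the rename above: the old per-store lookup becomes get_last_error_of, while the new get_last_error returns the latest error recorded under the task's key prefix from any store. A rough sketch of that prefix-scan semantic over an ordered map (the key layout here is illustrative, not the real metadata schema):

use std::collections::BTreeMap;

// Stand-in for the meta store; keys look like "last_error/<task>/<store_id>".
fn get_last_error(store: &BTreeMap<String, Vec<u8>>, task: &str) -> Option<Vec<u8>> {
    let prefix = format!("last_error/{}/", task);
    // Take the first entry under the prefix, mirroring
    // `meta_store.get_latest(Keys::Prefix(key))` yielding at most one record.
    store
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .map(|(_, v)| v.clone())
        .next()
}

fn main() {
    let mut s = BTreeMap::new();
    s.insert("last_error/t1/1".to_owned(), b"err-from-store-1".to_vec());
    s.insert("last_error/t1/2".to_owned(), b"err-from-store-2".to_vec());
    assert_eq!(get_last_error(&s, "t1"), Some(b"err-from-store-1".to_vec()));
    assert_eq!(get_last_error(&s, "t2"), None);
}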
34 changes: 34 additions & 0 deletions components/backup-stream/src/router.rs
@@ -830,6 +830,28 @@ pub struct StreamTaskInfo {
temp_file_pool: Arc<TempFilePool>,
}

impl Drop for StreamTaskInfo {
fn drop(&mut self) {
let (success, failed): (Vec<_>, Vec<_>) = self
.flushing_files
.get_mut()
.drain(..)
.chain(self.flushing_meta_files.get_mut().drain(..))
.map(|(_, f, _)| f.inner.path().to_owned())
.map(|p| self.temp_file_pool.remove(&p))
.partition(|r| *r);
info!("stream task info dropped[1/2], removing flushing_temp files"; "success" => %success.len(), "failure" => %failed.len());
let (success, failed): (Vec<_>, Vec<_>) = self
.files
.get_mut()
.drain()
.map(|(_, f)| f.into_inner().inner.path().to_owned())
.map(|p| self.temp_file_pool.remove(&p))
.partition(|r| *r);
info!("stream task info dropped[2/2], removing temp files"; "success" => %success.len(), "failure" => %failed.len());
}
}

impl std::fmt::Debug for StreamTaskInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamTaskInfo")
@@ -2089,6 +2111,12 @@ mod tests {
let (task, _path) = task("cleanup_test".to_owned()).await?;
must_register_table(&router, task, 1).await;
write_simple_data(&router).await;
let tempfiles = router
.get_task_info("cleanup_test")
.await
.unwrap()
.temp_file_pool
.clone();
router
.get_task_info("cleanup_test")
.await?
@@ -2097,6 +2125,7 @@
write_simple_data(&router).await;
let mut w = walkdir::WalkDir::new(&tmp).into_iter();
assert!(w.next().is_some(), "the temp files weren't created");
assert!(tempfiles.mem_used() > 0, "no temp file memory is in use.");
drop(router);
let w = walkdir::WalkDir::new(&tmp)
.into_iter()
@@ -2114,6 +2143,11 @@
"the temp files should be removed, but it is {:?}",
w
);
assert_eq!(
tempfiles.mem_used(),
0,
"the temp files hasn't been cleared."
);
Ok(())
}

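The Drop impl above delivers the "release memory while task stopped" half of this PR: when a StreamTaskInfo is dropped, every temp file still registered (flushing or not) is removed from the pool, and outcomes are counted rather than short-circuiting on the first failure. A tiny sketch of that counting style, with the removal result faked:

fn main() {
    let paths = ["a.tmp", "b.tmp", "c.tmp"];
    // Attempt removal for every file; pretend "b.tmp" fails. Partitioning the
    // boolean results mirrors `.map(|p| pool.remove(&p)).partition(|r| *r)`.
    let (success, failed): (Vec<bool>, Vec<bool>) = paths
        .iter()
        .map(|p| *p != "b.tmp")
        .partition(|ok| *ok);
    println!("removed temp files; success = {}, failure = {}", success.len(), failed.len());
}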
5 changes: 5 additions & 0 deletions components/backup-stream/src/tempfiles.rs
@@ -259,6 +259,11 @@ impl TempFilePool {
&self.cfg
}

#[cfg(test)]
pub fn mem_used(&self) -> usize {
self.current.load(Ordering::Acquire)
}

/// Create a file for writing.
/// This function is synchronous so it is easier to call in the polling
/// context. (Anyway, it is really hard to call an async function in the
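mem_used is a test-only window into the pool's accounting: the pool tracks in-memory bytes in an atomic counter, and the hook simply loads it. A minimal sketch of that accounting (struct, field, and method names are illustrative):

use std::sync::atomic::{AtomicUsize, Ordering};

struct Pool {
    current: AtomicUsize,
}

impl Pool {
    fn write(&self, bytes: usize) {
        self.current.fetch_add(bytes, Ordering::Release);
    }
    fn remove(&self, bytes: usize) {
        self.current.fetch_sub(bytes, Ordering::Release);
    }
    // Test-only observer, like `TempFilePool::mem_used`.
    fn mem_used(&self) -> usize {
        self.current.load(Ordering::Acquire)
    }
}

fn main() {
    let p = Pool { current: AtomicUsize::new(0) };
    p.write(128);
    assert_eq!(p.mem_used(), 128);
    p.remove(128);
    assert_eq!(p.mem_used(), 0);
}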
29 changes: 28 additions & 1 deletion components/backup-stream/tests/failpoints/mod.rs
@@ -30,6 +30,32 @@ mod all {
use super::{
make_record_key, make_split_key_at_record, mutation, run_async_test, SuiteBuilder,
};
use crate::make_table_key;

#[test]
fn failed_register_task() {
let suite = SuiteBuilder::new_named("failed_register_task").build();
fail::cfg("load_task::error_when_fetching_ranges", "return").unwrap();
let cli = suite.get_meta_cli();
block_on(cli.insert_task_with_range(
&suite.simple_task("failed_register_task"),
&[(&make_table_key(1, b""), &make_table_key(2, b""))],
))
.unwrap();

for _ in 0..10 {
if block_on(cli.get_last_error_of("failed_register_task", 1))
.unwrap()
.is_some()
{
return;
}
std::thread::sleep(Duration::from_millis(100));
}

suite.dump_slash_etc();
panic!("No error uploaded when failed to comminate to PD.");
}

#[test]
fn basic() {
@@ -192,7 +218,8 @@
suite.must_split(&make_split_key_at_record(1, 42));
std::thread::sleep(Duration::from_secs(2));

let error = run_async_test(suite.get_meta_cli().get_last_error("retry_abort", 1)).unwrap();
let error =
run_async_test(suite.get_meta_cli().get_last_error_of("retry_abort", 1)).unwrap();
let error = error.expect("no error uploaded");
error
.get_error_message()
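The new test drives the failpoint added in endpoint.rs. For readers unfamiliar with the pattern: the production code declares a named failpoint, and a test switches it on with fail::cfg(name, "return") so the closure's value is returned early from the enclosing function. A minimal sketch using the fail crate (compile with its "failpoints" feature enabled; the failpoint name and fetch_ranges are illustrative):

fn fetch_ranges() -> Result<Vec<(u64, u64)>, String> {
    // Inactive by default; when configured with "return", the closure runs
    // and its value is returned instead of the normal path.
    fail::fail_point!("error_when_fetching_ranges", |_| {
        Err("injected: no such range".to_owned())
    });
    Ok(vec![(1, 2)])
}

fn main() {
    assert!(fetch_ranges().is_ok());
    fail::cfg("error_when_fetching_ranges", "return").unwrap();
    assert!(fetch_ranges().is_err());
    fail::remove("error_when_fetching_ranges");
}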
2 changes: 1 addition & 1 deletion components/backup-stream/tests/integration/mod.rs
@@ -160,7 +160,7 @@ mod all {
let err = run_async_test(
suite
.get_meta_cli()
.get_last_error("test_fatal_error", *victim),
.get_last_error_of("test_fatal_error", *victim),
)
.unwrap()
.unwrap();
5 changes: 5 additions & 0 deletions components/backup-stream/tests/suite.rs
@@ -395,6 +395,11 @@ impl Suite {
MetadataClient::new(self.meta_store.clone(), 0)
}

#[allow(dead_code)]
pub fn dump_slash_etc(&self) {
self.meta_store.inner.blocking_lock().dump();
}

pub fn must_split(&mut self, key: &[u8]) {
let region = self.cluster.get_region(key);
self.cluster.must_split(&region, key);