Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit af45314

Browse files
authored
Namespace forking (#646)
* add namespace fork endpoint * add fork method to namespace store * implement general forking mecanism * refactor namespace module * fix bottomless did_recover * fix clippy * handle fork replica error * fix rebase quirks * fix rebase issues * add docstring to restore method
1 parent 045d865 commit af45314

File tree

9 files changed

+270
-47
lines changed

9 files changed

+270
-47
lines changed

bottomless/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ pub extern "C" fn xGetPathname(buf: *mut c_char, orig: *const c_char, orig_len:
415415

416416
async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
417417
match replicator.restore(None, None).await {
418-
Ok(replicator::RestoreAction::SnapshotMainDbFile) => {
418+
Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => {
419419
replicator.new_generation();
420420
match replicator.snapshot_main_db_file(None).await {
421421
Ok(Some(h)) => {
@@ -437,7 +437,7 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
437437
return ffi::SQLITE_CANTOPEN;
438438
}
439439
}
440-
Ok(replicator::RestoreAction::ReuseGeneration(gen)) => {
440+
Ok((replicator::RestoreAction::ReuseGeneration(gen), _)) => {
441441
replicator.set_generation(gen);
442442
}
443443
Err(e) => {

bottomless/src/replicator.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -962,12 +962,13 @@ impl Replicator {
962962
Some((first_frame_no, last_frame_no, timestamp, compression_kind))
963963
}
964964

965-
// Restores the database state from given remote generation
965+
/// Restores the database state from given remote generation
966+
/// On success, returns the RestoreAction, and whether the database was recovered from backup.
966967
async fn restore_from(
967968
&mut self,
968969
generation: Uuid,
969970
utc_time: Option<NaiveDateTime>,
970-
) -> Result<RestoreAction> {
971+
) -> Result<(RestoreAction, bool)> {
971972
if let Some(tombstone) = self.get_tombstone().await? {
972973
if let Some(timestamp) = Self::generation_to_timestamp(&generation) {
973974
if tombstone.timestamp() as u64 >= timestamp.to_unix().0 {
@@ -989,7 +990,7 @@ impl Replicator {
989990
let last_frame = self.get_last_consistent_frame(&generation).await?;
990991
tracing::debug!("Last consistent remote frame in generation {generation}: {last_frame}.");
991992
if let Some(action) = self.compare_with_local(generation, last_frame).await? {
992-
return Ok(action);
993+
return Ok((action, false));
993994
}
994995

995996
// at this point we know, we should do a full restore
@@ -1071,11 +1072,11 @@ impl Replicator {
10711072

10721073
if applied_wal_frame {
10731074
tracing::info!("WAL file has been applied onto database file in generation {}. Requesting snapshot.", generation);
1074-
Ok::<_, anyhow::Error>(RestoreAction::SnapshotMainDbFile)
1075+
Ok::<_, anyhow::Error>((RestoreAction::SnapshotMainDbFile, true))
10751076
} else {
10761077
tracing::info!("Reusing generation {}.", generation);
10771078
// since WAL was not applied, we can reuse the latest generation
1078-
Ok::<_, anyhow::Error>(RestoreAction::ReuseGeneration(generation))
1079+
Ok::<_, anyhow::Error>((RestoreAction::ReuseGeneration(generation), true))
10791080
}
10801081
}
10811082

@@ -1299,19 +1300,20 @@ impl Replicator {
12991300
Ok(())
13001301
}
13011302

1302-
// Restores the database state from newest remote generation
1303+
/// Restores the database state from newest remote generation
1304+
/// On success, returns the RestoreAction, and whether the database was recovered from backup.
13031305
pub async fn restore(
13041306
&mut self,
13051307
generation: Option<Uuid>,
13061308
timestamp: Option<NaiveDateTime>,
1307-
) -> Result<RestoreAction> {
1309+
) -> Result<(RestoreAction, bool)> {
13081310
let generation = match generation {
13091311
Some(gen) => gen,
13101312
None => match self.latest_generation_before(timestamp.as_ref()).await {
13111313
Some(gen) => gen,
13121314
None => {
13131315
tracing::debug!("No generation found, nothing to restore");
1314-
return Ok(RestoreAction::SnapshotMainDbFile);
1316+
return Ok((RestoreAction::SnapshotMainDbFile, false));
13151317
}
13161318
},
13171319
};

sqld/src/admin_api.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,25 @@ use crate::connection::config::{DatabaseConfig, DatabaseConfigStore};
1515
use crate::error::LoadDumpError;
1616
use crate::namespace::{DumpStream, MakeNamespace, NamespaceStore, RestoreOption};
1717

18-
struct AppState<F: MakeNamespace> {
18+
struct AppState<M: MakeNamespace> {
1919
db_config_store: Arc<DatabaseConfigStore>,
20-
namespaces: Arc<NamespaceStore<F>>,
20+
namespaces: Arc<NamespaceStore<M>>,
2121
}
2222

23-
pub async fn run_admin_api<F: MakeNamespace>(
23+
pub async fn run_admin_api<M: MakeNamespace>(
2424
addr: SocketAddr,
2525
db_config_store: Arc<DatabaseConfigStore>,
26-
namespaces: Arc<NamespaceStore<F>>,
26+
namespaces: Arc<NamespaceStore<M>>,
2727
) -> anyhow::Result<()> {
2828
use axum::routing::{get, post};
2929
let router = axum::Router::new()
3030
.route("/", get(handle_get_index))
3131
.route("/v1/config", get(handle_get_config))
3232
.route("/v1/block", post(handle_post_block))
33+
.route(
34+
"/v1/namespaces/:namespace/fork/:to",
35+
post(handle_fork_namespace),
36+
)
3337
.route(
3438
"/v1/namespaces/:namespace/create",
3539
post(handle_create_namespace),
@@ -60,8 +64,8 @@ async fn handle_get_index() -> &'static str {
6064
"Welcome to the sqld admin API"
6165
}
6266

63-
async fn handle_get_config<F: MakeNamespace>(
64-
State(app_state): State<Arc<AppState<F>>>,
67+
async fn handle_get_config<M: MakeNamespace>(
68+
State(app_state): State<Arc<AppState<M>>>,
6569
) -> Json<Arc<DatabaseConfig>> {
6670
Json(app_state.db_config_store.get())
6771
}
@@ -79,8 +83,8 @@ struct CreateNamespaceReq {
7983
dump_url: Option<Url>,
8084
}
8185

82-
async fn handle_post_block<F: MakeNamespace>(
83-
State(app_state): State<Arc<AppState<F>>>,
86+
async fn handle_post_block<M: MakeNamespace>(
87+
State(app_state): State<Arc<AppState<M>>>,
8488
Json(req): Json<BlockReq>,
8589
) -> (axum::http::StatusCode, &'static str) {
8690
let mut config = (*app_state.db_config_store.get()).clone();
@@ -97,8 +101,8 @@ async fn handle_post_block<F: MakeNamespace>(
97101
}
98102
}
99103

100-
async fn handle_create_namespace<F: MakeNamespace>(
101-
State(app_state): State<Arc<AppState<F>>>,
104+
async fn handle_create_namespace<M: MakeNamespace>(
105+
State(app_state): State<Arc<AppState<M>>>,
102106
Path(namespace): Path<String>,
103107
Json(req): Json<CreateNamespaceReq>,
104108
) -> crate::Result<()> {
@@ -111,6 +115,14 @@ async fn handle_create_namespace<F: MakeNamespace>(
111115
Ok(())
112116
}
113117

118+
async fn handle_fork_namespace<M: MakeNamespace>(
119+
State(app_state): State<Arc<AppState<M>>>,
120+
Path((from, to)): Path<(String, String)>,
121+
) -> crate::Result<()> {
122+
app_state.namespaces.fork(from.into(), to.into()).await?;
123+
Ok(())
124+
}
125+
114126
async fn dump_stream_from_url(url: &Url) -> Result<DumpStream, LoadDumpError> {
115127
match url.scheme() {
116128
"http" => {

sqld/src/error.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use hyper::StatusCode;
33
use tonic::metadata::errors::InvalidMetadataValueBytes;
44

55
use crate::{
6-
auth::AuthError, query_result_builder::QueryResultBuilderError,
6+
auth::AuthError, namespace::ForkError, query_result_builder::QueryResultBuilderError,
77
replication::replica::error::ReplicationError,
88
};
99

@@ -77,6 +77,8 @@ pub enum Error {
7777
LoadDumpExistingDb,
7878
#[error("cannot restore database when conflicting params were provided")]
7979
ConflictingRestoreParameters,
80+
#[error("failed to fork database: {0}")]
81+
Fork(#[from] ForkError),
8082
}
8183

8284
trait ResponseError: std::error::Error {
@@ -126,6 +128,7 @@ impl IntoResponse for Error {
126128
ReplicaRestoreError => self.format_err(StatusCode::BAD_REQUEST),
127129
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
128130
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
131+
Fork(e) => e.into_response(),
129132
}
130133
}
131134
}
@@ -181,3 +184,17 @@ impl IntoResponse for LoadDumpError {
181184
}
182185
}
183186
}
187+
188+
impl ResponseError for ForkError {}
189+
190+
impl IntoResponse for ForkError {
191+
fn into_response(self) -> axum::response::Response {
192+
match self {
193+
ForkError::Internal(_)
194+
| ForkError::Io(_)
195+
| ForkError::LogRead(_)
196+
| ForkError::CreateNamespace(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
197+
ForkError::ForkReplica => self.format_err(StatusCode::BAD_REQUEST),
198+
}
199+
}
200+
}

sqld/src/lib.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -443,21 +443,18 @@ pub async fn init_bottomless_replicator(
443443
.ok_or_else(|| anyhow::anyhow!("Invalid db path"))?
444444
.to_owned();
445445
let mut replicator = bottomless::replicator::Replicator::with_options(path, options).await?;
446-
let mut did_recover = false;
447446

448447
let (generation, timestamp) = match restore_option {
449448
RestoreOption::Latest | RestoreOption::Dump(_) => (None, None),
450449
RestoreOption::Generation(generation) => (Some(*generation), None),
451450
RestoreOption::PointInTime(timestamp) => (None, Some(*timestamp)),
452451
};
453452

454-
match replicator.restore(generation, timestamp).await? {
453+
let (action, did_recover) = replicator.restore(generation, timestamp).await?;
454+
match action {
455455
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
456456
replicator.new_generation();
457-
if let Some(handle) = replicator.snapshot_main_db_file(None).await? {
458-
handle.await?;
459-
did_recover = true;
460-
}
457+
replicator.snapshot_main_db_file(None).await?;
461458
// Restoration process only leaves the local WAL file if it was
462459
// detected to be newer than its remote counterpart.
463460
replicator.maybe_replicate_wal().await?

sqld/src/namespace/fork.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use std::io::SeekFrom;
2+
use std::path::PathBuf;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use bytes::Bytes;
7+
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
8+
use tokio_stream::StreamExt;
9+
10+
use crate::database::PrimaryDatabase;
11+
use crate::replication::frame::Frame;
12+
use crate::replication::primary::frame_stream::FrameStream;
13+
use crate::replication::{LogReadError, ReplicationLogger};
14+
15+
use super::{MakeNamespace, RestoreOption};
16+
17+
// FIXME: get this const from somewhere else (crate wide)
18+
const PAGE_SIZE: usize = 4096;
19+
20+
type Result<T> = crate::Result<T, ForkError>;
21+
22+
#[derive(Debug, thiserror::Error)]
23+
pub enum ForkError {
24+
#[error("internal error: {0}")]
25+
Internal(anyhow::Error),
26+
#[error("io error: {0}")]
27+
Io(#[from] std::io::Error),
28+
#[error("failed to read frame from replication log: {0}")]
29+
LogRead(anyhow::Error),
30+
#[error("an error occured creating the namespace: {0}")]
31+
CreateNamespace(Box<crate::error::Error>),
32+
#[error("cannot fork a replica, try again with the primary.")]
33+
ForkReplica,
34+
}
35+
36+
impl From<tokio::task::JoinError> for ForkError {
37+
fn from(e: tokio::task::JoinError) -> Self {
38+
Self::Internal(e.into())
39+
}
40+
}
41+
42+
async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
43+
let page_no = frame.header().page_no;
44+
let page_pos = (page_no - 1) as usize * PAGE_SIZE;
45+
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
46+
temp_file.write_all(frame.page()).await?;
47+
48+
Ok(())
49+
}
50+
51+
pub struct ForkTask<'a> {
52+
pub base_path: PathBuf,
53+
pub logger: Arc<ReplicationLogger>,
54+
pub dest_namespace: Bytes,
55+
pub make_namespace: &'a dyn MakeNamespace<Database = PrimaryDatabase>,
56+
}
57+
58+
impl ForkTask<'_> {
59+
pub async fn fork(self) -> Result<super::Namespace<PrimaryDatabase>> {
60+
match self.try_fork().await {
61+
Err(e) => {
62+
let _ = tokio::fs::remove_dir_all(
63+
self.base_path
64+
.join("dbs")
65+
.join(std::str::from_utf8(&self.dest_namespace).unwrap()),
66+
)
67+
.await;
68+
Err(e)
69+
}
70+
Ok(ns) => Ok(ns),
71+
}
72+
}
73+
74+
async fn try_fork(&self) -> Result<super::Namespace<PrimaryDatabase>> {
75+
// until what index to replicate
76+
let base_path = self.base_path.clone();
77+
let temp_dir =
78+
tokio::task::spawn_blocking(move || tempfile::tempdir_in(base_path)).await??;
79+
let mut data_file = tokio::fs::File::create(temp_dir.path().join("data")).await?;
80+
81+
let logger = self.logger.clone();
82+
let end_frame_no = *logger.new_frame_notifier.borrow();
83+
let mut next_frame_no = 0;
84+
while next_frame_no < end_frame_no {
85+
let mut streamer = FrameStream::new(logger.clone(), next_frame_no, false, None)
86+
.map_err(|e| ForkError::LogRead(e.into()))?;
87+
while let Some(res) = streamer.next().await {
88+
match res {
89+
Ok(frame) => {
90+
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
91+
write_frame(frame, &mut data_file).await?;
92+
}
93+
Err(LogReadError::SnapshotRequired) => {
94+
let snapshot = loop {
95+
if let Some(snap) = logger
96+
.get_snapshot_file(next_frame_no)
97+
.map_err(ForkError::Internal)?
98+
{
99+
break snap;
100+
}
101+
102+
// the snapshot must exist, it is just not yet available.
103+
tokio::time::sleep(Duration::from_millis(100)).await;
104+
};
105+
106+
let iter = snapshot.frames_iter_from(next_frame_no);
107+
for frame in iter {
108+
let frame = frame.map_err(ForkError::LogRead)?;
109+
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
110+
write_frame(frame, &mut data_file).await?;
111+
}
112+
}
113+
Err(LogReadError::Ahead) => {
114+
unreachable!("trying to fork ahead of the forked database!")
115+
}
116+
Err(LogReadError::Error(e)) => return Err(ForkError::LogRead(e)),
117+
}
118+
}
119+
}
120+
121+
let dest_path = self
122+
.base_path
123+
.join("dbs")
124+
.join(std::str::from_utf8(&self.dest_namespace).unwrap());
125+
tokio::fs::rename(temp_dir.path(), dest_path).await?;
126+
127+
tokio::io::stdin().read_i8().await.unwrap();
128+
self.make_namespace
129+
.create(self.dest_namespace.clone(), RestoreOption::Latest, true)
130+
.await
131+
.map_err(|e| ForkError::CreateNamespace(Box::new(e)))
132+
}
133+
}

0 commit comments

Comments
 (0)