Skip to content

Commit

Permalink
Manage sync threads failures (#49)
Browse files Browse the repository at this point in the history
This PR makes a bunch of changes to how the local sync 
and s3 sync threads are managed inside parseable. The 
goal is to make the parseable main thread aware of any unwanted 
failure inside either of these dedicated sync threads. Additionally, 
we want to have control over the life cycle of these threads and 
be able to stop them on command. This can be explored further in 
the future if we want to have a graceful shutdown and also to 
guarantee more data consistency in case of failure.

This solution works something like this:

Sync threads are spawned with a pair of oneshot channels 
(named with the suffixes inbox and outbox) for communication with 
the main thread.

inbox is a sender which can be used to stop the respective 
thread by breaking its scheduler loop. outbox is a receiver 
which is polled inside the main thread's looped select, so that 
whenever we receive a message from a thread we stop.
The scheduler is run inside a loop which runs pending tasks and polls 
the channel for any message from the main thread 
(if there is one, the thread stops).

All of this is wrapped in a catch_unwind. So in case of 
unwinding caused by a panic, there is a chance to let the main thread 
know and handle it accordingly. This is added as a safeguard for 
now, but later we need to verify that local_sync and s3_sync 
don't contain anything that can panic.

Fixes #43
  • Loading branch information
trueleo authored Aug 20, 2022
1 parent 4e539de commit 44c55e6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 23 deletions.
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ structopt = { version = "0.3.25" }
sysinfo = "0.20.5"
thiserror = "1"
tokio-stream = "0.1.8"
tokio = { version = "1.13.1", default-features = false, features=["sync", "macros"] }
clokwerk = "0.4.0-rc1"
actix-web-static-files = "4.0"
static-files = "0.2.1"
Expand Down
125 changes: 103 additions & 22 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer};
use actix_web_httpauth::extractors::basic::BasicAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use actix_web_static_files::ResourceFiles;
use clokwerk::{AsyncScheduler, TimeUnits};
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
use log::warn;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

use std::thread;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

mod banner;
mod error;
Expand Down Expand Up @@ -63,33 +67,110 @@ async fn main() -> anyhow::Result<()> {
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
warn!("could not populate local metadata. {:?}", e);
}
thread::spawn(sync);
run_http().await?;

Ok(())
}
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();

#[actix_web::main]
async fn sync() {
let mut scheduler = AsyncScheduler::new();
scheduler
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().local_sync().await {
warn!("failed to sync local data. {:?}", e);
let app = run_http();
tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
s3sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
localsync_handler.join().unwrap_or(());
s3sync_handler.join().unwrap_or(());
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to disc. This can happen due to critical error in disc or environment. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut s3sync_outbox => {
// s3sync failed, this is recoverable by just starting s3sync thread again
s3sync_handler.join().unwrap_or(());
(s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
}
};
}
}

/// Spawns a dedicated OS thread that periodically runs `S3::new().s3_sync()`.
///
/// Returns `(handle, outbox_rx, inbox_tx)`:
/// - `handle`: join handle of the spawned thread;
/// - `outbox_rx`: oneshot receiver that is sent `()` when the thread body
///   panicked (i.e. `catch_unwind` returned `Err`);
/// - `inbox_tx`: oneshot sender; sending on it (or dropping it) makes the
///   scheduler loop break so the thread can finish.
///
/// NOTE(review): this listing looks like a rendered diff without +/- markers.
/// The second `scheduler.every(...)` run further down appears to be removed
/// lines of the old `sync()` function, not part of this function; confirm
/// against the repository before editing.
fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
// Wrapped so that the closure handed to `catch_unwind` below can capture the receiver.
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
let handle = thread::spawn(move || {
let res = catch_unwind(move || {
// This thread gets its own actix runtime to drive the async scheduler.
let rt = actix_web::rt::System::new();
rt.block_on(async {
let mut scheduler = AsyncScheduler::new();
// Schedule the upload every `CONFIG.parseable.upload_interval` seconds;
// a failed sync is only logged, it does not stop the thread.
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().s3_sync().await {
warn!("failed to sync local data with object store. {:?}", e);
}
});

// Run due jobs, then poll the inbox without blocking for a stop message.
// NOTE(review): unlike `run_local_sync`, there is no sleep between iterations,
// so this loop may spin if `run_pending().await` returns immediately. Confirm.
loop {
scheduler.run_pending().await;
match AssertUnwindSafe(|| inbox_rx.try_recv())() {
Ok(_) => break,
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Closed) => {
// should be unreachable but breaking anyways
break;
}
}
}
})
});
// NOTE(review): the next five lines look like diff residue from the removed `sync()`
// function (see the note on the function header).
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.run(|| async {
if let Err(e) = S3::new().s3_sync().await {
warn!("failed to sync local data with object store. {:?}", e);

// `Err` here means the closure panicked; notify the main thread through the outbox.
// NOTE(review): `unwrap` will panic if the receiving side was already dropped. Confirm
// that this cannot happen during shutdown.
if res.is_err() {
outbox_tx.send(()).unwrap();
}
});

(handle, outbox_rx, inbox_tx)
}

/// Spawns a dedicated OS thread that periodically runs `S3::new().local_sync()`
/// on a blocking `Scheduler`.
///
/// Returns `(handle, outbox_rx, inbox_tx)`:
/// - `handle`: join handle of the spawned thread;
/// - `outbox_rx`: oneshot receiver that is sent `()` when the thread body
///   panicked (i.e. `catch_unwind` returned `Err`);
/// - `inbox_tx`: oneshot sender; sending on it (or dropping it) makes the
///   scheduler loop break so the thread can finish.
///
/// NOTE(review): this listing looks like a rendered diff without +/- markers.
/// The `loop { scheduler.run_pending().await; }` near the end appears to be
/// removed lines of the old async `sync()` function, not part of this
/// function; confirm against the repository before editing.
fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
// Wrapped so that the closure handed to `catch_unwind` below can capture the receiver.
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
let handle = thread::spawn(move || {
let res = catch_unwind(move || {
let mut scheduler = Scheduler::new();
// Schedule the local sync every `storage::LOCAL_SYNC_INTERVAL` seconds;
// a failed sync is only logged, it does not stop the thread.
scheduler
.every((storage::LOCAL_SYNC_INTERVAL as u32).seconds())
.run(move || {
if let Err(e) = S3::new().local_sync() {
warn!("failed to sync local data. {:?}", e);
}
});

// Every 50 ms: run due jobs, then poll the inbox without blocking for a stop message.
loop {
thread::sleep(Duration::from_millis(50));
scheduler.run_pending();
match AssertUnwindSafe(|| inbox_rx.try_recv())() {
Ok(_) => break,
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Closed) => {
// should be unreachable but breaking anyways
break;
}
}
}
});

// NOTE(review): the next three lines look like diff residue from the removed `sync()`
// function (see the note on the function header).
loop {
scheduler.run_pending().await;
}
// `Err` here means the closure panicked; notify the main thread through the outbox.
// NOTE(review): `unwrap` will panic if the receiving side was already dropped. Confirm
// that this cannot happen during shutdown.
if res.is_err() {
outbox_tx.send(()).unwrap();
}
});

(handle, outbox_rx, inbox_tx)
}

async fn validator(
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub trait ObjectStorage: Sync + 'static {
query: &Query,
results: &mut Vec<RecordBatch>,
) -> Result<(), ObjectStorageError>;
async fn local_sync(&self) -> io::Result<()> {
fn local_sync(&self) -> io::Result<()> {
// If the local data path doesn't exist yet, return early.
// This method will be called again after next ticker interval
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
Expand Down

0 comments on commit 44c55e6

Please sign in to comment.