Skip to content

Commit

Permalink
Add vacuum job
Browse files Browse the repository at this point in the history
Signed-off-by: trivernis <trivernis@protonmail.com>
  • Loading branch information
Trivernis committed Mar 8, 2022
1 parent 496d720 commit 0cb37e1
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 16 deletions.
1 change: 1 addition & 0 deletions mediarepo-daemon/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions mediarepo-daemon/mediarepo-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ path = "../mediarepo-core"
[dependencies.mediarepo-logic]
path = "../mediarepo-logic"

[dependencies.mediarepo-database]
path = "../mediarepo-database"

[dependencies.tokio]
version = "1.17.0"
features = ["macros"]
Expand Down
58 changes: 58 additions & 0 deletions mediarepo-daemon/mediarepo-worker/src/execution_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
Scheduled,
Running,
Finished,
}

pub struct ExecutionStateSynchronizer {
state: Arc<AtomicU8>,
}

impl Default for ExecutionStateSynchronizer {
fn default() -> Self {
Self {
state: Arc::new(AtomicU8::new(0)),
}
}
}

impl ExecutionStateSynchronizer {
pub fn set_scheduled(&self) {
self.state.store(0, Ordering::Relaxed);
}

#[must_use]
pub fn set_running(&self) -> RunningHandle {
self.state.store(1, Ordering::Relaxed);
RunningHandle {
state: Arc::clone(&self.state),
}
}

pub fn set_finished(&self) {
self.state.store(2, Ordering::SeqCst)
}

pub fn state(&self) -> JobExecutionState {
match self.state.load(Ordering::SeqCst) {
0 => JobExecutionState::Scheduled,
1 => JobExecutionState::Running,
2 => JobExecutionState::Scheduled,
_ => JobExecutionState::Finished,
}
}
}

pub struct RunningHandle {
state: Arc<AtomicU8>,
}

impl Drop for RunningHandle {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
18 changes: 7 additions & 11 deletions mediarepo-daemon/mediarepo-worker/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::progress::JobProgressUpdate;
mod vacuum;

pub use vacuum::*;

use crate::execution_state::JobExecutionState;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
Expand All @@ -9,14 +14,5 @@ use tokio::sync::mpsc::Sender;
pub trait ScheduledJob {
async fn set_state(&self, state: StateData) -> RepoResult<()>;

async fn run(&self, sender: &mut Sender<JobProgressUpdate>, repo: Repo) -> RepoResult<()>;

fn execution_state(&self) -> JobExecutionState;
}

#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum JobExecutionState {
Scheduled,
Running,
Finished,
async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()>;
}
29 changes: 29 additions & 0 deletions mediarepo-daemon/mediarepo-worker/src/jobs/vacuum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState};
use crate::jobs::ScheduledJob;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use async_trait::async_trait;
use mediarepo_core::error::RepoResult;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

#[derive(Default, Clone)]
pub struct VacuumJob;

#[async_trait]
impl ScheduledJob for VacuumJob {
async fn set_state(&self, _: StateData) -> RepoResult<()> {
Ok(())
}

async fn run(&self, sender: &ProgressSender, repo: Repo) -> RepoResult<()> {
sender.send_progress_percent(0.0);
repo.job().vacuum().await?;
sender.send_progress_percent(1.0);

Ok(())
}
}
1 change: 1 addition & 0 deletions mediarepo-daemon/mediarepo-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod execution_state;
pub mod jobs;
pub mod jobs_table;
pub mod progress;
Expand Down
32 changes: 31 additions & 1 deletion mediarepo-daemon/mediarepo-worker/src/progress.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::jobs::JobExecutionState;
use crate::execution_state::{ExecutionStateSynchronizer, JobExecutionState, RunningHandle};
use tokio::sync::mpsc::Sender;

#[derive(Clone, Debug)]
pub struct JobProgressUpdate {
Expand Down Expand Up @@ -46,3 +47,32 @@ impl JobProgressUpdate {
self.total = Some(total)
}
}

pub struct ProgressSender {
job_id: i64,
execution_state_sync: ExecutionStateSynchronizer,
pub inner: Sender<JobProgressUpdate>,
}

impl ProgressSender {
pub fn new(job_id: i64, sender: Sender<JobProgressUpdate>) -> Self {
Self {
job_id,
inner: sender,
execution_state_sync: ExecutionStateSynchronizer::default(),
}
}

pub fn send_progress(&self, progress: u64, total: u64) {
let _ = self.inner.send(JobProgressUpdate {
id: self.job_id,
state: JobExecutionState::Running,
progress: Some(progress),
total: Some(total),
});
}

pub fn send_progress_percent(&self, percent: f64) {
self.send_progress((percent * 100.0) as u64, 100);
}
}
26 changes: 22 additions & 4 deletions mediarepo-daemon/mediarepo-worker/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::jobs::{JobExecutionState, ScheduledJob};
use crate::execution_state::JobExecutionState;
use crate::jobs::{ScheduledJob, VacuumJob};
use crate::jobs_table::JobsTable;
use crate::progress::JobProgressUpdate;
use crate::progress::{JobProgressUpdate, ProgressSender};
use crate::state_data::StateData;
use mediarepo_core::error::RepoResult;
use mediarepo_core::futures::select;
use mediarepo_core::settings::LogLevel::Debug;
use mediarepo_core::tokio_graceful_shutdown::SubsystemHandle;
use mediarepo_database::entities::job::JobType;
use mediarepo_logic::dao::repo::Repo;
use mediarepo_logic::dao::DaoProvider;
use mediarepo_logic::dto::JobDto;
Expand Down Expand Up @@ -90,9 +92,11 @@ impl Scheduler {
progress.set_state(JobExecutionState::Running);
let _ = sender.send(progress.clone()).await;

if let Err(e) = scheduled_job.run(&mut sender, repo).await {
let progress_sender = ProgressSender::new(progress.id(), sender);
if let Err(e) = scheduled_job.run(&progress_sender, repo).await {
tracing::error!("error occurred during job execution: {}", e);
}
let sender = progress_sender.inner;
progress.set_state(JobExecutionState::Finished);
let _ = sender.send(progress).await;
});
Expand Down Expand Up @@ -160,5 +164,19 @@ impl Scheduler {
}

fn create_job(dto: JobDto) -> Box<dyn ScheduledJob + Send + Sync> {
todo!("implement")
match dto.job_type() {
JobType::MigrateCDs => {
todo!()
}
JobType::CalculateSizes => {
todo!()
}
JobType::GenerateThumbs => {
todo!()
}
JobType::CheckIntegrity => {
todo!()
}
JobType::Vacuum => Box::new(VacuumJob),
}
}

0 comments on commit 0cb37e1

Please sign in to comment.