Skip to content

Commit

Permalink
Add MP4 to HLS processing using a queue system (#10)
Browse files Browse the repository at this point in the history
* Add basic video to stream implementation

* Add queue module with video processing job

* Update README
  • Loading branch information
sneakycrow authored Nov 4, 2024
1 parent 5e5a8b4 commit 4618c0b
Show file tree
Hide file tree
Showing 10 changed files with 923 additions and 9 deletions.
237 changes: 235 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ["packages/api", "packages/db", "packages/vod"]
members = ["packages/api", "packages/db", "packages/vod", "packages/queue"]
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,13 @@
[![build api](https://github.com/sneakycrow/farmhand/actions/workflows/build-api.yml/badge.svg)](https://github.com/sneakycrow/farmhand/actions/workflows/build-api.yml)
[![build web](https://github.com/sneakycrow/farmhand/actions/workflows/build-web.yml/badge.svg)](https://github.com/sneakycrow/farmhand/actions/workflows/build-web.yml)

farmhand is a collection of tools for hosting, creating, and managing vods and clips from livestreams
Farmhand is a powerful, open-source clip and VOD management system built for creators and artists who want more control over their content.

## Features

- ✂️ **Advanced Clip Editor** - Create perfect moments with our Twitch-style clip editor, giving you precise control over your highlights
- 🔗 **Smart Backlinking** - Jump straight to the source with clip backlinking, letting viewers watch the full VOD from any clip's timestamp
- 🤖 **Intelligent Analysis** - Automatically detect clip-worthy moments through advanced video and audio analysis
- 🏷️ **Chat-Aware Tagging** - Automatically tag clips based on chat activity and reactions, making content discovery a breeze
- 🔍 **Smart Filtering** - Find the perfect clip with flexible filtering based on automatic tags like 'exciting' or 'Pog'
- 💚 **Open Source** - Free and open source forever. Built by creators, for creators
26 changes: 26 additions & 0 deletions packages/queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "queue"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
async-trait = "0.1"
chrono = "0.4"
db = { path = "../db" }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
thiserror = "1"
sqlx = { version = "0.7", features = [
"runtime-tokio-rustls",
"postgres",
"chrono",
"uuid",
"json",
] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ulid = { version = "1", features = ["uuid"] }
uuid = { version = "1", features = ["serde", "v4"] }
vod = { path = "../vod" }
32 changes: 32 additions & 0 deletions packages/queue/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/// All errors the queue crate can produce.
///
/// Derives `Clone` so an error can be logged and propagated without ownership issues.
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    /// Invalid or missing configuration.
    #[error("Bad config: {0}")]
    BadConfig(String),
    /// Failure establishing a database connection.
    #[error("Connecting to database: {0}")]
    ConnectingToDatabase(String),
    /// Catch-all for unexpected internal failures (non-RowNotFound sqlx errors map here).
    #[error("Internal error: {0}")]
    Internal(String),
    /// A requested row/entity does not exist.
    #[error("Not found: {0}")]
    NotFound(String),
    /// A database migration failed to apply.
    #[error("Migrating database: {0}")]
    DatabaseMigration(String),
    /// A queue message payload could not be interpreted as a known job.
    #[error("Invalid job message: {0}")]
    InvalidJobMessage(String),
    /// Video transcoding failed (see `runner::handle_job`).
    #[error("Video Processing: {0}")]
    VideoProcessingError(String),
}

impl std::convert::From<sqlx::Error> for Error {
fn from(err: sqlx::Error) -> Self {
match err {
sqlx::Error::RowNotFound => Error::NotFound("row not found".into()),
_ => Error::Internal(err.to_string()),
}
}
}

impl std::convert::From<sqlx::migrate::MigrateError> for Error {
fn from(err: sqlx::migrate::MigrateError) -> Self {
Error::DatabaseMigration(err.to_string())
}
}
35 changes: 35 additions & 0 deletions packages/queue/src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::{Job, Message};
use sqlx::types::Json;
use uuid::Uuid;

/// A Postgres representation of a Job
///
/// Mirrors one row of the `queue` table (see the INSERT/UPDATE SQL in lib.rs).
/// Fields are private: rows only leave this module converted into the public `Job`.
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct PostgresJob {
    // Primary key; generated from a ULID at insert time, so ids sort by creation time.
    id: Uuid,
    // Row creation timestamp (UTC).
    created_at: chrono::DateTime<chrono::Utc>,
    // Last modification timestamp (UTC).
    updated_at: chrono::DateTime<chrono::Utc>,

    // Earliest time a worker may pick this job up.
    scheduled_for: chrono::DateTime<chrono::Utc>,
    // Number of failed processing attempts so far.
    failed_attempts: i32,
    // Current lifecycle state, stored as an INT column.
    status: PostgresJobStatus,
    // JSON-encoded job payload.
    message: Json<Message>,
}

/// The different statuses that a job can be in
// We use an INT as the Postgres representation for performance reasons.
// With repr(i32) the variants encode as Queued = 0, Running = 1, Failed = 2;
// the column values must stay in sync with this ordering.
#[derive(Debug, Clone, sqlx::Type, PartialEq)]
#[repr(i32)]
pub enum PostgresJobStatus {
    Queued,
    Running,
    Failed,
}

impl From<PostgresJob> for Job {
fn from(item: PostgresJob) -> Self {
Job {
id: item.id,
message: item.message.0,
}
}
}
151 changes: 151 additions & 0 deletions packages/queue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
mod error;
pub mod job;
pub mod runner;

use crate::error::Error;
use crate::job::{PostgresJob, PostgresJobStatus};
use serde::{Deserialize, Serialize};
use sqlx::{self, types::Json, PgPool};
use std::fmt::Debug;
use ulid::Ulid;
use uuid::Uuid;

/// Abstraction over a job queue backend.
///
/// Implementors must be `Send + Sync` so one queue can be shared across
/// worker tasks (see `runner::run_worker`, which takes an `Arc<dyn Queue>`).
#[async_trait::async_trait]
pub trait Queue: Send + Sync + Debug {
    /// pushes a job to the queue; `scheduled_for` delays it, `None` means "run now"
    async fn push(
        &self,
        job: Message,
        scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Result<(), Error>;
    /// pull fetches at most `number_of_jobs` from the queue.
    async fn pull(&self, number_of_jobs: i32) -> Result<Vec<Job>, Error>;
    /// deletes a job from the queue (called by the worker after a successful run)
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// fails a job in the queue, recording the attempt so it can be retried
    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error>;
    /// clears the queue
    async fn clear(&self) -> Result<(), Error>;
}

/// The job to be processed, containing the message payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    /// Unique job id (a UUID derived from a ULID at insert time).
    pub id: Uuid,
    /// The work to perform; see [`Message`].
    pub message: Message,
}

/// The payload of the job, containing the different jobs and their required data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Convert the raw upload for `video_id` into its streamable form;
    /// handled by `runner::handle_job` via `vod::Vod`.
    ProcessRawVideoIntoStream { video_id: String },
}

/// The queue itself
///
/// Postgres-backed implementation of [`Queue`] persisting jobs in a `queue` table.
#[derive(Debug, Clone)]
pub struct PostgresQueue {
    // Connection pool; `PgPool` is internally reference-counted and cheap to clone.
    db: PgPool,
    // Jobs whose `failed_attempts` reaches this value are no longer pulled.
    max_attempts: u32,
}

impl PostgresQueue {
    /// Default number of attempts before a job stops being retried by `pull`.
    const DEFAULT_MAX_ATTEMPTS: u32 = 5;

    /// Creates a queue backed by the given Postgres connection pool.
    ///
    /// Jobs are retried up to [`Self::DEFAULT_MAX_ATTEMPTS`] times.
    pub fn new(db: PgPool) -> PostgresQueue {
        // Return the struct directly instead of a let-and-return binding.
        PostgresQueue {
            db,
            max_attempts: Self::DEFAULT_MAX_ATTEMPTS,
        }
    }
}

#[async_trait::async_trait]
impl Queue for PostgresQueue {
    /// Inserts a new `Queued` job row; it becomes eligible immediately unless
    /// `date` is in the future.
    async fn push(
        &self,
        job: Message,
        date: Option<chrono::DateTime<chrono::Utc>>,
    ) -> Result<(), Error> {
        // Lazily evaluate the default so `now()` is only called when needed.
        let scheduled_for = date.unwrap_or_else(chrono::Utc::now);
        let failed_attempts: i32 = 0;
        let message = Json(job);
        let status = PostgresJobStatus::Queued;
        let now = chrono::Utc::now();
        // ULID-derived UUIDs are time-ordered, keeping the primary-key index append-friendly.
        let job_id: Uuid = Ulid::new().into();
        let query = "INSERT INTO queue
            (id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
            VALUES ($1, $2, $3, $4, $5, $6, $7)";

        sqlx::query(query)
            .bind(job_id)
            .bind(now)
            .bind(now)
            .bind(scheduled_for)
            .bind(failed_attempts)
            .bind(status)
            .bind(message)
            .execute(&self.db)
            .await?;
        Ok(())
    }

    /// Permanently removes a job; called by the worker after a successful run.
    async fn delete_job(&self, job_id: Uuid) -> Result<(), Error> {
        let query = "DELETE FROM queue WHERE id = $1";

        sqlx::query(query).bind(job_id).execute(&self.db).await?;
        Ok(())
    }

    /// Records a failed attempt and returns the job to the `Queued` state.
    ///
    /// The job is deliberately re-queued rather than marked `Failed` so it can be
    /// retried; `pull` stops selecting it once `failed_attempts` reaches `max_attempts`.
    async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
        let now = chrono::Utc::now();
        let query = "UPDATE queue
            SET status = $1, updated_at = $2, failed_attempts = failed_attempts + 1
            WHERE id = $3";

        sqlx::query(query)
            .bind(PostgresJobStatus::Queued)
            .bind(now)
            .bind(job_id)
            .execute(&self.db)
            .await?;
        Ok(())
    }

    /// Atomically claims up to `number_of_jobs` due jobs and marks them `Running`.
    ///
    /// `FOR UPDATE SKIP LOCKED` lets multiple workers pull concurrently without
    /// two of them claiming the same job.
    async fn pull(&self, number_of_jobs: i32) -> Result<Vec<Job>, Error> {
        // Clamp to a sane window: a negative value would make LIMIT error out,
        // and batches are capped at 100 to bound per-worker load.
        let number_of_jobs = number_of_jobs.clamp(0, 100);
        let now = chrono::Utc::now();
        let query = "UPDATE queue
            SET status = $1, updated_at = $2
            WHERE id IN (
                SELECT id
                FROM queue
                WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
                ORDER BY scheduled_for
                FOR UPDATE SKIP LOCKED
                LIMIT $6
            )
            RETURNING *";

        let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
            .bind(PostgresJobStatus::Running)
            .bind(now)
            .bind(PostgresJobStatus::Queued)
            .bind(now)
            .bind(self.max_attempts as i32)
            .bind(number_of_jobs)
            .fetch_all(&self.db)
            .await?;
        Ok(jobs.into_iter().map(Into::into).collect())
    }

    /// Deletes every job regardless of status. Mainly useful for tests and resets.
    async fn clear(&self) -> Result<(), Error> {
        let query = "DELETE FROM queue";

        sqlx::query(query).execute(&self.db).await?;
        Ok(())
    }
}
119 changes: 119 additions & 0 deletions packages/queue/src/runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::error::Error;
use crate::{Job, Message, Queue};
use db::Video;
use futures::{stream, StreamExt};
use sqlx::{Pool, Postgres};
use std::{path::PathBuf, sync::Arc, time::Duration};
use vod::Vod;

/// Runs a loop that pulls jobs from the queue and runs <concurrency> jobs each loop
pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Pool<Postgres>) {
loop {
// Pulls jobs from the queue
let jobs = match queue.pull(concurrency as i32).await {
Ok(jobs) => jobs,
Err(err) => {
// Trace the error
tracing::error!("runner: error pulling jobs {}", err);
// Go to sleep and try again
tokio::time::sleep(Duration::from_millis(500)).await;
Vec::new()
}
};
// Just for debugging the amount of jobs a queue has pulled in
let number_of_jobs = jobs.len();
if number_of_jobs > 0 {
tracing::debug!("Fetched {} jobs", number_of_jobs);
}
// Run each jobs concurrently
stream::iter(jobs)
.for_each_concurrent(concurrency, |job| async {
tracing::debug!("Starting job {}", job.id);
let job_id = job.id;

let res = match handle_job(job, db_conn).await {
Ok(_) => queue.delete_job(job_id).await,
Err(err) => {
println!("run_worker: handling job({}): {}", job_id, &err);
queue.fail_job(job_id).await
}
};

match res {
Ok(_) => {}
Err(err) => {
println!("run_worker: deleting / failing job: {}", &err);
}
}
})
.await;
// Take a break for a bit, we don't need to run every moment (our jobs are unlikely to complete that quickly)
tokio::time::sleep(Duration::from_millis(125)).await;
}
}

/// Individually processes a single job, based on its Job message type
async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
match job.message {
Message::ProcessRawVideoIntoStream { video_id } => {
// First, update video status to Processing
sqlx::query(
"UPDATE videos SET processing_status = 'processing', updated_at = NOW() WHERE id = $1"
)
.bind(video_id)
.execute(db)
.await?;

// Get video details from database
let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1")
.bind(video_id)
.fetch_one(db)
.await?;

// Create output directory path
let output_dir = PathBuf::from("processed_videos").join(video_id.to_string());

// Create VOD instance
let vod = Vod::new(PathBuf::from(&video.raw_video_path));

// Process the video
let result = vod.convert_video_to_stream(&output_dir);

match result {
Ok(master_playlist_path) => {
// Update video with success status and processed path
sqlx::query(
"UPDATE videos SET
processing_status = 'completed',
processed_video_path = $1,
updated_at = NOW()
WHERE id = $2",
)
.bind(master_playlist_path.to_str().unwrap())
.bind(video_id)
.execute(db)
.await?;

tracing::info!("Successfully processed video {}", video_id);
}
Err(e) => {
// Update video with failed status
sqlx::query(
"UPDATE videos SET
processing_status = 'failed',
updated_at = NOW()
WHERE id = $1",
)
.bind(video_id)
.execute(db)
.await?;

tracing::error!("Failed to process video {}: {}", video_id, e);
return Err(Error::VideoProcessingError(e.to_string()));
}
}
}
}

Ok(())
}
2 changes: 2 additions & 0 deletions packages/vod/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ version = "0.1.0"
edition = "2021"

[dependencies]
ffmpeg-next = "6.0"
anyhow = "1.0"
Loading

0 comments on commit 4618c0b

Please sign in to comment.