From a2aba0beca1d0613670b00fb107650964c0a5009 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 17 Nov 2024 21:53:26 -0800
Subject: [PATCH 1/8] Make sure queue fails jobs correctly

---
 packages/queue/src/queue.rs  | 19 +++++++++++-
 packages/queue/src/runner.rs | 56 ++++++++++++++----------------------
 2 files changed, 40 insertions(+), 35 deletions(-)

diff --git a/packages/queue/src/queue.rs b/packages/queue/src/queue.rs
index 7a5b58a..2fa39df 100644
--- a/packages/queue/src/queue.rs
+++ b/packages/queue/src/queue.rs
@@ -35,6 +35,7 @@ pub struct Job {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum Message {
     ProcessRawVideoIntoStream { video_id: String },
+    CompressRawVideo { video_id: String },
 }
 
 /// The queue itself
@@ -94,12 +95,28 @@ impl Queue for PostgresQueue {
 
     async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
         let now = chrono::Utc::now();
+
+        // First get the current failed_attempts count
+        let query = "SELECT failed_attempts FROM queue WHERE id = $1";
+        let failed_attempts: i32 = sqlx::query_scalar(query)
+            .bind(job_id)
+            .fetch_one(&self.db)
+            .await?;
+
+        // Determine the new status based on failed attempts
+        let new_status = if failed_attempts + 1 >= self.max_attempts as i32 {
+            PostgresJobStatus::Failed
+        } else {
+            PostgresJobStatus::Queued
+        };
+
+        // Update the job with new status and increment failed_attempts
         let query = "UPDATE queue
             SET status = $1, updated_at = $2, failed_attempts = failed_attempts + 1
             WHERE id = $3";
 
         sqlx::query(query)
-            .bind(PostgresJobStatus::Queued)
+            .bind(new_status)
             .bind(now)
             .bind(job_id)
             .execute(&self.db)
diff --git a/packages/queue/src/runner.rs b/packages/queue/src/runner.rs
index 34fdc90..c068ec6 100644
--- a/packages/queue/src/runner.rs
+++ b/packages/queue/src/runner.rs
@@ -52,6 +52,7 @@ async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
     match job.message {
         Message::ProcessRawVideoIntoStream { video_id } => {
             tracing::info!("Start video processing for video_id {video_id}");
+            // Update video status to Processing
             sqlx::query(
                 "UPDATE videos SET processing_status = 'processing', updated_at = NOW() WHERE id = $1"
             )
@@ -69,6 +70,7 @@ async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
             // Create output directory
             let output_dir = PathBuf::from(get_videos_dir()).join(&video_id.to_string());
             let ffmpeg_location = get_ffmpeg_location();
+            // Initialize HLS converter
             let converter = HLSConverter::new(
                 ffmpeg_location.as_str(),
@@ -86,41 +88,27 @@ async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
             ];
 
             // Process the video
-            match converter.convert_to_hls(&video.raw_video_path, qualities) {
-                Ok(_) => {
-                    // Update with success status
-                    let master_playlist_path = output_dir.join("master.m3u8");
-                    sqlx::query(
-                        "UPDATE videos SET
-                        processing_status = 'completed',
-                        processed_video_path = $1,
-                        updated_at = NOW()
-                        WHERE id = $2",
-                    )
-                    .bind(master_playlist_path.to_str().unwrap())
-                    .bind(&video_id)
-                    .execute(db)
-                    .await?;
-
-                    tracing::info!("Successfully processed video {}", &video_id);
-                }
-                Err(e) => {
-                    // Update with failed status
-                    sqlx::query(
-                        "UPDATE videos SET
-                        processing_status = 'failed',
-                        updated_at = NOW()
-                        WHERE id = $1",
-                    )
-                    .bind(&video_id)
-                    .execute(db)
-                    .await?;
-
-                    tracing::error!("Failed to process video {}: {}", &video_id, e);
-                    return Err(Error::VideoProcessingError(e.to_string()));
-                }
-            }
+            converter
+                .convert_to_hls(&video.raw_video_path, qualities)
+                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+            // Update with success status
+            let master_playlist_path = output_dir.join("master.m3u8");
+            sqlx::query(
+                "UPDATE videos SET
+                processing_status = 'completed',
+                processed_video_path = $1,
+                updated_at = NOW()
+                WHERE id = $2",
+            )
+            .bind(master_playlist_path.to_str().unwrap())
+            .bind(&video_id)
+            .execute(db)
+            .await?;
+
+            tracing::info!("Successfully processed video {}", &video_id);
         }
+        _ => tracing::warn!("Unhandled job message passed"),
     }
 
     Ok(())

From f460c012cf093b4dd499c4f331783557b55126c2 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 17 Nov 2024 22:03:12 -0800
Subject: [PATCH 2/8] Fix error message for unused var

---
 services/barn-ui/src/lib/server/videos.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/barn-ui/src/lib/server/videos.ts b/services/barn-ui/src/lib/server/videos.ts
index f212b67..444da2f 100644
--- a/services/barn-ui/src/lib/server/videos.ts
+++ b/services/barn-ui/src/lib/server/videos.ts
@@ -95,7 +95,7 @@ export const deleteVideos = async (idList: string[], token: string) => {
 	try {
 		const baseURL = `${env.API_URL}`;
 		const serializedIDList = idList.join(',');
-		const res = await fetch(`${baseURL}/video?id=${serializedIDList}`, {
+		await fetch(`${baseURL}/video?id=${serializedIDList}`, {
 			method: 'DELETE',
 			headers: {
 				Authorization: `Bearer ${token}`

From e7f9c6415f640a599a19bae04849171cc942ec9d Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 17 Nov 2024 22:03:21 -0800
Subject: [PATCH 3/8] Add queue item for compressing video

---
 Cargo.lock                   | 162 ++++++++++++++++++++++++++++++++++-
 packages/queue/Cargo.toml    |   1 +
 packages/queue/src/runner.rs |  79 +++++++++++++++++
 3 files changed, 241 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7c0a73c..41a84cc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,6 +17,17 @@ version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
 
+[[package]]
+name = "aes"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
+dependencies = [
+ "cfg-if",
+ "cipher",
+ "cpufeatures",
+]
+
 [[package]]
 name = "ahash"
 version = "0.8.11"
@@ -101,7 +112,7 @@ dependencies = [
  "base64ct",
  "blake2",
  "cpufeatures",
- "password-hash",
+ "password-hash 0.5.0",
 ]
 
 [[package]]
@@ -343,12 +354,35 @@ version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
 
+[[package]]
+name = "bzip2"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8"
+dependencies = [
+ "bzip2-sys",
+ "libc",
+]
+
+[[package]]
+name = "bzip2-sys"
+version = "0.1.11+1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
 [[package]]
 name = "cc"
 version = "1.1.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f"
 dependencies = [
+ "jobserver",
+ "libc",
  "shlex",
 ]
 
@@ -373,6 +407,16 @@ dependencies = [
  "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "cipher"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
"773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -388,6 +432,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -418,6 +468,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -563,6 +622,16 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "flate2" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -981,12 +1050,30 @@ dependencies = [ "hashbrown 0.15.0", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.72" @@ -1303,6 +1390,17 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "password-hash" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" +dependencies = [ + "base64ct", + "rand_core", + "subtle", +] + [[package]] name = "password-hash" version = "0.5.0" @@ -1320,6 +1418,18 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbkdf2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" +dependencies = [ + "digest", + "hmac", + "password-hash 0.4.2", + "sha2", +] + [[package]] name = "pem" version = "1.1.1" @@ -1443,6 +1553,7 @@ dependencies = [ "ulid", "uuid", "vod", + "zip", ] [[package]] @@ -2864,3 +2975,52 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zip" +version = 
"0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +dependencies = [ + "aes", + "byteorder", + "bzip2", + "constant_time_eq", + "crc32fast", + "crossbeam-utils", + "flate2", + "hmac", + "pbkdf2", + "sha1", + "time", + "zstd", +] + +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/packages/queue/Cargo.toml b/packages/queue/Cargo.toml index 96578d0..2083a26 100644 --- a/packages/queue/Cargo.toml +++ b/packages/queue/Cargo.toml @@ -22,3 +22,4 @@ tracing = "0.1" ulid = { version = "1", features = ["uuid"] } uuid = { version = "1", features = ["serde", "v4"] } vod = { path = "../../packages/vod" } +zip = "0.6" diff --git a/packages/queue/src/runner.rs b/packages/queue/src/runner.rs index c068ec6..e83a1b8 100644 --- a/packages/queue/src/runner.rs +++ b/packages/queue/src/runner.rs @@ -3,8 +3,13 @@ use crate::queue::{Job, Message, Queue}; use db::Video; use futures::{stream, StreamExt}; use sqlx::{Pool, Postgres}; +use std::fs; +use std::io::Write; use std::{path::PathBuf, sync::Arc, time::Duration}; +use tokio::fs::File; +use tokio::io::AsyncReadExt; use vod::{HLSConverter, Quality}; +use zip::{write::FileOptions, ZipWriter}; /// Runs a loop that pulls jobs from the queue and runs jobs each loop pub async fn run_worker(queue: Arc, concurrency: usize, db_conn: &Pool) { @@ -108,6 +113,80 @@ async fn handle_job(job: Job, db: &Pool) -> Result<(), Error> { tracing::info!("Successfully processed video {}", &video_id); } + Message::CompressRawVideo { video_id } => { + tracing::info!("Start video compression for video_id {video_id}"); + + // Update video compression status + sqlx::query( + "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1" + ) + .bind(&video_id) + .execute(db) + .await?; + + // Get video details + let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1") + .bind(&video_id) + .fetch_one(db) + .await?; + + // Create output directory if it doesn't exist + let videos_dir = PathBuf::from(get_videos_dir()); + fs::create_dir_all(&videos_dir) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?; + + let zip_path = videos_dir.join(format!("{}_raw.zip", video_id)); + let mut zip = ZipWriter::new( + fs::File::create(&zip_path) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?, + ); + + let raw_video_path = PathBuf::from(&video.raw_video_path); + let file_name = raw_video_path + .file_name() + .ok_or_else(|| Error::VideoProcessingError("Invalid raw video path".to_string()))? 
+                .to_string_lossy()
+                .into_owned();
+
+            zip.start_file(&file_name, FileOptions::default())
+                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+            // Read the file in chunks
+            let mut file = File::open(&raw_video_path)
+                .await
+                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+
+            loop {
+                let n = file
+                    .read(&mut buffer)
+                    .await
+                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+                if n == 0 {
+                    break;
+                }
+                zip.write_all(&buffer[..n])
+                    .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            }
+
+            zip.finish()
+                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+            // Update the video record with the compressed file path
+            sqlx::query(
+                "UPDATE videos SET
+                compression_status = 'completed',
+                compressed_video_path = $1,
+                updated_at = NOW()
+                WHERE id = $2",
+            )
+            .bind(zip_path.to_str().unwrap())
+            .bind(&video_id)
+            .execute(db)
+            .await?;
+
+            tracing::info!("Successfully compressed video {}", &video_id);
+        }
         _ => tracing::warn!("Unhandled job message passed"),
     }
 

From a5f11239c424ce00d49e1759b5a1168216725d28 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:28:30 -0800
Subject: [PATCH 4/8] Add compression migrations, fix raw_video_path, fix job

---
 .../20241101164200_add_videos.up.sql          |   2 +-
 .../20241124231804_compresseion_cols.down.sql |   7 +
 .../20241124231804_compresseion_cols.up.sql   |   7 +
 packages/queue/src/queue.rs                   |  19 +-
 packages/queue/src/runner.rs                  | 171 ++++++++++++------
 services/forge-queue/src/main.rs              |   2 +-
 services/silo-api/src/main.rs                 |   4 -
 services/silo-api/src/routes/upload.rs        |   1 +
 8 files changed, 145 insertions(+), 68 deletions(-)
 create mode 100644 packages/db/migrations/20241124231804_compresseion_cols.down.sql
 create mode 100644 packages/db/migrations/20241124231804_compresseion_cols.up.sql

diff --git a/packages/db/migrations/20241101164200_add_videos.up.sql b/packages/db/migrations/20241101164200_add_videos.up.sql
index 585464b..6e6f171 100644
--- a/packages/db/migrations/20241101164200_add_videos.up.sql
+++ b/packages/db/migrations/20241101164200_add_videos.up.sql
@@ -4,7 +4,7 @@ CREATE TABLE videos (
     id TEXT PRIMARY KEY,
     user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
     title VARCHAR(100) NOT NULL,
-    raw_video_path VARCHAR(255) NOT NULL,
+    raw_video_path VARCHAR(255),
     processed_video_path VARCHAR(255),
     processing_status processing_status NOT NULL DEFAULT 'pending',
     created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
diff --git a/packages/db/migrations/20241124231804_compresseion_cols.down.sql b/packages/db/migrations/20241124231804_compresseion_cols.down.sql
new file mode 100644
index 0000000..282db23
--- /dev/null
+++ b/packages/db/migrations/20241124231804_compresseion_cols.down.sql
@@ -0,0 +1,7 @@
+-- Remove compression-related columns
+ALTER TABLE videos
+    DROP COLUMN compression_status,
+    DROP COLUMN compressed_video_path;
+
+-- Drop the compression status enum type
+DROP TYPE compression_status;
diff --git a/packages/db/migrations/20241124231804_compresseion_cols.up.sql b/packages/db/migrations/20241124231804_compresseion_cols.up.sql
new file mode 100644
index 0000000..261b0e9
--- /dev/null
+++ b/packages/db/migrations/20241124231804_compresseion_cols.up.sql
@@ -0,0 +1,7 @@
+-- Create compression status enum type
+CREATE TYPE compression_status AS ENUM ('pending', 'compressing', 'completed', 'failed');
+
+-- Add compression-related columns
+ALTER TABLE videos
+    ADD COLUMN compression_status compression_status NOT NULL DEFAULT 'pending',
+    ADD COLUMN compressed_video_path VARCHAR(255);
diff --git a/packages/queue/src/queue.rs b/packages/queue/src/queue.rs
index 2fa39df..f672040 100644
--- a/packages/queue/src/queue.rs
+++ b/packages/queue/src/queue.rs
@@ -12,6 +12,7 @@ pub trait Queue: Send + Sync + Debug {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error>;
     /// pull fetches at most `number_of_jobs` from the queue.
@@ -61,18 +62,23 @@ impl Queue for PostgresQueue {
     async fn push(
         &self,
         job: Message,
+        status: PostgresJobStatus,
         date: Option<chrono::DateTime<chrono::Utc>>,
     ) -> Result<(), Error> {
         let scheduled_for = date.unwrap_or(chrono::Utc::now());
         let failed_attempts: i32 = 0;
         let message = Json(job);
-        let status = PostgresJobStatus::Queued;
         let now = chrono::Utc::now();
         let job_id: Uuid = Ulid::new().into();
         let query = "INSERT INTO queue
             (id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
             VALUES ($1, $2, $3, $4, $5, $6, $7)";
-
+        tracing::debug!(
+            "Adding job to queue with id: {}, status: {:?}, scheduled_for: {}",
+            job_id,
+            status,
+            scheduled_for
+        );
         sqlx::query(query)
             .bind(job_id)
             .bind(now)
@@ -94,6 +100,10 @@ impl Queue for PostgresQueue {
     }
 
     async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
+        tracing::debug!(
+            "Failing job with id: {}, attempting to update status and increment failed attempts",
+            job_id
+        );
         let now = chrono::Utc::now();
 
         // First get the current failed_attempts count
@@ -136,7 +146,7 @@ impl Queue for PostgresQueue {
             WHERE id IN (
                 SELECT id
                 FROM queue
-                WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
+                WHERE status = ANY($3) AND scheduled_for <= $4 AND failed_attempts < $5
                 ORDER BY scheduled_for
                 FOR UPDATE SKIP LOCKED
                 LIMIT $6
             )";
         let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
             .bind(PostgresJobStatus::Running)
             .bind(now)
-            .bind(PostgresJobStatus::Queued)
+            .bind(vec![PostgresJobStatus::Queued])
             .bind(now)
             .bind(self.max_attempts as i32)
             .bind(number_of_jobs)
             .fetch_all(&self.db)
             .await?;
+
         Ok(jobs.into_iter().map(Into::into).collect())
     }
diff --git a/packages/queue/src/runner.rs b/packages/queue/src/runner.rs
index e83a1b8..ccbfac8 100644
--- a/packages/queue/src/runner.rs
+++ b/packages/queue/src/runner.rs
@@ -1,4 +1,5 @@
 use crate::error::Error;
+use crate::job::PostgresJobStatus;
 use crate::queue::{Job, Message, Queue};
 use db::Video;
 use futures::{stream, StreamExt};
@@ -34,7 +35,7 @@ pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Poo
             tracing::debug!("Starting job {}", job.id);
             let job_id = job.id;
 
-            let res = match handle_job(job, db_conn).await {
+            let res = match handle_job(queue.clone(), job, db_conn).await {
                 Ok(_) => queue.delete_job(job_id).await,
                 Err(err) => {
                     tracing::error!("run_worker: handling job({}): {}", job_id, &err);
@@ -53,7 +54,8 @@ pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Poo
 }
 
 /// Individually processes a single job, based on its Job message type
-async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
+async fn handle_job(queue: Arc<dyn Queue>, job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
+    tracing::debug!("Got job of type {:?}", &job.message);
     match job.message {
         Message::ProcessRawVideoIntoStream { video_id } => {
             tracing::info!("Start video processing for video_id {video_id}");
@@ -111,81 +113,134 @@ async fn handle_job(queue: Arc<dyn Queue>, job: Job, db: &Pool<Postgres>) -> Res
             .execute(db)
             .await?;
 
{}", &video_id); + // After completed, queue up a Compress Raw Video job + let scheduled_time = chrono::Utc::now() + chrono::Duration::days(7); + queue + .push( + Message::CompressRawVideo { + video_id: video_id.clone(), + }, + PostgresJobStatus::Queued, + // Some(scheduled_time), // TODO: Uncomment after testing + None, + ) + .await?; + + tracing::info!( + "Successfully processed video {} and queued compression job", + &video_id + ); } Message::CompressRawVideo { video_id } => { tracing::info!("Start video compression for video_id {video_id}"); - // Update video compression status + // Update video compression status to compressing sqlx::query( - "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1" - ) - .bind(&video_id) - .execute(db) - .await?; - - // Get video details - let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1") - .bind(&video_id) - .fetch_one(db) - .await?; + "UPDATE videos SET compression_status = 'compressing', updated_at = NOW() WHERE id = $1" + ) + .bind(&video_id) + .execute(db) + .await?; - // Create output directory if it doesn't exist - let videos_dir = PathBuf::from(get_videos_dir()); - fs::create_dir_all(&videos_dir) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?; + // Wrap the compression logic in a result to handle failures + let compression_result = async { + // Get video details + let video = sqlx::query_as::<_, Video>("SELECT * FROM videos WHERE id = $1") + .bind(&video_id) + .fetch_one(db) + .await?; - let zip_path = videos_dir.join(format!("{}_raw.zip", video_id)); - let mut zip = ZipWriter::new( - fs::File::create(&zip_path) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?, - ); + // Create video-specific directory if it doesn't exist + let videos_dir = PathBuf::from(get_videos_dir()); + let video_dir = videos_dir.join(&video_id.to_string()); + fs::create_dir_all(&video_dir) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?; - let raw_video_path = PathBuf::from(&video.raw_video_path); - let file_name = raw_video_path - .file_name() - .ok_or_else(|| Error::VideoProcessingError("Invalid raw video path".to_string()))? - .to_string_lossy() - .into_owned(); + let zip_path = video_dir.join("raw.zip"); + let mut zip = ZipWriter::new( + fs::File::create(&zip_path) + .map_err(|e| Error::VideoProcessingError(e.to_string()))?, + ); - zip.start_file(&file_name, FileOptions::default()) - .map_err(|e| Error::VideoProcessingError(e.to_string()))?; + let raw_video_path = PathBuf::from(&video.raw_video_path); + let file_name = raw_video_path + .file_name() + .ok_or_else(|| { + Error::VideoProcessingError("Invalid raw video path".to_string()) + })? 
+                    .to_string_lossy()
+                    .into_owned();
 
-            // Read the file in chunks
-            let mut file = File::open(&raw_video_path)
+                zip.start_file(&file_name, FileOptions::default())
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-            let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
 
-            loop {
-                let n = file
-                    .read(&mut buffer)
+                // Read the file in chunks
+                let mut file = File::open(&raw_video_path)
                     .await
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
-                if n == 0 {
-                    break;
+                let mut buffer = vec![0; 1024 * 1024]; // 1MB chunks
+
+                loop {
+                    let n = file
+                        .read(&mut buffer)
+                        .await
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+                    if n == 0 {
+                        break;
+                    }
+                    zip.write_all(&buffer[..n])
+                        .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
                 }
-                zip.write_all(&buffer[..n])
+
+                zip.finish()
                     .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+
+                // Close file handle before trying to remove
+                drop(file);
+
+                // Remove the original raw video file
+                tokio::fs::remove_file(&raw_video_path).await.map_err(|e| {
+                    Error::VideoProcessingError(format!("Failed to remove raw video: {}", e))
+                })?;
+
+                Ok::<PathBuf, Error>(zip_path)
             }
+            .await;
 
-            zip.finish()
-                .map_err(|e| Error::VideoProcessingError(e.to_string()))?;
+            match compression_result {
+                Ok(zip_path) => {
+                    // Update the video record with success status and compressed file path
+                    sqlx::query(
+                        "UPDATE videos SET
+                        compression_status = 'completed',
+                        compressed_video_path = $1,
+                        raw_video_path = NULL,
+                        updated_at = NOW()
+                        WHERE id = $2",
+                    )
+                    .bind(zip_path.to_str().unwrap())
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;
 
-            // Update the video record with the compressed file path
-            sqlx::query(
-                "UPDATE videos SET
-                compression_status = 'completed',
-                compressed_video_path = $1,
-                updated_at = NOW()
-                WHERE id = $2",
-            )
-            .bind(zip_path.to_str().unwrap())
-            .bind(&video_id)
-            .execute(db)
-            .await?;
+                    tracing::info!("Successfully compressed video {}", &video_id);
+                }
+                Err(err) => {
+                    // Update the video record with failed status
+                    sqlx::query(
+                        "UPDATE videos SET
+                        compression_status = 'failed',
+                        updated_at = NOW()
+                        WHERE id = $1",
+                    )
+                    .bind(&video_id)
+                    .execute(db)
+                    .await?;
 
-            tracing::info!("Successfully compressed video {}", &video_id);
+                    tracing::error!("Failed to compress video {}: {}", &video_id, err);
+                    return Err(err);
+                }
+            }
         }
         _ => tracing::warn!("Unhandled job message passed"),
     }
diff --git a/services/forge-queue/src/main.rs b/services/forge-queue/src/main.rs
index 1dc7402..f3b7241 100644
--- a/services/forge-queue/src/main.rs
+++ b/services/forge-queue/src/main.rs
@@ -9,7 +9,7 @@ async fn main() -> anyhow::Result<()> {
     tracing_subscriber::registry()
         .with(
             tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "forge=debug,queue=debug,db=debug".into()),
+                .unwrap_or_else(|_| "forge=debug,queue=debug".into()),
         )
         .with(tracing_subscriber::fmt::layer())
         .init();
diff --git a/services/silo-api/src/main.rs b/services/silo-api/src/main.rs
index b007d57..a5da51b 100644
--- a/services/silo-api/src/main.rs
+++ b/services/silo-api/src/main.rs
@@ -53,10 +53,6 @@ async fn main() {
         .expect("Could not connect to database");
     // Initialize the queue
     let queue = Arc::new(PostgresQueue::new(db.clone()));
-    // Run migrations
-    let _mig = db::run_migrations(&db)
-        .await
-        .expect("Could not run database migrations");
     // Store shared data as state between routes
     let state = Arc::new(AppState { db, config, queue });
     routes::upload::init_cleanup().await;
diff --git a/services/silo-api/src/routes/upload.rs b/services/silo-api/src/routes/upload.rs
index c22213c..d556414 100644
--- a/services/silo-api/src/routes/upload.rs
+++ b/services/silo-api/src/routes/upload.rs
@@ -366,6 +366,7 @@ pub async fn upload_video(
         .queue
         .push(
             process_video_message,
+            queue::job::PostgresJobStatus::Queued,
             None, // Schedule for immediate processing
         )
         .await

From 8d0116a1d1a26b10560f6866d56bee2717ae6693 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:29:25 -0800
Subject: [PATCH 5/8] Add seven days for compression

---
 packages/queue/src/runner.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/packages/queue/src/runner.rs b/packages/queue/src/runner.rs
index ccbfac8..aea4711 100644
--- a/packages/queue/src/runner.rs
+++ b/packages/queue/src/runner.rs
@@ -121,8 +121,7 @@ async fn handle_job(queue: Arc<dyn Queue>, job: Job, db: &Pool<Postgres>) -> Res
                         video_id: video_id.clone(),
                     },
                     PostgresJobStatus::Queued,
-                    // Some(scheduled_time), // TODO: Uncomment after testing
-                    None,
+                    Some(scheduled_time), // TODO: Uncomment after testing
                 )
                 .await?;
 

From 408511c803d7ccb2d826efbb11e61b9607e9daed Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:35:12 -0800
Subject: [PATCH 6/8] Fix reset state after uploading video

---
 .../src/routes/(user)/upload/+page.svelte | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/services/barn-ui/src/routes/(user)/upload/+page.svelte b/services/barn-ui/src/routes/(user)/upload/+page.svelte
index ac05b0f..0a613f4 100644
--- a/services/barn-ui/src/routes/(user)/upload/+page.svelte
+++ b/services/barn-ui/src/routes/(user)/upload/+page.svelte
@@ -17,6 +17,16 @@
 	const MAX_RETRIES = 3;
 	const COMPRESSION_THRESHOLD = 5 * 1024 * 1024; // 5MB
 
+	function resetState() {
+		file = null;
+		progress = 0;
+		uploading = false;
+		paused = false;
+		abortController = null;
+		errorMessage = null;
+		successMessage = null;
+	}
+
 	async function calculateChecksum(chunk: Blob): Promise<string> {
 		const arrayBuffer = await chunk.arrayBuffer();
 		const hashBuffer = await crypto.subtle.digest('SHA-256', arrayBuffer);
@@ -182,6 +192,8 @@
 				uploading = false;
 				progress = 100;
 				successMessage = `Successfully uploaded ${selectedFile.name}`;
+				// Clear the file after successful upload
+				file = null;
 			}
 		} catch (error) {
 			console.error('Upload error:', error);
@@ -191,10 +203,14 @@
 			} else {
 				errorMessage = error.message;
 				uploading = false;
+				// Clear the file on error
+				file = null;
 			}
 		} else {
 			errorMessage = 'An unknown error occurred';
 			uploading = false;
+			// Clear the file on error
+			file = null;
 		}
 	}
 }
@@ -202,6 +218,7 @@
 	function handleFileSelect(event: Event) {
 		const input = event.target as HTMLInputElement;
 		if (input.files && input.files[0]) {
+			resetState();
 			const selectedFile = input.files[0];
 			if (!selectedFile.type.startsWith('video/')) {
 				errorMessage = 'Please select a video file';

From e22c9fe1bb91a7fcbe9f3f43545780321ee1ea6d Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:35:34 -0800
Subject: [PATCH 7/8] Add better debug message for failed jobs

---
 packages/queue/src/queue.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/packages/queue/src/queue.rs b/packages/queue/src/queue.rs
index f672040..29b6158 100644
--- a/packages/queue/src/queue.rs
+++ b/packages/queue/src/queue.rs
@@ -100,10 +100,6 @@ impl Queue for PostgresQueue {
     }
 
     async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
-        tracing::debug!(
-            "Failing job with id: {}, attempting to update status and increment failed attempts",
-            job_id
-        );
         let now = chrono::Utc::now();
 
         // First get the current failed_attempts count
         let query = "SELECT failed_attempts FROM queue WHERE id = $1";
         let failed_attempts: i32 = sqlx::query_scalar(query)
             .bind(job_id)
             .fetch_one(&self.db)
             .await?;
 
+        tracing::debug!(
+            "Failing job with id: {}, attempt {} of {}",
+            job_id,
+            failed_attempts + 1,
+            self.max_attempts
+        );
         // Determine the new status based on failed attempts
         let new_status = if failed_attempts + 1 >= self.max_attempts as i32 {
             PostgresJobStatus::Failed

From 35b67d68086f54f07848dc7308f895a0b1ad2ab8 Mon Sep 17 00:00:00 2001
From: Zachary Corvidae
Date: Sun, 24 Nov 2024 15:40:48 -0800
Subject: [PATCH 8/8] Make compression job happen after 1 day

---
 packages/queue/src/runner.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/queue/src/runner.rs b/packages/queue/src/runner.rs
index aea4711..de6539a 100644
--- a/packages/queue/src/runner.rs
+++ b/packages/queue/src/runner.rs
@@ -114,14 +114,14 @@ async fn handle_job(queue: Arc<dyn Queue>, job: Job, db: &Pool<Postgres>) -> Res
                 .await?;
 
             // After completed, queue up a Compress Raw Video job
-            let scheduled_time = chrono::Utc::now() + chrono::Duration::days(7);
+            let scheduled_time = chrono::Utc::now() + chrono::Duration::days(1);
             queue
                 .push(
                     Message::CompressRawVideo {
                         video_id: video_id.clone(),
                    },
                     PostgresJobStatus::Queued,
-                    Some(scheduled_time), // TODO: Uncomment after testing
+                    Some(scheduled_time),
                 )
                 .await?;
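
Usage note: a minimal sketch of the producer side after PATCH 4, where Queue::push takes an explicit PostgresJobStatus plus an optional scheduled_for timestamp. The module paths are assumed from the use statements visible in the diffs (queue::job::PostgresJobStatus, the Queue trait and PostgresQueue over an sqlx Postgres pool); the connection string and video id are placeholders, not values from the patches.

    use std::sync::Arc;

    use queue::job::PostgresJobStatus;
    use queue::queue::{Message, PostgresQueue, Queue};
    use sqlx::postgres::PgPoolOptions;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder connection string; the services read theirs from config/env.
        let db = PgPoolOptions::new()
            .max_connections(5)
            .connect("postgres://localhost/videos")
            .await?;
        let queue = Arc::new(PostgresQueue::new(db));

        // Immediate job: Queued status with no schedule, so the next pull picks it up.
        queue
            .push(
                Message::ProcessRawVideoIntoStream {
                    video_id: "01JDEXAMPLEULID".to_string(), // placeholder id
                },
                PostgresJobStatus::Queued,
                None,
            )
            .await?;

        // Deferred job: scheduled one day out, mirroring the follow-up that the
        // worker enqueues after processing completes (PATCH 8).
        queue
            .push(
                Message::CompressRawVideo {
                    video_id: "01JDEXAMPLEULID".to_string(), // placeholder id
                },
                PostgresJobStatus::Queued,
                Some(chrono::Utc::now() + chrono::Duration::days(1)),
            )
            .await?;

        Ok(())
    }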