Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add video compression job #28

Merged
merged 8 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 161 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/db/migrations/20241101164200_add_videos.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE videos (
id TEXT PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(100) NOT NULL,
raw_video_path VARCHAR(255) NOT NULL,
raw_video_path VARCHAR(255),
processed_video_path VARCHAR(255),
processing_status processing_status NOT NULL DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Down migration: undo the compression feature added by the matching .up.sql.
-- Order matters: the columns must be dropped BEFORE the enum type, because
-- compression_status (the column) depends on compression_status (the type) —
-- DROP TYPE would otherwise fail with a dependency error.

-- Remove compression-related columns
ALTER TABLE videos
DROP COLUMN compression_status,
DROP COLUMN compressed_video_path;

-- Drop the compression status enum type
-- NOTE(review): no IF EXISTS here — this migration assumes the up migration
-- ran successfully; a partial apply would make this statement error out.
DROP TYPE compression_status;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Up migration: add video-compression tracking to the videos table.
-- The enum type must be created BEFORE the column that uses it.

-- Create compression status enum type
-- Lifecycle states for the background compression job:
-- 'pending' -> 'compressing' -> 'completed' | 'failed'
CREATE TYPE compression_status AS ENUM ('pending', 'compressing', 'completed', 'failed');

-- Add compression-related columns
ALTER TABLE videos
-- Every existing row defaults to 'pending'; NOT NULL is safe because of the DEFAULT.
ADD COLUMN compression_status compression_status NOT NULL DEFAULT 'pending',
-- Nullable: the compressed output path is unknown until the job completes.
ADD COLUMN compressed_video_path VARCHAR(255);
1 change: 1 addition & 0 deletions packages/queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ tracing = "0.1"
ulid = { version = "1", features = ["uuid"] }
uuid = { version = "1", features = ["serde", "v4"] }
vod = { path = "../../packages/vod" }
zip = "0.6"
40 changes: 35 additions & 5 deletions packages/queue/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub trait Queue: Send + Sync + Debug {
async fn push(
&self,
job: Message,
status: PostgresJobStatus,
scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<(), Error>;
/// pull fetches at most `number_of_jobs` from the queue.
Expand All @@ -35,6 +36,7 @@ pub struct Job {
/// Payloads that can be enqueued as background jobs.
///
/// Stored in the queue table wrapped in `Json(...)` by `push`, so every
/// variant must round-trip through serde (hence `Serialize`/`Deserialize`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
/// Transcode the raw upload identified by `video_id` into a streamable format.
ProcessRawVideoIntoStream { video_id: String },
/// Compress the raw upload identified by `video_id`.
/// NOTE(review): `video_id` is TEXT in the videos table (ULID-style ids), not UUID — confirm.
CompressRawVideo { video_id: String },
}

/// The queue itself
Expand All @@ -60,18 +62,23 @@ impl Queue for PostgresQueue {
async fn push(
&self,
job: Message,
status: PostgresJobStatus,
date: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<(), Error> {
let scheduled_for = date.unwrap_or(chrono::Utc::now());
let failed_attempts: i32 = 0;
let message = Json(job);
let status = PostgresJobStatus::Queued;
let now = chrono::Utc::now();
let job_id: Uuid = Ulid::new().into();
let query = "INSERT INTO queue
(id, created_at, updated_at, scheduled_for, failed_attempts, status, message)
VALUES ($1, $2, $3, $4, $5, $6, $7)";

tracing::debug!(
"Adding job to queue with id: {}, status: {:?}, scheduled_for: {}",
job_id,
status,
scheduled_for
);
sqlx::query(query)
.bind(job_id)
.bind(now)
Expand All @@ -94,12 +101,34 @@ impl Queue for PostgresQueue {

async fn fail_job(&self, job_id: Uuid) -> Result<(), Error> {
let now = chrono::Utc::now();

// First get the current failed_attempts count
let query = "SELECT failed_attempts FROM queue WHERE id = $1";
let failed_attempts: i32 = sqlx::query_scalar(query)
.bind(job_id)
.fetch_one(&self.db)
.await?;

tracing::debug!(
"Failing job with id: {}, attempt {} of {}",
job_id,
failed_attempts + 1,
self.max_attempts
);
// Determine the new status based on failed attempts
let new_status = if failed_attempts + 1 >= self.max_attempts as i32 {
PostgresJobStatus::Failed
} else {
PostgresJobStatus::Queued
};

// Update the job with new status and increment failed_attempts
let query = "UPDATE queue
SET status = $1, updated_at = $2, failed_attempts = failed_attempts + 1
WHERE id = $3";

sqlx::query(query)
.bind(PostgresJobStatus::Queued)
.bind(new_status)
.bind(now)
.bind(job_id)
.execute(&self.db)
Expand All @@ -119,7 +148,7 @@ impl Queue for PostgresQueue {
WHERE id IN (
SELECT id
FROM queue
WHERE status = $3 AND scheduled_for <= $4 AND failed_attempts < $5
WHERE status = ANY($3) AND scheduled_for <= $4 AND failed_attempts < $5
ORDER BY scheduled_for
FOR UPDATE SKIP LOCKED
LIMIT $6
Expand All @@ -129,12 +158,13 @@ impl Queue for PostgresQueue {
let jobs: Vec<PostgresJob> = sqlx::query_as::<_, PostgresJob>(query)
.bind(PostgresJobStatus::Running)
.bind(now)
.bind(PostgresJobStatus::Queued)
.bind(vec![PostgresJobStatus::Queued])
.bind(now)
.bind(self.max_attempts as i32)
.bind(number_of_jobs)
.fetch_all(&self.db)
.await?;

Ok(jobs.into_iter().map(Into::into).collect())
}

Expand Down
Loading