Skip to content

Commit

Permalink
Add spec expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikWin committed Jan 27, 2025
1 parent cde9191 commit 631b7a3
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ You can find details on this in our [Getting Started Guide](https://ixlab.github

## 🔍 About the project

Vidformer is a highly modular suite of tools that work together; these are detailed [here](https://ixlab.github.io/vidformer/tools.html).
Vidformer is a highly modular suite of tools that work together; these are detailed [here](https://ixlab.github.io/vidformer/modules.html).

❌ vidformer is ***NOT***:
* A conventional video editor (like Premiere Pro or Final Cut)
Expand Down
5 changes: 3 additions & 2 deletions vidformer-igni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
pretty_env_logger = "0.5.0"
log = "0.4.20"
clap = { version = "4.4.6", features = ["derive"] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid" ] }
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid", "chrono" ] }
uuid = { version = "1.7.0", features = ["v4", "fast-rng"] }
tabled = "0.17.0"
vidformer = { path = "../vidformer" }
Expand All @@ -25,4 +25,5 @@ regex = "1"
"rand" = "0.8.5"
zstd = "0.13"
base64 = "0.22.1"
flate2 = "1.0"
flate2 = "1.0"
chrono = "0.4.39"
3 changes: 2 additions & 1 deletion vidformer-igni/igni.toml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
vod_prefix = "http://localhost:8080/vod/"
vod_prefix = "http://localhost:8080/vod/"
gc_period = 300
7 changes: 5 additions & 2 deletions vidformer-igni/init/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ CREATE TABLE source (
pix_fmt TEXT NOT NULL,
width INT NOT NULL,
height INT NOT NULL,
file_size BIGINT NOT NULL
file_size BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- source_t table
Expand All @@ -49,7 +50,9 @@ CREATE TABLE spec (
pos_terminal INT,
closed BOOLEAN NOT NULL,
ready_hook TEXT,
steer_hook TEXT
steer_hook TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ
);

-- spec_t table
Expand Down
1 change: 1 addition & 0 deletions vidformer-igni/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ async fn cmd_spec_add(pool: sqlx::Pool<sqlx::Postgres>, opt: SpecAddOpt) -> Resu
pix_fmt,
ready_hook,
steer_hook,
None,
)
.await?;

Expand Down
4 changes: 3 additions & 1 deletion vidformer-igni/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ pub(crate) async fn add_spec(
pix_fmt: String,
ready_hook: Option<String>,
steer_hook: Option<String>,
ttl: Option<i64>,
) -> Result<uuid::Uuid, IgniError> {
let spec_id = uuid::Uuid::new_v4();

sqlx::query("INSERT INTO spec (id, user_id, width, height, pix_fmt, vod_segment_length_num, vod_segment_length_denom, frame_rate_num, frame_rate_denom, pos_discontinuity, closed, ready_hook, steer_hook) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)")
sqlx::query("INSERT INTO spec (id, user_id, width, height, pix_fmt, vod_segment_length_num, vod_segment_length_denom, frame_rate_num, frame_rate_denom, pos_discontinuity, closed, ready_hook, steer_hook, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)")
.bind(spec_id)
.bind(user_id)
.bind(width)
Expand All @@ -123,6 +124,7 @@ pub(crate) async fn add_spec(
.bind(false)
.bind(ready_hook)
.bind(steer_hook)
.bind(ttl.map(|ttl| chrono::Utc::now() + chrono::Duration::seconds(ttl)))
.execute(pool)
.await?;

Expand Down
3 changes: 3 additions & 0 deletions vidformer-igni/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct SpecRow {
pub closed: bool,
pub ready_hook: Option<String>,
pub steer_hook: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
}

#[derive(sqlx::FromRow)]
Expand All @@ -37,4 +39,5 @@ pub struct SourceRow {
pub width: i32,
pub height: i32,
pub file_size: i64,
pub created_at: chrono::DateTime<chrono::Utc>,
}
21 changes: 17 additions & 4 deletions vidformer-igni/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use num_rational::Rational64;
use regex::Regex;

mod api;
mod gc;
mod vod;

struct IgniServerGlobal {
Expand Down Expand Up @@ -37,9 +38,10 @@ impl UserPermissions {
impl UserPermissions {
pub fn default_regular() -> UserPermissions {
let limits_int = [
("spec:max_width", 4096), // DCI 4K
("spec:max_height", 2160), // DCI 4K
("spec:max_frames", 432000), // 4 hours @ 30 fps / 5 hours @ 24 fps
("spec:max_width", 4096), // DCI 4K
("spec:max_height", 2160), // DCI 4K
("spec:max_frames", 432000), // 4 hours @ 30 fps / 5 hours @ 24 fps
("spec:max_ttl", 24 * 60 * 60), // 24 hours
("source:max_width", 4096),
("source:max_height", 2160),
]
Expand Down Expand Up @@ -102,7 +104,8 @@ impl UserPermissions {
let limits_int = [
("spec:max_width", 1280),
("spec:max_height", 720),
("spec:max_frames", 162000), // 90 minutes @ 30 fps
("spec:max_frames", 162000), // 90 minutes @ 30 fps
("spec:max_ttl", 1 * 60 * 60), // 1 hours
]
.iter()
.map(|(key, value)| (key.to_string(), *value))
Expand Down Expand Up @@ -334,6 +337,7 @@ fn load_config(path: &String) -> Result<ServerConfig, IgniError> {
#[derive(serde::Deserialize, Debug)]
struct ServerConfig {
vod_prefix: String,
gc_period: i64,
}

pub(crate) async fn cmd_server(
Expand All @@ -350,6 +354,15 @@ pub(crate) async fn cmd_server(
.await
.map_err(|e| IgniError::General(format!("Failed to bind to {}: {}", addr, e)))?;

if global.config.gc_period > 0 {
let global_gc = global.clone();
tokio::task::spawn(async move {
if let Err(err) = gc::gc_main(global_gc).await {
error!("Garbage collector failed: {:?}", err);
}
});
}

println!("Opened igni server on {}", addr);

loop {
Expand Down
14 changes: 13 additions & 1 deletion vidformer-igni/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub(crate) async fn get_source(
"width": source.width,
"height": source.height,
"ts": ts,
"created_at": source.created_at.to_rfc3339(),
}
);

Expand Down Expand Up @@ -433,6 +434,8 @@ pub(crate) async fn get_spec(
"frames_applied": row.pos_discontinuity,
"closed": row.closed,
"vod_endpoint": format!("{}{}/", global.config.vod_prefix, source_id), // TODO: This should be configurable
"created_at": row.created_at.to_rfc3339(),
"expires_at": row.expires_at.map(|expires_at| expires_at.to_rfc3339()),
});

Ok(hyper::Response::builder()
Expand Down Expand Up @@ -521,9 +524,10 @@ pub(crate) async fn push_spec(
frame_rate: [i32; 2],
ready_hook: Option<String>,
steer_hook: Option<String>,
ttl: Option<i64>,
}

let req: RequestContent = match serde_json::from_slice(&req) {
let mut req: RequestContent = match serde_json::from_slice(&req) {
Err(err) => {
error!("Error parsing request body");
return Ok(hyper::Response::builder()
Expand Down Expand Up @@ -579,6 +583,13 @@ pub(crate) async fn push_spec(
{
return Ok(err);
}
if let Some(max_ttl) = user.permissions.limits_int.get("spec:max_ttl") {
if let Some(ttl) = &mut req.ttl {
req.ttl = Some((*ttl).min(*max_ttl));
} else {
req.ttl = Some(*max_ttl);
}
}

let spec = crate::ops::add_spec(
&global.pool,
Expand All @@ -590,6 +601,7 @@ pub(crate) async fn push_spec(
req.pix_fmt,
req.ready_hook,
req.steer_hook,
req.ttl,
)
.await;

Expand Down
58 changes: 58 additions & 0 deletions vidformer-igni/src/server/gc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use super::super::IgniError;
use super::IgniServerGlobal;
use log::*;
use uuid::Uuid;

pub(crate) async fn gc_main(global: std::sync::Arc<IgniServerGlobal>) -> Result<(), IgniError> {
let period = global.config.gc_period;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(period as u64)).await;
let gc_result = gc(global.clone()).await;
if let Err(err) = gc_result {
error!("Garbage collector failure: {:?}", err);
}
}
}

pub(crate) async fn gc(global: std::sync::Arc<IgniServerGlobal>) -> Result<(), IgniError> {
info!("Running garbage collector");
let now = chrono::Utc::now();

let mut transaction = global.pool.begin().await?;
let spec_ids_to_delete: Vec<(Uuid,)> = sqlx::query_as(
"SELECT id FROM spec WHERE NOT closed AND expires_at IS NOT NULL AND expires_at < $1",
)
.bind(now)
.fetch_all(&mut *transaction)
.await?;

for (spec_id,) in &spec_ids_to_delete {
// Remove any spec dependencies
sqlx::query("DELETE FROM spec_source_dependency WHERE spec_id = $1")
.bind(spec_id)
.execute(&mut *transaction)
.await?;

// Remove any spec_t entries
sqlx::query("DELETE FROM spec_t WHERE spec_id = $1")
.bind(spec_id)
.execute(&mut *transaction)
.await?;

// Mark the spec as closed
sqlx::query("UPDATE spec SET closed = true WHERE id = $1")
.bind(spec_id)
.execute(&mut *transaction)
.await?;
}

transaction.commit().await?;

if !spec_ids_to_delete.is_empty() {
info!("Deleted {} expired specs", spec_ids_to_delete.len());
} else {
info!("No expired specs to delete");
}

Ok(())
}
3 changes: 3 additions & 0 deletions vidformer-py/vidformer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ def create_spec(
frame_rate,
ready_hook=None,
steer_hook=None,
ttl=None,
) -> IgniSpec:
assert type(width) is int
assert type(height) is int
Expand All @@ -601,6 +602,7 @@ def create_spec(
assert type(frame_rate) is Fraction
assert type(ready_hook) is str or ready_hook is None
assert type(steer_hook) is str or steer_hook is None
assert ttl is None or type(ttl) is int

req = {
"width": width,
Expand All @@ -613,6 +615,7 @@ def create_spec(
"frame_rate": [frame_rate.numerator, frame_rate.denominator],
"ready_hook": ready_hook,
"steer_hook": steer_hook,
"ttl": ttl,
}
response = self._session.post(
f"{self._endpoint}/spec",
Expand Down
4 changes: 3 additions & 1 deletion vidformer-py/vidformer/cv2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def __init__(
size,
batch_size=1024,
compression="gzip",
ttl=3600,
vod_segment_length=Fraction(2, 1),
):
server = _server()
Expand All @@ -371,8 +372,9 @@ def __init__(
assert isinstance(size, tuple) or isinstance(size, list)
assert len(size) == 2
width, height = size
assert ttl is None or isinstance(ttl, int)
self._spec = server.create_spec(
width, height, "yuv420p", vod_segment_length, 1 / self._f_time
width, height, "yuv420p", vod_segment_length, 1 / self._f_time, ttl=ttl
)
self._batch_size = batch_size
assert compression is None or compression in ["gzip"]
Expand Down
9 changes: 9 additions & 0 deletions viper-den/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def test_get_source():
assert source["height"] == 720
assert source["pix_fmt"] == "yuv420p"
assert len(source["ts"]) == 17616
assert re.match(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}", source["created_at"]
)
for ts in source["ts"]:
assert len(ts) == 3
assert isinstance(ts[0], int)
Expand Down Expand Up @@ -175,6 +178,12 @@ def test_get_spec():
assert spec["terminated"] is False
assert spec["closed"] is False
assert spec["vod_endpoint"] == ENDPOINT + "vod/" + spec_id + "/"
assert re.match(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}", spec["created_at"]
)
assert re.match(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}", spec["expires_at"]
)


def test_error_get_spec_not_exists():
Expand Down
10 changes: 8 additions & 2 deletions viper-den/test_cv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def test_access_video_by_http_url():
assert count == 17616


def test_write_video():
@pytest.mark.parametrize("ttl", [None, 10, 10**7])
def test_write_video(ttl):
server = vf.IgniServer(ENDPOINT, API_KEY)
cv2.set_server(server)

Expand All @@ -56,7 +57,12 @@ def test_write_video():
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

out = cv2.VideoWriter(
None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height), batch_size=50
None,
cv2.VideoWriter_fourcc(*"mp4v"),
fps,
(width, height),
batch_size=50,
ttl=ttl,
)
video_url = cv2.vidplay(out, method="link")
assert type(video_url) is str
Expand Down
7 changes: 5 additions & 2 deletions viper-den/test_python_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ def test_delete_source():
assert tos.id() not in sources


def test_create_spec():
@pytest.mark.parametrize("ttl", [None, 10, 10**7])
def test_create_spec(ttl):
server = vf.IgniServer(ENDPOINT, API_KEY)
segment_legnth = Fraction(2, 1)
spec_id = server.create_spec(1920, 1080, "yuv420p", segment_legnth, Fraction(30, 1))
spec_id = server.create_spec(
1920, 1080, "yuv420p", segment_legnth, Fraction(30, 1), ttl=ttl
)
assert isinstance(spec_id, vf.IgniSpec)


Expand Down

0 comments on commit 631b7a3

Please sign in to comment.