start splitting wall and media duration for #34
This splits the schema and playback path. The recording path still
adjusts the frame durations and always says the wall and media durations
are the same. I expect to change that in a following commit. I wouldn't
be surprised if that shakes out some bugs in this portion.
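
For background, the split the message describes: media duration is the sum of the encoded frame durations, while wall duration is the elapsed wall-clock time; the two drift apart when the camera's clock runs fast or slow. The sketch below is not code from this commit and all names in it are illustrative; it shows how a position on one timeline could be mapped onto the other by linear scaling, assuming the drift is spread evenly across the recording:

    // Illustrative only; these types are not part of the crate's API.
    struct Durations {
        wall_90k: i32,  // elapsed wall-clock time, in 90 kHz units
        media_90k: i32, // sum of the encoded frame durations, in 90 kHz units
    }

    impl Durations {
        /// Maps an offset on the media timeline onto the wall timeline,
        /// spreading any clock drift evenly across the recording.
        fn media_to_wall(&self, media_off_90k: i32) -> i32 {
            if self.media_90k == 0 {
                return 0; // empty recording: nothing to scale
            }
            (i64::from(media_off_90k) * i64::from(self.wall_90k)
                / i64::from(self.media_90k)) as i32
        }
    }

As the message notes, the recording path in this commit still writes identical wall and media durations, so this mapping is the identity for now.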
scottlamb committed Aug 5, 2020
1 parent 476bd86 commit cb97ccd
Showing 12 changed files with 434 additions and 238 deletions.
72 changes: 39 additions & 33 deletions db/db.rs
@@ -106,7 +106,7 @@ const INSERT_VIDEO_SAMPLE_ENTRY_SQL: &'static str = r#"
 const UPDATE_STREAM_COUNTERS_SQL: &'static str = r#"
     update stream
     set cum_recordings = :cum_recordings,
-        cum_duration_90k = :cum_duration_90k,
+        cum_media_duration_90k = :cum_media_duration_90k,
         cum_runs = :cum_runs
     where id = :stream_id
 "#;
@@ -178,7 +178,8 @@ pub struct ListRecordingsRow {
     pub id: CompositeId,
 
     /// This is a recording::Duration, but a single recording's duration fits into an i32.
-    pub duration_90k: i32,
+    pub wall_duration_90k: i32,
+    pub media_duration_90k: i32,
     pub video_samples: i32,
     pub video_sync_samples: i32,
     pub sample_file_bytes: i32,
@@ -189,7 +190,7 @@ pub struct ListRecordingsRow {
     /// This is populated by `list_recordings_by_id` but not `list_recordings_by_time`.
     /// (It's not included in the `recording_cover` index, so adding it to
     /// `list_recordings_by_time` would be inefficient.)
-    pub prev_duration_and_runs: Option<(recording::Duration, i32)>,
+    pub prev_media_duration_and_runs: Option<(recording::Duration, i32)>,
 }
 
 /// A row used in `list_aggregated_recordings`.
@@ -213,7 +214,7 @@ impl ListAggregatedRecordingsRow { fn from(row: ListRecordingsRow) -> Self {
         let uncommitted = (row.flags & RecordingFlags::Uncommitted as i32) != 0;
         let growing = (row.flags & RecordingFlags::Growing as i32) != 0;
         ListAggregatedRecordingsRow {
-            time: row.start .. recording::Time(row.start.0 + row.duration_90k as i64),
+            time: row.start .. recording::Time(row.start.0 + row.wall_duration_90k as i64),
             ids: recording_id .. recording_id+1,
             video_samples: row.video_samples as i64,
             video_sync_samples: row.video_sync_samples as i64,
@@ -252,12 +253,13 @@ pub struct RecordingToInsert {
     pub start: recording::Time,
 
     /// Filled in by `add_recording`.
-    pub prev_duration: recording::Duration,
+    pub prev_media_duration: recording::Duration,
 
     /// Filled in by `add_recording`.
     pub prev_runs: i32,
 
-    pub duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32.
+    pub wall_duration_90k: i32, // a recording::Duration, but guaranteed to fit in i32.
+    pub media_duration_90k: i32,
     pub local_time_delta: recording::Duration,
     pub video_samples: i32,
     pub video_sync_samples: i32,
@@ -272,14 +274,15 @@ impl RecordingToInsert {
             start: self.start,
             video_sample_entry_id: self.video_sample_entry_id,
             id,
-            duration_90k: self.duration_90k,
+            wall_duration_90k: self.wall_duration_90k,
+            media_duration_90k: self.media_duration_90k,
             video_samples: self.video_samples,
             video_sync_samples: self.video_sync_samples,
             sample_file_bytes: self.sample_file_bytes,
             run_offset: self.run_offset,
             open_id,
             flags: self.flags | RecordingFlags::Uncommitted as i32,
-            prev_duration_and_runs: Some((self.prev_duration, self.prev_runs)),
+            prev_media_duration_and_runs: Some((self.prev_media_duration, self.prev_runs)),
         }
     }
 }
Expand All @@ -290,7 +293,7 @@ impl RecordingToInsert {
pub(crate) struct ListOldestRecordingsRow {
pub id: CompositeId,
pub start: recording::Time,
pub duration: i32,
pub wall_duration_90k: i32,
pub sample_file_bytes: i32,
}

@@ -488,8 +491,8 @@ pub struct Stream {
     /// The `cum_recordings` currently committed to the database.
    pub(crate) cum_recordings: i32,
 
-    /// The `cum_duration_90k` currently committed to the database.
-    cum_duration: recording::Duration,
+    /// The `cum_media_duration_90k` currently committed to the database.
+    cum_media_duration: recording::Duration,
 
     /// The `cum_runs` currently committed to the database.
     cum_runs: i32,
@@ -640,7 +643,7 @@ fn init_recordings(conn: &mut rusqlite::Connection, stream_id: i32, camera: &Cam
     let mut stmt = conn.prepare(r#"
         select
           recording.start_time_90k,
-          recording.duration_90k,
+          recording.wall_duration_90k,
           recording.sample_file_bytes
         from
           recording
@@ -782,7 +785,7 @@ impl StreamStateChanger {
                 let mut stmt = tx.prepare_cached(r#"
                     insert into stream (camera_id, sample_file_dir_id, type, rtsp_url, record,
                                         retain_bytes, flush_if_sec, cum_recordings,
-                                        cum_duration_90k, cum_runs)
+                                        cum_media_duration_90k, cum_runs)
                                 values (:camera_id, :sample_file_dir_id, :type, :rtsp_url, :record,
                                         0, :flush_if_sec, 0,
                                         0, 0)
@@ -834,7 +837,7 @@ impl StreamStateChanger {
                     days: BTreeMap::new(),
                     record: sc.record,
                     cum_recordings: 0,
-                    cum_duration: recording::Duration(0),
+                    cum_media_duration: recording::Duration(0),
                     cum_runs: 0,
                     uncommitted: VecDeque::new(),
                     synced_recordings: 0,
@@ -883,7 +886,7 @@ impl LockedDatabase {
     /// A call to `add_recording` is also a promise that previous recordings (even if not yet
     /// synced and committed) won't change.
     ///
-    /// This fills the `prev_duration` and `prev_runs` fields.
+    /// This fills the `prev_media_duration` and `prev_runs` fields.
     pub(crate) fn add_recording(&mut self, stream_id: i32, mut r: RecordingToInsert)
                                 -> Result<(CompositeId, Arc<Mutex<RecordingToInsert>>), Error> {
         let stream = match self.streams_by_id.get_mut(&stream_id) {
@@ -895,11 +898,12 @@ impl LockedDatabase {
         match stream.uncommitted.back() {
             Some(s) => {
                 let l = s.lock();
-                r.prev_duration = l.prev_duration + recording::Duration(l.duration_90k.into());
+                r.prev_media_duration =
+                    l.prev_media_duration + recording::Duration(l.wall_duration_90k.into());
                 r.prev_runs = l.prev_runs + if l.run_offset == 0 { 1 } else { 0 };
             },
             None => {
-                r.prev_duration = stream.cum_duration;
+                r.prev_media_duration = stream.cum_media_duration;
                 r.prev_runs = stream.cum_runs;
             },
         };
@@ -1006,15 +1010,15 @@ impl LockedDatabase {
                 let l = s.uncommitted[i].lock();
                 raw::insert_recording(
                     &tx, o, CompositeId::new(stream_id, s.cum_recordings + i as i32), &l)?;
-                new_duration += i64::from(l.duration_90k);
+                new_duration += i64::from(l.wall_duration_90k);
                 new_runs += if l.run_offset == 0 { 1 } else { 0 };
             }
             if s.synced_recordings > 0 {
                 new_ranges.entry(stream_id).or_insert(None);
                 stmt.execute_named(named_params!{
                     ":stream_id": stream_id,
                     ":cum_recordings": s.cum_recordings + s.synced_recordings as i32,
-                    ":cum_duration_90k": s.cum_duration.0 + new_duration,
+                    ":cum_media_duration_90k": s.cum_media_duration.0 + new_duration,
                     ":cum_runs": s.cum_runs + new_runs,
                 })?;
             }
@@ -1096,7 +1100,7 @@ impl LockedDatabase {
             for row in s.to_delete.drain(..) {
                 log.deleted.push(row.id);
                 dir.garbage_needs_unlink.insert(row.id);
-                let d = recording::Duration(row.duration as i64);
+                let d = recording::Duration(i64::from(row.wall_duration_90k));
                 s.duration -= d;
                 adjust_days(row.start .. row.start + d, -1, &mut s.days);
             }
@@ -1111,10 +1115,11 @@ impl LockedDatabase {
                 log.added.push(CompositeId::new(stream_id, s.cum_recordings));
                 let l = u.lock();
                 s.cum_recordings += 1;
-                let dur = recording::Duration(l.duration_90k.into());
-                s.cum_duration += dur;
+                let wall_dur = recording::Duration(l.wall_duration_90k.into());
+                let media_dur = recording::Duration(l.media_duration_90k.into());
+                s.cum_media_duration += media_dur;
                 s.cum_runs += if l.run_offset == 0 { 1 } else { 0 };
-                let end = l.start + dur;
+                let end = l.start + wall_dur;
                 s.add_recording(l.start .. end, l.sample_file_bytes);
             }
             s.synced_recordings = 0;
@@ -1258,7 +1263,7 @@ impl LockedDatabase {
             let row = {
                 let l = u.lock();
                 if l.video_samples > 0 {
-                    let end = l.start + recording::Duration(l.duration_90k as i64);
+                    let end = l.start + recording::Duration(l.wall_duration_90k as i64);
                     if l.start > desired_time.end || end < desired_time.start {
                         continue; // there's no overlap with the requested range.
                     }
@@ -1337,7 +1342,7 @@ impl LockedDatabase {
                 Entry::Occupied(mut e) => {
                     let a = e.get_mut();
                     let new_dur = a.time.end - a.time.start +
-                                  recording::Duration(row.duration_90k as i64);
+                                  recording::Duration(row.wall_duration_90k as i64);
                     let needs_flush =
                         a.ids.end != recording_id ||
                         row.video_sample_entry_id != a.video_sample_entry_id ||
@@ -1354,7 +1359,7 @@
                         bail!("stream {} recording {} has open id {} but {} has {}",
                               stream_id, a.ids.end - 1, a.open_id, row.id, row.open_id);
                     }
-                    a.time.end.0 += row.duration_90k as i64;
+                    a.time.end.0 += row.wall_duration_90k as i64;
                     a.ids.end = recording_id + 1;
                     a.video_samples += row.video_samples as i64;
                     a.video_sync_samples += row.video_sync_samples as i64;
@@ -1562,7 +1567,7 @@ impl LockedDatabase {
               retain_bytes,
               flush_if_sec,
               cum_recordings,
-              cum_duration_90k,
+              cum_media_duration_90k,
               cum_runs,
               record
             from
@@ -1600,7 +1605,7 @@ impl LockedDatabase {
                 duration: recording::Duration(0),
                 days: BTreeMap::new(),
                 cum_recordings: row.get(7)?,
-                cum_duration: recording::Duration(row.get(8)?),
+                cum_media_duration: recording::Duration(row.get(8)?),
                 cum_runs: row.get(9)?,
                 record: row.get(10)?,
                 uncommitted: VecDeque::new(),
@@ -2209,7 +2214,7 @@ mod tests {
         {
             let db = db.lock();
             let stream = db.streams_by_id().get(&stream_id).unwrap();
-            let dur = recording::Duration(r.duration_90k as i64);
+            let dur = recording::Duration(r.wall_duration_90k as i64);
             assert_eq!(Some(r.start .. r.start + dur), stream.range);
             assert_eq!(r.sample_file_bytes as i64, stream.sample_file_bytes);
             assert_eq!(dur, stream.duration);
@@ -2227,7 +2232,7 @@ mod tests {
             rows += 1;
             recording_id = Some(row.id);
             assert_eq!(r.start, row.start);
-            assert_eq!(r.duration_90k, row.duration_90k);
+            assert_eq!(r.wall_duration_90k, row.wall_duration_90k);
             assert_eq!(r.video_samples, row.video_samples);
             assert_eq!(r.video_sync_samples, row.video_sync_samples);
             assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
@@ -2243,7 +2248,7 @@ mod tests {
             rows += 1;
             assert_eq!(recording_id, Some(row.id));
             assert_eq!(r.start, row.start);
-            assert_eq!(r.duration_90k, row.duration);
+            assert_eq!(r.wall_duration_90k, row.wall_duration_90k);
             assert_eq!(r.sample_file_bytes, row.sample_file_bytes);
             true
         }).unwrap();
@@ -2442,9 +2447,10 @@ mod tests {
             run_offset: 0,
             flags: 0,
             start,
-            prev_duration: recording::Duration(0),
+            prev_media_duration: recording::Duration(0),
             prev_runs: 0,
-            duration_90k: TIME_UNITS_PER_SEC as i32,
+            wall_duration_90k: TIME_UNITS_PER_SEC.try_into().unwrap(),
+            media_duration_90k: TIME_UNITS_PER_SEC.try_into().unwrap(),
             local_time_delta: recording::Duration(0),
             video_samples: 1,
             video_sync_samples: 1,