Skip to content

Commit

Permalink
speechmaticstranscriber: store and use a start time
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuDuponchelle authored and GStreamer Marge Bot committed Oct 23, 2024
1 parent a81b7f3 commit dc1d634
Showing 1 changed file with 55 additions and 41 deletions.
96 changes: 55 additions & 41 deletions audio/speechmatics/src/transcriber/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ struct State {
last_chained_buffer_rtime: Option<gst::ClockTime>,
pad_serial: u32,
srcpads: BTreeSet<super::TranscriberSrcPad>,
start_time: Option<gst::ClockTime>,
}

impl State {}
Expand All @@ -231,6 +232,7 @@ impl Default for State {
last_chained_buffer_rtime: gst::ClockTime::NONE,
pad_serial: 0,
srcpads: BTreeSet::new(),
start_time: None,
}
}
}
Expand All @@ -254,10 +256,14 @@ impl TranscriberSrcPad {
.downcast::<super::Transcriber>()
.expect("parent is transcriber");

let Some((start_time, now)) = transcriber.imp().get_start_time_and_now() else {
// Wait for the clock to be available
return true;
};

let latency = gst::ClockTime::from_mseconds(
transcriber.imp().settings.lock().unwrap().latency_ms as u64,
);
let now = transcriber.current_running_time().unwrap();

/* First, check our pending buffers */
let mut items = vec![];
Expand All @@ -268,7 +274,9 @@ impl TranscriberSrcPad {
let mut state = self.state.lock().unwrap();

if let Some(ref mut accumulator_inner) = state.accumulator {
if now.saturating_sub(accumulator_inner.start_time) + granularity > latency {
if now.saturating_sub(accumulator_inner.start_time + start_time) + granularity
> latency
{
gst::log!(CAT, "Finally draining accumulator");
gst::debug!(
CAT,
Expand All @@ -287,9 +295,30 @@ impl TranscriberSrcPad {
state.send_eos && state.buffers.is_empty() && state.accumulator.is_none();

while let Some(buf) = state.buffers.front() {
if now.saturating_sub(buf.pts().unwrap()) + granularity > latency {
if now.saturating_sub(buf.pts().unwrap() + start_time) + granularity > latency {
/* Safe unwrap, we know we have an item */
let buf = state.buffers.pop_front().unwrap();
let mut buf = state.buffers.pop_front().unwrap();
{
let buf_mut = buf.make_mut();
let mut pts = buf_mut.pts().unwrap() + start_time;
let mut duration = buf_mut.duration().unwrap();
if let Some(position) = state.out_segment.position() {
if pts < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
pts,
position,
);
duration = duration.saturating_sub(position - pts);
pts = position;
}
}

buf_mut.set_pts(pts);
buf_mut.set_duration(duration);
}
items.push(buf);
} else {
break;
Expand Down Expand Up @@ -415,27 +444,11 @@ impl TranscriberSrcPad {
fn enqueue_translation(&self, state: &mut TranscriberSrcPadState, translation: &Translation) {
gst::log!(CAT, "Enqueuing {:?}", translation);
for item in &translation.results {
let mut start_time =
let start_time =
gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64);
let mut end_time =
let end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);

if let Some(position) = state.out_segment.position() {
if start_time < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
start_time,
position,
);
start_time = position;
if end_time < start_time {
end_time = start_time;
}
}
}

let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes());

{
Expand All @@ -452,28 +465,12 @@ impl TranscriberSrcPad {
gst::log!(CAT, "Enqueuing {:?}", transcript);
for item in &transcript.results {
if let Some(alternative) = item.alternatives.first() {
let mut start_time = gst::ClockTime::from_nseconds(
let start_time = gst::ClockTime::from_nseconds(
(item.start_time as f64 * 1_000_000_000.0) as u64,
);
let mut end_time =
let end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);

if let Some(position) = state.out_segment.position() {
if start_time < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
start_time,
position,
);
start_time = position;
if end_time < start_time {
end_time = start_time;
}
}
}

if let Some(ref mut accumulator_inner) = state.accumulator {
if item.type_ == "punctuation" {
accumulator_inner.text.push_str(&alternative.content);
Expand Down Expand Up @@ -814,13 +811,14 @@ impl Transcriber {
Ok(_) => {
let mut ret = gst::Pad::event_default(pad, Some(&*self.obj()), event);

let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
for srcpad in &state.srcpads {
if let Err(err) = srcpad.imp().stop_task() {
gst::error!(CAT, imp = self, "Failed to stop srcpad task: {}", err);
ret = false;
}
}
state.start_time = None;

ret
}
Expand Down Expand Up @@ -1280,6 +1278,22 @@ impl Transcriber {

Ok(())
}

fn get_start_time_and_now(&self) -> Option<(gst::ClockTime, gst::ClockTime)> {
let now = self.obj().current_running_time()?;

let mut state = self.state.lock().unwrap();

if state.start_time.is_none() {
state.start_time = Some(now);
for pad in state.srcpads.iter() {
let mut sstate = pad.imp().state.lock().unwrap();
sstate.out_segment.set_position(now);
}
}

Some((state.start_time.unwrap(), now))
}
}

// Implementation of gst::ChildProxy virtual methods.
Expand Down

0 comments on commit dc1d634

Please sign in to comment.