Skip to content

Conversation

@pedro-alves-ferreira
Copy link
Contributor

This PR adds GStreamer source and sink plugins.

@pedro-alves-ferreira pedro-alves-ferreira requested review from a team and vt-tv December 17, 2025 19:25
Simao Fonseca and others added 3 commits December 18, 2025 12:39
Signed-off-by: Simao Fonseca <simaofonseca@bisect.pt>
Signed-off-by: Simao Fonseca <simao.fonseca@bisect.pt>
There is an indirect dependency that uses the "Apache-2.0 WITH LLVM-exception" license.
This commits adds that licensed to the allowed list.

Signed-off-by: Pedro Ferreira <pedro@bisect.pt>
@felixpou felixpou linked an issue Dec 18, 2025 that may be closed by this pull request
@jonasohland
Copy link
Contributor

Thanks for all the work! I was able to get a few basic pipelines working without issues. Will do a proper review of the rust code later.

@pedro-alves-ferreira
Copy link
Contributor Author

Thanks for the feedback, @jonasohland. There's still some work to be done around reopening the flow, clean shutdown, etc. but we're almost there.

@felixpou felixpou added this to the v1.0 milestone Dec 22, 2025
Copy link
Contributor

@mlefebvre1 mlefebvre1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've only went through half of the PR. But here's my comment so far.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These could be moved and maintained in the mxl crate.

pub parents: Vec<String>,
}

pub struct FlowDef {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use an enum instead? A flow can't be both at the same time.

pub format: String,
pub tags: HashMap<String, Vec<String>>,
pub label: String,
pub id: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use the uuid crate to add validation.

if let Ok(settings) = self.settings.lock() {
match pspec.name() {
"video-flow" => settings.video_flow.to_value(),
"audio-flow" => settings.video_flow.to_value(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be settings.audio_flow.to_value()

fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecString::builder("video-flow")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not the attribute be named video-flow-id?
Same applies to audio-flow

}
}

trace!("PTS: {:?} GST-CURRENT: {:?}", buffer.pts(), ts_gst);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general comment, using structured logging is better for logs processing.

trace!(pts=?buffer.pts(), ts_gst=?ts_gst, buffer=?buffer, "Produced buffer");


trace!("PTS: {:?} GST-CURRENT: {:?}", buffer.pts(), ts_gst);
trace!("Produced buffer {:?}", buffer);
if video_state.frame_counter == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can a comment be added here to explain this logic?

.sample_rate()
.map_err(|_| gst::FlowError::Error)?;

let batch_size = DEFAULT_BATCH_SIZE.min(continuous_flow_info.bufferLength / 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do note that reader_info.config.common().max_commit_batch_size_hint() indicates the maximum number of samples a writer can commit at once. This DEFAULT_BATCH_SIZE could be bigger if left unchecked, thus violating that contract.

);

let mut head = reader_info.runtime.head_index();
wait_for_sample(head, batch, audio_state)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we just don't use samples_reader.get_samples() to wait for samples, use error variants to determine if we are OutOfRangeTooLate or OutOfRangeTooEarly, and act accordingly.


let buffer = build_buffer(pts, samples, is_discont, interleaved)?;

audio_state.batch_counter += 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to keep synchronized a batch_counter with the index. Unless I'm missing something, we can infer it from the index.

end_hms,
elapsed_real.as_millis()
);
let _ = initial_info;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be removed

let _ = initial_info;
let initial_info = &state.initial_info;

let pts = gst::ClockTime::from_nseconds(pts as u64);

let mut pts = pts + initial_info.gst_time;
let _ = initial_info;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove let _ = initial_info;
You are already dropping the shared reference when you create the exclusive reference at line 107.

.map_err(|_| gst::FlowError::Error)?,
);
let video_state = state.video.as_mut().ok_or(gst::FlowError::Error)?;
let gst_time = mxlsink
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain where and the how the clock type is set from this plugin. If it is using GST_CLOCK_TYPE_MONOTONIC, in the rare case of a leap of second, there could be an offset between this plugin producer and a consumer that uses mxl/flow.h (which uses CLOCK_TAI instead).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plugin does not propose any clock. It uses the pipeline's clock. It is the user's responsibility to set one or to use the default, which is monotonic, as explained in this documentation.
In regards to the leap of second case, it is to my understanding that monotonic clocks won't cause an offset between the producer and consumer, that case occurs in real-time clocks.

index,
current_index,
gst_time,
if pts > gst_time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pts.saturating_sub(gst_time)

access
.commit(copy_len as u16)
.map_err(|_| gst::FlowError::Error)?;
trace!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, is this log still relevant? Why would you need to measure the time it takes copy and commit?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this was only useful for an early development bug and probably should’ve been removed sooner.

let mut copy_len = std::cmp::min(payload.len(), data.len());
let commit_time = Instant::now();
payload[..copy_len].copy_from_slice(&data[..copy_len]);
if copy_len > access.total_slices() as usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're comparing the payload size (in bytes) vs the number of slices. This will always be true ...

If you still want to do the check, you will have to obtain the number of bytes per slice which you can get from the field sliceSizes of DiscreteFlowConfigInfo.

.map_err(|_| gst::FlowError::Error)?
+ initial_info.index;

if write_index > current_index + buffer_length {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use std::cmp::min instead?

write_index, sample_rate.numerator, sample_rate.denominator
);

let max_chunk = (buffer_length / 2) as usize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it's not possible to configure a max size for the gstreamer buffers we receive?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible, you can't restrict the max size of the buffers you receive in GStreamer.

state.audio = Some(AudioState {
writer,
bit_depth,
batch_size: (rate as usize / 100),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using mxl flow_info you should be able to retrieve the max_commit_batch_size_hint instead of arbitrary setting this.

@pedro-alves-ferreira
Copy link
Contributor Author

Thanks, @mlefebvre1! We will review your comments a.s.a.p.

@mlefebvre1
Copy link
Contributor

Thanks, @mlefebvre1! We will review your comments a.s.a.p.

No worries @pedro-alves-ferreira, most of the comments are suggestions aimed at improving code quality and I would be fine with them being addressed after v1.0. You could focus on resolving these comments for v1.0 :
#291 (comment)
#291 (comment)
#291 (comment)
#291 (comment)
#291 (comment)
#291 (comment)

Signed-off-by: Simaofonseca <simao.fonseca@bisect.pt>
Signed-off-by: Simao Fonseca <simao.fonseca@bisect.pt>
@jonasohland
Copy link
Contributor

Thanks for the feedback, @jonasohland. There's still some work to be done around reopening the flow, clean shutdown, etc. but we're almost there.

Flow deletion, reopening and those things should be all automatic now that #290 has been merged. In the rust api everything should be RAII now. Hope that helps!

@pedro-alves-ferreira
Copy link
Contributor Author

Thanks for the feedback, @jonasohland. There's still some work to be done around reopening the flow, clean shutdown, etc. but we're almost there.

Flow deletion, reopening and those things should be all automatic now that #290 has been merged. In the rust api everything should be RAII now. Hope that helps!

Yes, that was quite an improvement. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

GStreamer/rust modules: video and audio

6 participants