Skip to content

Commit

Permalink
convert raw record to independent audio and video tracks with summary
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Jul 15, 2024
1 parent 13b267e commit d042e15
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 85 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ jobs:
with:
use-cross: ${{ matrix.cross }}
command: build
args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_webm
args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record

- name: Rename server
if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
Expand All @@ -227,7 +227,7 @@ jobs:
- name: Rename record
if: ${{ matrix.build_record_tool && matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
run: |
mv ./target/${{ matrix.target }}/release/convert_webm${{ matrix.extension }} convert_webm-${{ matrix.target }}${{ matrix.extension }}
mv ./target/${{ matrix.target }}/release/convert_record${{ matrix.extension }} convert_record-${{ matrix.target }}${{ matrix.extension }}
- name: Upload Artifact to Summary
if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
Expand All @@ -252,8 +252,8 @@ jobs:
uses: svenstaro/upload-release-action@v2
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: convert_webm-${{ matrix.target }}${{ matrix.extension }}
asset_name: convert_webm-${{ matrix.target }}${{ matrix.extension }}
file: convert_record-${{ matrix.target }}${{ matrix.extension }}
asset_name: convert_record-${{ matrix.target }}${{ matrix.extension }}
tag: ${{ github.ref }}
overwrite: true

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions packages/media_record/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ webm = { version = "1.1.2", optional = true }
rtp = { version = "0.11.0", optional = true }
clap = { version = "4.5", features = ["env", "derive"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = "1.0.120"

[features]
default = ["convert_webm"]
convert_webm = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"]
default = ["convert_record"]
convert_record = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"]

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

[[bin]]
name = "convert_webm"
path = "./bin/convert_webm.rs"
required-features = ["convert_webm"]
name = "convert_record"
path = "./bin/convert_record.rs"
required-features = ["convert_record"]
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{collections::HashMap, io::Write};

use clap::Parser;
use media_server_record::{RoomReader, SessionMediaWriter};
use media_server_utils::CustomUri;
use rusty_s3::{Bucket, Credentials, UrlStyle};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::channel;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

Expand Down Expand Up @@ -37,6 +39,33 @@ fn convert_s3_uri(uri: &str) -> (Bucket, Credentials, String) {
(s3, credentials, s3_sub_folder)
}

#[derive(Serialize)]
struct TrackTimeline {
path: String,
start: u64,
end: Option<u64>,
}

#[derive(Default, Serialize)]
struct TrackSummary {
timeline: Vec<TrackTimeline>,
}

#[derive(Default, Serialize)]
struct SessionSummary {
track: HashMap<String, TrackSummary>,
}

#[derive(Default, Serialize)]
struct PeerSummary {
sessions: HashMap<u64, SessionSummary>,
}

#[derive(Default, Serialize)]
struct RecordSummary {
peers: HashMap<String, PeerSummary>,
}

#[tokio::main]
async fn main() {
if std::env::var_os("RUST_LOG").is_none() {
Expand All @@ -49,6 +78,7 @@ async fn main() {
tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init();
let (s3, credentials, s3_sub_folder) = convert_s3_uri(&args.uri);

let mut record_summary = RecordSummary { peers: HashMap::new() };
let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder);
let peers = room_reader.peers().await.unwrap();
//we use channel to wait all sessions
Expand All @@ -68,17 +98,43 @@ async fn main() {
session.connect().await.expect("Should connect session record folder");
while let Some(row) = session.recv().await {
log::debug!("push session {session_id} pkt {}", row.ts);
media.push(row);
if let Some(event) = media.push(row) {
tx.send((peer_id.clone(), session_id, event)).await.expect("Should send to main");
}
}

tx.send(session.path()).await.expect("Should send to main");
log::info!("end session {session_id} loop");
});
}
}
drop(tx);

while let Some(session) = rx.recv().await {
log::info!("done {session}");
while let Some((peer_id, session_id, event)) = rx.recv().await {
let peer = record_summary.peers.entry(peer_id).or_default();
let session = peer.sessions.entry(session_id).or_default();
match event {
media_server_record::Event::TrackStart(name, ts, path) => {
let track = session.track.entry(name.0).or_default();
track.timeline.push(TrackTimeline { path, start: ts, end: None });
}
media_server_record::Event::TrackStop(name, ts) => {
if let Some(track) = session.track.get_mut(&name.0) {
if let Some(timeline) = track.timeline.last_mut() {
if timeline.end.is_none() {
timeline.end = Some(ts);
} else {
log::warn!("timeline end not empty");
}
} else {
log::warn!("track stop but timeline not found");
}
} else {
log::warn!("track stop but track not found");
}
}
}
}

let summary_json = serde_json::to_string(&record_summary).expect("Should convert to json");
let mut summary_fs = std::fs::File::create("./meta.json").expect("Should create file");
summary_fs.write_all(summary_json.as_bytes()).expect("Should write meta to file");
}
4 changes: 2 additions & 2 deletions packages/media_record/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use storage::{memory::MemoryFile, FileId, RecordFile};
use tokio::sync::mpsc::Sender;
use worker::UploadWorker;

#[cfg(feature="convert_webm")]
#[cfg(feature="convert_record")]
mod media;
mod raw_record;
mod session;
mod storage;
mod worker;

#[cfg(feature="convert_webm")]
#[cfg(feature="convert_record")]
pub use media::*;
pub use raw_record::*;

Expand Down
90 changes: 48 additions & 42 deletions packages/media_record/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{collections::HashMap, fs::File};

use media_server_protocol::{
endpoint::{TrackMeta, TrackName},
media::MediaPacket,
record::{SessionRecordEvent, SessionRecordRow},
transport::RemoteTrackId,
};
Expand All @@ -9,76 +11,80 @@ use vpx_writer::VpxWriter;
mod vpx_demuxer;
mod vpx_writer;

struct TrackWriter {
writer: usize,
trait TrackWriter {
fn push_media(&mut self, pkt_ms: u64, pkt: MediaPacket);
}

struct WriterContainer {
writer: VpxWriter<File>,
audio_inuse: bool,
video_inuse: bool,
pub enum Event {
TrackStart(TrackName, u64, String),
TrackStop(TrackName, u64),
}

pub struct SessionMediaWriter {
path: String,
writers: Vec<WriterContainer>,
tracks: HashMap<RemoteTrackId, TrackWriter>,
tracks_meta: HashMap<RemoteTrackId, (TrackName, TrackMeta)>,
tracks_writer: HashMap<RemoteTrackId, Box<dyn TrackWriter + Send>>,
}

impl SessionMediaWriter {
pub fn new(path: &str) -> Self {
log::info!("new session media writer {path}");
Self {
path: path.to_string(),
writers: vec![],
tracks: HashMap::new(),
tracks_meta: HashMap::new(),
tracks_writer: HashMap::new(),
}
}

fn get_free_writer_for(&mut self, ts: u64, is_audio: bool) -> usize {
for (index, writer) in self.writers.iter().enumerate() {
if (is_audio && !writer.audio_inuse) || (!is_audio && !writer.video_inuse) {
return index;
}
}
let index = self.writers.len();
let path = format!("{}{}-{}.webm", self.path, index, ts);
let writer = VpxWriter::new(File::create(&path).expect("Should open file"), ts);
self.writers.push(WriterContainer {
writer,
audio_inuse: false,
video_inuse: false,
});
index
}

pub fn push(&mut self, event: SessionRecordRow) {
pub fn push(&mut self, event: SessionRecordRow) -> Option<Event> {
match event.event {
SessionRecordEvent::TrackStarted(id, name, meta) => {
log::info!("track {:?} started, name {name} meta {:?}", id, meta);
self.tracks_meta.insert(id, (name, meta));
None
}
SessionRecordEvent::TrackStopped(id) => {
log::info!("track {:?} stopped", id);
let (name, _) = self.tracks_meta.remove(&id)?;
self.tracks_writer.remove(&id)?;
Some(Event::TrackStop(name, event.ts))
}
SessionRecordEvent::TrackMedia(id, media) => {
if !self.tracks.contains_key(&id) {
let writer = self.get_free_writer_for(event.ts, media.meta.is_audio());
if media.meta.is_audio() {
self.writers[writer].audio_inuse = true;
let out = if !self.tracks_writer.contains_key(&id) {
if let Some((name, _meta)) = self.tracks_meta.get(&id) {
let (file_path, writer): (String, Box<dyn TrackWriter + Send>) = match &media.meta {
media_server_protocol::media::MediaMeta::Opus { .. } => {
let file_path = format!("{}-opus-{}-{}.webm", self.path, name.0, event.ts);
let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts));
(file_path, writer)
}
media_server_protocol::media::MediaMeta::H264 { .. } => todo!(),
media_server_protocol::media::MediaMeta::Vp8 { .. } => {
let file_path = format!("{}-vp8-{}-{}.webm", self.path, name.0, event.ts);
let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts));
(file_path, writer)
}
media_server_protocol::media::MediaMeta::Vp9 { .. } => {
let file_path = format!("{}-vp9-{}-{}.webm", self.path, name.0, event.ts);
let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts));
(file_path, writer)
}
};
log::info!("create writer for track {name}");
self.tracks_writer.insert(id, writer);
Some(Event::TrackStart(name.clone(), event.ts, file_path))
} else {
self.writers[writer].video_inuse = true;
log::warn!("missing track info for pkt form track {:?}", id);
return None;
}
log::info!("write track {:?} to writer {writer}", id);
self.tracks.insert(id, TrackWriter { writer });
}
let track = self.tracks.get_mut(&id).expect("Should have track here");
if media.meta.is_audio() {
self.writers[track.writer].writer.push_opus(event.ts, media);
} else {
self.writers[track.writer].writer.push_vpx(event.ts, media);
}
None
};
let writer = self.tracks_writer.get_mut(&id).expect("Should have track here");
writer.push_media(event.ts, media);
out
}
_ => {}
_ => None,
}
}
}
Loading

0 comments on commit d042e15

Please sign in to comment.