Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rdp integration (WIP) #686

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use screenpipe_core::find_ffmpeg_path;
use screenpipe_server::{
cli::{Cli, CliAudioTranscriptionEngine, CliOcrEngine, Command, PipeCommand}, start_continuous_recording, watch_pid, DatabaseManager, PipeControl, PipeManager, ResourceMonitor, Server, highlight::{Highlight,HighlightConfig}
};
use screenpipe_vision::{monitor::list_monitors};
use screenpipe_vision::{core::CaptureSource, monitor::{get_default_monitor, list_monitors}, remote_desktop::list_rdp_sessions};
#[cfg(target_os = "macos")]
use screenpipe_vision::run_ui;
use serde_json::{json, Value};
Expand Down Expand Up @@ -347,7 +347,6 @@ async fn main() -> anyhow::Result<()> {
let vision_control_clone = Arc::clone(&vision_control);
let shutdown_tx_clone = shutdown_tx.clone();
let friend_wearable_uid_clone: Option<String> = friend_wearable_uid.clone(); // Clone here
let monitor_ids_clone = monitor_ids.clone();
let ignored_windows_clone = cli.ignored_windows.clone();
let included_windows_clone = cli.included_windows.clone();

Expand All @@ -366,6 +365,13 @@ async fn main() -> anyhow::Result<()> {
loop {
let vad_engine_clone = vad_engine.clone(); // Clone it here for each iteration
let mut shutdown_rx = shutdown_tx_clone.subscribe();
let capture_source = if cli.use_remote_desktop {
list_rdp_sessions().await.unwrap().iter().map(|(id, _username)| CaptureSource::RdpSession(Box::leak(id.to_string().into_boxed_str()))).collect()
} else if cli.monitor_id.len() > 0 {
cli.monitor_id.iter().map(|id| CaptureSource::LocalMonitor(*id)).collect()
} else {
vec![CaptureSource::LocalMonitor(get_default_monitor().await.id())]
};
let recording_future = start_continuous_recording(
db_clone.clone(),
output_path_clone.clone(),
Expand All @@ -379,7 +385,7 @@ async fn main() -> anyhow::Result<()> {
Arc::new(cli.audio_transcription_engine.clone().into()),
Arc::new(cli.ocr_engine.clone().into()),
friend_wearable_uid_clone.clone(),
monitor_ids_clone.clone(),
capture_source,
cli.use_pii_removal,
cli.disable_vision,
vad_engine_clone,
Expand Down
6 changes: 4 additions & 2 deletions screenpipe-server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,15 @@ pub struct Cli {
#[arg(long, default_value_t = false)]
pub enable_frame_cache: bool,

/// If enabled will capture all RDP sessions on the machine
#[arg(long)]
pub use_remote_desktop: bool,

#[command(subcommand)]
pub command: Option<Command>,

}



impl Cli {
pub fn unique_languages(&self) -> Result<Vec<Language>, String> {
let mut unique_langs = std::collections::HashSet::new();
Expand Down
26 changes: 15 additions & 11 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use screenpipe_audio::{
use screenpipe_core::pii_removal::remove_pii;
use screenpipe_core::Language;
use screenpipe_integrations::friend_wearable::initialize_friend_wearable_loop;
use screenpipe_vision::core::CaptureSource;
use screenpipe_vision::OcrEngine;
use std::collections::HashMap;
use std::path::PathBuf;
Expand All @@ -35,7 +36,7 @@ pub async fn start_continuous_recording(
audio_transcription_engine: Arc<AudioTranscriptionEngine>,
ocr_engine: Arc<OcrEngine>,
friend_wearable_uid: Option<String>,
monitor_ids: Vec<u32>,
capture_sources: Vec<CaptureSource<'static>>,
use_pii_removal: bool,
vision_disabled: bool,
vad_engine: CliVadEngine,
Expand All @@ -55,11 +56,11 @@ pub async fn start_continuous_recording(
));
}

debug!("Starting video recording for monitor {:?}", monitor_ids);
debug!("Starting video recording for {:?}", capture_sources);
let video_tasks = if !vision_disabled {
monitor_ids
.iter()
.map(|&monitor_id| {
capture_sources
.into_iter()
.map(|capture_source| {
let db_manager_video = Arc::clone(&db);
let output_path_video = Arc::clone(&output_path);
let is_running_video = Arc::clone(&vision_control);
Expand All @@ -70,7 +71,7 @@ pub async fn start_continuous_recording(

let languages = languages.clone();

debug!("Starting video recording for monitor {}", monitor_id);
debug!("Starting video recording for {}", capture_source);
vision_handle.spawn(async move {
record_video(
db_manager_video,
Expand All @@ -80,7 +81,7 @@ pub async fn start_continuous_recording(
save_text_files,
ocr_engine,
friend_wearable_uid_video,
monitor_id,
capture_source,
use_pii_removal,
&ignored_windows_video,
&include_windows_video,
Expand Down Expand Up @@ -180,7 +181,7 @@ async fn record_video(
save_text_files: bool,
ocr_engine: Arc<OcrEngine>,
_friend_wearable_uid: Option<String>,
monitor_id: u32,
capture_source: CaptureSource<'static>,
use_pii_removal: bool,
ignored_windows: &[String],
include_windows: &[String],
Expand All @@ -190,7 +191,7 @@ async fn record_video(
debug!("record_video: Starting");
let db_chunk_callback = Arc::clone(&db);
let rt = Handle::current();
let device_name = Arc::new(format!("monitor_{}", monitor_id));
let device_name = Arc::new(format!("{}", capture_source));

let new_chunk_callback = {
let db_chunk_callback = Arc::clone(&db_chunk_callback);
Expand All @@ -200,7 +201,10 @@ async fn record_video(
let db_chunk_callback = Arc::clone(&db_chunk_callback);
let device_name = Arc::clone(&device_name);
rt.spawn(async move {
if let Err(e) = db_chunk_callback.insert_video_chunk(&file_path, &device_name).await {
if let Err(e) = db_chunk_callback
.insert_video_chunk(&file_path, &device_name)
.await
{
error!("Failed to insert new video chunk: {}", e);
}
debug!("record_video: Inserted new video chunk: {}", file_path);
Expand All @@ -215,7 +219,7 @@ async fn record_video(
new_chunk_callback,
save_text_files,
Arc::clone(&ocr_engine),
monitor_id,
capture_source,
ignored_windows,
include_windows,
languages,
Expand Down
18 changes: 9 additions & 9 deletions screenpipe-server/src/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use image::ImageFormat::{self};
use log::{debug, error};
use log::{info, warn};
use screenpipe_core::{find_ffmpeg_path, Language};
use screenpipe_vision::core::CaptureSource;
use screenpipe_vision::{continuous_capture, CaptureResult, OcrEngine};
use std::path::PathBuf;
use std::process::Stdio;
Expand Down Expand Up @@ -33,7 +34,7 @@ impl VideoCapture {
new_chunk_callback: impl Fn(&str) + Send + Sync + 'static,
save_text_files: bool,
ocr_engine: Arc<OcrEngine>,
monitor_id: u32,
capture_source: CaptureSource<'static>,
ignore_list: &[String],
include_list: &[String],
languages: Vec<Language>,
Expand Down Expand Up @@ -61,15 +62,14 @@ impl VideoCapture {
interval,
save_text_files,
*ocr_engine,
monitor_id,
capture_source,
&ignore_list_clone,
&include_list_clone,
languages.clone(),
)
.await;
});


// In the _queue_thread
let _queue_thread = tokio::spawn(async move {
// Helper function to push to queue and handle errors
Expand Down Expand Up @@ -129,7 +129,7 @@ impl VideoCapture {
&output_path,
fps,
new_chunk_callback_clone,
monitor_id,
capture_source,
video_chunk_duration,
)
.await;
Expand Down Expand Up @@ -170,7 +170,7 @@ pub async fn start_ffmpeg_process(output_file: &str, fps: f64) -> Result<Child,
// TODO: issue on macos https://github.com/mediar-ai/screenpipe/pull/580
#[cfg(target_os = "macos")]
args.extend_from_slice(&["-vcodec", "libx264", "-preset", "ultrafast", "-crf", "23"]);

#[cfg(not(target_os = "macos"))]
args.extend_from_slice(&["-vcodec", "libx265", "-preset", "ultrafast", "-crf", "23"]);

Expand Down Expand Up @@ -211,7 +211,7 @@ async fn save_frames_as_video(
output_path: &str,
fps: f64,
new_chunk_callback: Arc<dyn Fn(&str) + Send + Sync>,
monitor_id: u32,
capture_source: CaptureSource<'static>,
video_chunk_duration: Duration,
) {
debug!("Starting save_frames_as_video function");
Expand All @@ -230,7 +230,7 @@ async fn save_frames_as_video(
let first_frame = wait_for_first_frame(frame_queue).await;
let buffer = encode_frame(&first_frame);

let output_file = create_output_file(output_path, monitor_id);
let output_file = create_output_file(output_path, capture_source);
new_chunk_callback(&output_file);

match start_ffmpeg_process(&output_file, fps).await {
Expand Down Expand Up @@ -289,11 +289,11 @@ fn encode_frame(frame: &CaptureResult) -> Vec<u8> {
buffer
}

fn create_output_file(output_path: &str, monitor_id: u32) -> String {
fn create_output_file(output_path: &str, capture_source: CaptureSource) -> String {
let time = Utc::now();
let formatted_time = time.format("%Y-%m-%d_%H-%M-%S").to_string();
PathBuf::from(output_path)
.join(format!("monitor_{}_{}.mp4", monitor_id, formatted_time))
.join(format!("{}_{}.mp4", capture_source, formatted_time))
.to_str()
.expect("Failed to create valid path")
.to_string()
Expand Down
7 changes: 6 additions & 1 deletion screenpipe-vision/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tracing = { workspace = true }
once_cell = "1.19"
which = "6.0"

futures = "0.3"

[dev-dependencies]
tempfile = "3.3.0"
criterion = { workspace = true }
Expand Down Expand Up @@ -84,9 +86,12 @@ harness = false
name = "screenpipe-vision-websocket"
path = "examples/websocket.rs"

[[example]]
name = "rdp"
path = "examples/rdp.rs"

[target.'cfg(target_os = "windows")'.dependencies]
windows = { version = "0.58", features = ["Graphics_Imaging", "Media_Ocr", "Storage", "Storage_Streams"] }
windows = { version = "0.58", features = ["Win32_System_ProcessStatus", "Win32_UI_WindowsAndMessaging", "Win32_UI", "Win32_Graphics_Gdi", "Win32_Graphics", "Win32_System_Memory", "Graphics_Imaging", "Media_Ocr", "Storage", "Storage_Streams", "Win32_System_RemoteDesktop", "Win32_System_Diagnostics", "Win32_System_Diagnostics_Debug", "Win32_Security", "Win32_System_Threading", "Win32_System_Registry", "Win32_System_StationsAndDesktops"] }
xcap = "0.0.12"

[target.'cfg(target_os = "macos")'.dependencies]
Expand Down
Loading
Loading