From 1b73ec3c5cae54bb1c847c68b927b9a4b527dbac Mon Sep 17 00:00:00 2001 From: Yousaf Nabi Date: Sun, 14 Jul 2024 17:36:17 +0100 Subject: [PATCH] !fix: use threads instead of tokio for loading plugins for all os --- drivers/rust/driver/src/child_process.rs | 56 ++++---- .../rust/driver/src/child_process_windows.rs | 126 ------------------ drivers/rust/driver/src/lib.rs | 3 - drivers/rust/driver/src/plugin_manager.rs | 62 +-------- drivers/rust/driver/src/plugin_models.rs | 3 - 5 files changed, 34 insertions(+), 216 deletions(-) delete mode 100644 drivers/rust/driver/src/child_process_windows.rs diff --git a/drivers/rust/driver/src/child_process.rs b/drivers/rust/driver/src/child_process.rs index 085ea1da..cdea420e 100644 --- a/drivers/rust/driver/src/child_process.rs +++ b/drivers/rust/driver/src/child_process.rs @@ -1,13 +1,12 @@ -//! Module for managing running child processes (non-Windows version) - +//! Module for managing running child processes +use std::io::{BufRead, BufReader}; +use std::process::Child; use std::sync::mpsc::channel; use std::time::Duration; use anyhow::anyhow; use serde::{Deserialize, Serialize}; -use sysinfo::{Pid, Signal, System}; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Child; +use sysinfo::{Pid, System}; use tracing::{debug, error, trace, warn}; use crate::plugin_models::PactPluginManifest; @@ -32,8 +31,7 @@ impl ChildPluginProcess { /// Start the child process and try read the startup JSON message from its standard output. pub async fn new(mut child: Child, manifest: &PactPluginManifest) -> anyhow::Result { let (tx, rx) = channel(); - let child_pid = child.id() - .ok_or_else(|| anyhow!("Could not get the child process ID"))?; + let child_pid = child.id(); let child_out = child.stdout.take() .ok_or_else(|| anyhow!("Could not get the child process standard output stream"))?; let child_err = child.stderr.take() @@ -41,20 +39,19 @@ impl ChildPluginProcess { trace!("Starting output polling tasks..."); - let mfso = manifest.clone(); - tokio::task::spawn(async move { - trace!("Starting task to poll plugin stdout"); + let plugin_name = manifest.name.clone(); + std::thread::spawn(move || { + trace!("Starting thread to poll plugin STDOUT"); + let mut startup_read = false; - let reader = BufReader::new(child_out); - let mut lines = reader.lines(); - let plugin_name = mfso.name.as_str(); - while let Ok(line) = lines.next_line().await { - if let Some(line) = line { + let mut reader = BufReader::new(child_out); + let mut line = String::with_capacity(256); + while let Ok(chars_read) = reader.read_line(&mut line) { + if chars_read > 0 { debug!("Plugin({}, {}, STDOUT) || {}", plugin_name, child_pid, line); if !startup_read { let line = line.trim(); if line.starts_with("{") { - startup_read = true; match serde_json::from_str::(line) { Ok(plugin_info) => { tx.send(Ok(ChildPluginProcess { @@ -67,25 +64,32 @@ impl ChildPluginProcess { tx.send(Err(anyhow!("Failed to read startup info from plugin - {}", err))) .unwrap_or_default() } - } + }; + startup_read = true; } } + } else { + trace!("0 bytes read from STDOUT, this indicates EOF"); + break; } } - trace!("Task to poll plugin stderr done"); + trace!("Thread to poll plugin STDOUT done"); }); let plugin_name = manifest.name.clone(); - tokio::task::spawn(async move { - trace!("Starting task to poll plugin stderr"); - let reader = BufReader::new(child_err); - let mut lines = reader.lines(); - while let Ok(line) = lines.next_line().await { - if let Some(line) = line { + std::thread::spawn(move || { + trace!("Starting thread to poll plugin STDERR"); + let mut reader = BufReader::new(child_err); + let mut line = String::with_capacity(256); + while let Ok(chars_read) = reader.read_line(&mut line) { + if chars_read > 0 { debug!("Plugin({}, {}, STDERR) || {}", plugin_name, child_pid, line); + } else { + trace!("0 bytes read from STDERR, this indicates EOF"); + break; } } - trace!("Task to poll plugin stderr done"); + trace!("Thread to poll plugin STDERR done"); }); trace!("Starting output polling tasks... DONE"); @@ -111,7 +115,7 @@ impl ChildPluginProcess { let mut s = System::new(); s.refresh_processes(); if let Some(process) = s.process(Pid::from_u32(self.child_pid as u32)) { - process.kill_with(Signal::Term); + process.kill(); } else { warn!("Child process with PID {} was not found", self.child_pid); } diff --git a/drivers/rust/driver/src/child_process_windows.rs b/drivers/rust/driver/src/child_process_windows.rs deleted file mode 100644 index f6d20753..00000000 --- a/drivers/rust/driver/src/child_process_windows.rs +++ /dev/null @@ -1,126 +0,0 @@ -//! Module for managing running child processes (Windows version) -//! -//! This uses threads to read STDOUT/STDERR from the plugin process instead of Tokio tasks. -//! -use std::io::{BufRead, BufReader}; -use std::process::Child; -use std::sync::mpsc::channel; -use std::time::Duration; - -use anyhow::anyhow; -use serde::{Deserialize, Serialize}; -use sysinfo::{Pid, System}; -use tracing::{debug, error, trace, warn}; - -use crate::plugin_models::PactPluginManifest; - -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct RunningPluginInfo { - pub port: u16, - pub server_key: String -} - -/// Running child process -#[derive(Debug, Clone)] -pub struct ChildPluginProcess { - /// OS PID of the running process - pub child_pid: usize, - /// Info on the running plugin - pub plugin_info: RunningPluginInfo -} - -impl ChildPluginProcess { - /// Start the child process and try read the startup JSON message from its standard output. - pub async fn new(mut child: Child, manifest: &PactPluginManifest) -> anyhow::Result { - let (tx, rx) = channel(); - let child_pid = child.id(); - let child_out = child.stdout.take() - .ok_or_else(|| anyhow!("Could not get the child process standard output stream"))?; - let child_err = child.stderr.take() - .ok_or_else(|| anyhow!("Could not get the child process standard error stream"))?; - - trace!("Starting output polling tasks..."); - - let plugin_name = manifest.name.clone(); - std::thread::spawn(move || { - trace!("Starting thread to poll plugin STDOUT"); - - let mut startup_read = false; - let mut reader = BufReader::new(child_out); - let mut line = String::with_capacity(256); - while let Ok(chars_read) = reader.read_line(&mut line) { - if chars_read > 0 { - debug!("Plugin({}, {}, STDOUT) || {}", plugin_name, child_pid, line); - if !startup_read { - let line = line.trim(); - if line.starts_with("{") { - match serde_json::from_str::(line) { - Ok(plugin_info) => { - tx.send(Ok(ChildPluginProcess { - child_pid: child_pid as usize, - plugin_info - })).unwrap_or_default() - } - Err(err) => { - error!("Failed to read startup info from plugin - {}", err); - tx.send(Err(anyhow!("Failed to read startup info from plugin - {}", err))) - .unwrap_or_default() - } - }; - startup_read = true; - } - } - } else { - trace!("0 bytes read from STDOUT, this indicates EOF"); - break; - } - } - trace!("Thread to poll plugin STDOUT done"); - }); - - let plugin_name = manifest.name.clone(); - std::thread::spawn(move || { - trace!("Starting thread to poll plugin STDERR"); - let mut reader = BufReader::new(child_err); - let mut line = String::with_capacity(256); - while let Ok(chars_read) = reader.read_line(&mut line) { - if chars_read > 0 { - debug!("Plugin({}, {}, STDERR) || {}", plugin_name, child_pid, line); - } else { - trace!("0 bytes read from STDERR, this indicates EOF"); - break; - } - } - trace!("Thread to poll plugin STDERR done"); - }); - - trace!("Starting output polling tasks... DONE"); - - // TODO: This timeout needs to be configurable - // TODO: Timeout is not working on Alpine, waits indefinitely if the plugin does not start properly - match rx.recv_timeout(Duration::from_secs(60)) { - Ok(value) => value, - Err(err) => { - error!("Timeout waiting to get plugin startup info: {}", err); - Err(anyhow!("Plugin process did not output the correct startup message in 60 seconds: {}", err)) - } - } - } - - /// Port the plugin is running on - pub fn port(&self) -> u16 { - self.plugin_info.port - } - - /// Kill the running plugin process - pub fn kill(&self) { - let mut s = System::new(); - s.refresh_processes(); - if let Some(process) = s.process(Pid::from_u32(self.child_pid as u32)) { - process.kill(); - } else { - warn!("Child process with PID {} was not found", self.child_pid); - } - } -} diff --git a/drivers/rust/driver/src/lib.rs b/drivers/rust/driver/src/lib.rs index 70879891..8482fdc2 100644 --- a/drivers/rust/driver/src/lib.rs +++ b/drivers/rust/driver/src/lib.rs @@ -2,10 +2,7 @@ pub mod plugin_models; pub mod plugin_manager; -#[cfg(not(windows))] mod child_process; -#[cfg(windows)] -mod child_process_windows; pub mod proto; pub mod catalogue_manager; pub mod content; diff --git a/drivers/rust/driver/src/plugin_manager.rs b/drivers/rust/driver/src/plugin_manager.rs index b0f04ff4..c4209cde 100644 --- a/drivers/rust/driver/src/plugin_manager.rs +++ b/drivers/rust/driver/src/plugin_manager.rs @@ -6,7 +6,7 @@ use std::fs::File; use std::io::{BufReader, Write}; use std::path::PathBuf; use std::process::Stdio; -#[cfg(windows)] use std::process::Command; +use std::process::Command; use std::str::from_utf8; use std::str::FromStr; use std::sync::Mutex; @@ -18,7 +18,7 @@ use itertools::Either; use lazy_static::lazy_static; use log::max_level; use maplit::hashmap; -#[cfg(not(windows))] use os_info::Type; +use os_info::Type; use pact_models::bodies::OptionalBody; use pact_models::json_utils::json_to_string; use pact_models::PactSpecification; @@ -29,13 +29,10 @@ use reqwest::Client; use semver::Version; use serde_json::Value; use sysinfo::{Pid,System}; -#[cfg(not(windows))] use sysinfo::Signal; -#[cfg(not(windows))] use tokio::process::Command; use tracing::{debug, info, trace, warn}; use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries}; -#[cfg(not(windows))] use crate::child_process::ChildPluginProcess; -#[cfg(windows)] use crate::child_process_windows::ChildPluginProcess; +use crate::child_process::ChildPluginProcess; use crate::content::ContentMismatch; use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url}; use crate::metrics::send_metrics; @@ -244,7 +241,6 @@ pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn Pac Ok(()) } -#[cfg(not(windows))] async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result { debug!("Starting plugin with manifest {:?}", manifest); @@ -257,56 +253,6 @@ async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result

Ok(PactPlugin::new(manifest, child)), - Err(err) => { - let mut s = System::new(); - s.refresh_processes(); - if let Some(process) = s.process(Pid::from_u32(child_pid)) { - process.kill_with(Signal::Term); - } else { - warn!("Child process with PID {} was not found", child_pid); - } - Err(err) - } - } -} - -#[cfg(windows)] -async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result { - debug!("Starting plugin with manifest {:?}", manifest); - - let mut path = if let Some(entry) = manifest.entry_points.get("windows") { - PathBuf::from(entry) - } else { - PathBuf::from(&manifest.entry_point) - }; if !path.is_absolute() || !path.exists() { path = PathBuf::from(manifest.plugin_dir.clone()).join(path); } @@ -468,7 +414,7 @@ pub async fn start_mock_server_v2( }; let response = plugin.start_mock_server(request).await?; debug!("Got response ${response:?}"); - + let mock_server_response = response.response .ok_or_else(|| anyhow!("Did not get a valid response from the start mock server call"))?; match mock_server_response { diff --git a/drivers/rust/driver/src/plugin_models.rs b/drivers/rust/driver/src/plugin_models.rs index a87bf801..a7193d7a 100644 --- a/drivers/rust/driver/src/plugin_models.rs +++ b/drivers/rust/driver/src/plugin_models.rs @@ -16,10 +16,7 @@ use tonic::service::Interceptor; use tonic::transport::Channel; use tracing::{debug, trace}; -#[cfg(not(windows))] use crate::child_process::ChildPluginProcess; -#[cfg(windows)] -use crate::child_process_windows::ChildPluginProcess; use crate::proto::*; use crate::proto::pact_plugin_client::PactPluginClient;