Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add query resolvers into web UI #9182

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tokio-stream = { version = "0.1.12", features = ["net"] }
tokio-util = { version = "0.7.7", features = ["compat"] }
tonic = { version = "0.11.0", features = ["transport"] }
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["cors"] }
tracing-appender = "0.2.2"
tracing-chrome = "0.7.1"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
Expand Down
10 changes: 7 additions & 3 deletions crates/turborepo-lib/src/commands/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fs;
use std::{fs, sync::Arc};

use async_graphql::{EmptyMutation, EmptySubscription, Schema, ServerError};
use miette::{Diagnostic, Report, SourceSpan};
Expand All @@ -10,7 +10,7 @@ use crate::{
cli::Command,
commands::{run::get_signal, CommandBase},
query,
query::{Error, Query},
query::{Error, RepositoryQuery},
run::builder::RunBuilder,
signal::SignalHandler,
};
Expand Down Expand Up @@ -84,7 +84,11 @@ pub async fn run(
fs::read_to_string(AbsoluteSystemPathBuf::from_unknown(run.repo_root(), query))?
};

let schema = Schema::new(Query::new(run), EmptyMutation, EmptySubscription);
let schema = Schema::new(
RepositoryQuery::new(Arc::new(run)),
EmptyMutation,
EmptySubscription,
);

let result = schema.execute(&query).await;
if result.errors.is_empty() {
Expand Down
12 changes: 7 additions & 5 deletions crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::future::Future;
use std::{future::Future, sync::Arc};

use tracing::error;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand Down Expand Up @@ -40,10 +40,12 @@ pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result<i3

let run_fut = async {
let (analytics_sender, analytics_handle) = run_builder.start_analytics();
let mut run = run_builder
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?;
let run = Arc::new(
run_builder
.with_analytics_sender(analytics_sender)
.build(&handler, telemetry)
.await?,
);

let (sender, handle) = run.start_ui()?.unzip();

Expand Down
18 changes: 11 additions & 7 deletions crates/turborepo-lib/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ pub enum Error {
Path(#[from] turbopath::PathError),
}

pub struct Query {
pub struct RepositoryQuery {
run: Arc<Run>,
}

impl Query {
pub fn new(run: Run) -> Self {
Self { run: Arc::new(run) }
impl RepositoryQuery {
pub fn new(run: Arc<Run>) -> Self {
Self { run }
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ impl PackagePredicate {
}

#[Object]
impl Query {
impl RepositoryQuery {
async fn affected_packages(
&self,
base: Option<String>,
Expand Down Expand Up @@ -339,12 +339,16 @@ impl Query {
}
}

async fn graphiql() -> impl IntoResponse {
pub async fn graphiql() -> impl IntoResponse {
response::Html(GraphiQLSource::build().endpoint("/").finish())
}

pub async fn run_server(run: Run, signal: SignalHandler) -> Result<(), Error> {
let schema = Schema::new(Query::new(run), EmptyMutation, EmptySubscription);
let schema = Schema::new(
RepositoryQuery::new(Arc::new(run)),
EmptyMutation,
EmptySubscription,
);
let app = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));

let subscriber = signal.subscribe().ok_or(Error::NoSignalHandler)?;
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/query/package_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

11 changes: 6 additions & 5 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod scope;
pub(crate) mod summary;
pub mod task_access;
pub mod task_id;
mod ui;
pub mod watch;

use std::{
Expand Down Expand Up @@ -211,7 +212,7 @@ impl Run {
&& tui::terminal_big_enough()?)
}

pub fn start_ui(&self) -> UIResult<UISender> {
pub fn start_ui(self: &Arc<Self>) -> UIResult<UISender> {
// Print prelude here as this needs to happen before the UI is started
if self.should_print_prelude {
self.print_run_prelude();
Expand All @@ -227,10 +228,10 @@ impl Run {
.map(|res| res.map(|(sender, handle)| (UISender::Wui(sender), handle))),
}
}
fn start_web_ui(&self) -> WuiResult {
fn start_web_ui(self: &Arc<Self>) -> WuiResult {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let handle = tokio::spawn(turborepo_ui::wui::server::start_server(rx));
let handle = tokio::spawn(ui::start_web_ui_server(rx, self.clone()));

Ok(Some((WebUISender { tx }, handle)))
}
Expand Down Expand Up @@ -260,7 +261,7 @@ impl Run {
}
}

pub async fn run(&mut self, ui_sender: Option<UISender>, is_watch: bool) -> Result<i32, Error> {
pub async fn run(&self, ui_sender: Option<UISender>, is_watch: bool) -> Result<i32, Error> {
let skip_cache_writes = self.opts.runcache_opts.skip_writes;
if let Some(subscriber) = self.signal_handler.subscribe() {
let run_cache = self.run_cache.clone();
Expand Down Expand Up @@ -356,7 +357,7 @@ impl Run {
self.engine.task_definitions(),
&self.repo_root,
&self.run_telemetry,
&mut self.daemon,
&self.daemon,
)?;

let root_workspace = self
Expand Down
55 changes: 55 additions & 0 deletions crates/turborepo-lib/src/run/ui.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::sync::Arc;

use async_graphql::{EmptyMutation, EmptySubscription, MergedObject, Schema};
use async_graphql_axum::GraphQL;
use axum::{http::Method, routing::get, Router};
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use turborepo_ui::wui::{event::WebUIEvent, server::SharedState};

use crate::{query, query::graphiql, run::Run};

pub async fn start_web_ui_server(
rx: tokio::sync::mpsc::UnboundedReceiver<WebUIEvent>,
run: Arc<Run>,
) -> Result<(), turborepo_ui::Error> {
let state = SharedState::default();
let subscriber = turborepo_ui::wui::subscriber::Subscriber::new(rx);
tokio::spawn(subscriber.watch(state.clone()));

run_server(state.clone(), run).await?;

Ok(())
}

#[derive(MergedObject)]
struct Query(turborepo_ui::wui::RunQuery, query::RepositoryQuery);

async fn run_server(state: SharedState, run: Arc<Run>) -> Result<(), turborepo_ui::Error> {
let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods([Method::GET, Method::POST])
.allow_headers(Any)
// allow requests from any origin
.allow_origin(Any);

let web_ui_query = turborepo_ui::wui::RunQuery::new(state.clone());
let turbo_query = query::RepositoryQuery::new(run);
let combined_query = Query(web_ui_query, turbo_query);

let schema = Schema::new(combined_query, EmptyMutation, EmptySubscription);
let app = Router::new()
.route("/", get(graphiql).post_service(GraphQL::new(schema)))
.layer(cors);

axum::serve(
TcpListener::bind("127.0.0.1:8000")
.await
.map_err(turborepo_ui::wui::Error::Server)?,
app,
)
.await
.map_err(turborepo_ui::wui::Error::Server)?;

Ok(())
}
19 changes: 11 additions & 8 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ChangedPackages {
}

pub struct WatchClient {
run: Run,
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<PersistentRunHandle>,
connector: DaemonConnector,
Expand Down Expand Up @@ -130,9 +130,11 @@ impl WatchClient {
execution_args: execution_args.clone(),
});

let run = RunBuilder::new(new_base)?
.build(&handler, telemetry.clone())
.await?;
let run = Arc::new(
RunBuilder::new(new_base)?
.build(&handler, telemetry.clone())
.await?,
);

let watched_packages = run.get_relevant_packages();

Expand Down Expand Up @@ -288,7 +290,7 @@ impl WatchClient {
let signal_handler = self.handler.clone();
let telemetry = self.telemetry.clone();

let mut run = RunBuilder::new(new_base)?
let run = RunBuilder::new(new_base)?
.with_entrypoint_packages(packages)
.hide_prelude()
.build(&signal_handler, telemetry)
Expand Down Expand Up @@ -331,7 +333,8 @@ impl WatchClient {
self.run = RunBuilder::new(base.clone())?
.hide_prelude()
.build(&self.handler, self.telemetry.clone())
.await?;
.await?
.into();

self.watched_packages = self.run.get_relevant_packages();

Expand All @@ -357,7 +360,7 @@ impl WatchClient {
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let mut persistent_run = self.run.create_run_for_persistent_tasks();
let persistent_run = self.run.create_run_for_persistent_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
Expand All @@ -369,7 +372,7 @@ impl WatchClient {
});

// But we still run the regular tasks blocking
let mut non_persistent_run = self.run.create_run_without_persistent_tasks();
let non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run(self.ui_sender.clone(), true).await?)
} else {
Ok(self.run.run(self.ui_sender.clone(), true).await?)
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/task_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl PackageInputsHashes {
task_definitions: &HashMap<TaskId<'static>, TaskDefinition>,
repo_root: &AbsoluteSystemPath,
telemetry: &GenericEventBuilder,
daemon: &mut Option<DaemonClient<DaemonConnector>>,
daemon: &Option<DaemonClient<DaemonConnector>>,
) -> Result<PackageInputsHashes, Error> {
tracing::trace!(scm_manual=%scm.is_manual(), "scm running in {} mode", if scm.is_manual() { "manual" } else { "git" });

Expand Down
1 change: 0 additions & 1 deletion crates/turborepo-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

tower-http = { version = "0.5.2", features = ["cors"] }
tracing = { workspace = true }
tui-term = { workspace = true }
turbopath = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions crates/turborepo-ui/src/wui/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Web UI for Turborepo. Creates a WebSocket server that can be subscribed to
//! by a web client to display the status of tasks.

mod event;
pub mod event;
pub mod sender;
pub mod server;
mod subscriber;
pub mod subscriber;

use event::WebUIEvent;
pub use server::RunQuery;
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down
Loading
Loading