Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test DDL #161

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions src/app/app_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! [`AppExecution`]: Handles executing queries for the TUI application.

use crate::app::state::tabs::sql::Query;
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
use crate::execution::ExecutionContext;
use color_eyre::eyre::Result;
Expand All @@ -27,7 +26,6 @@ use datafusion::physical_plan::execute_stream;
use futures::StreamExt;
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -62,23 +60,25 @@ impl AppExecution {
///
/// Error handling: If an error occurs while executing a query, the error is
/// logged and execution continues
pub async fn run_sqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) -> Result<()> {
pub async fn run_sqls(
self: Arc<Self>,
sqls: Vec<String>,
sender: UnboundedSender<AppEvent>,
) -> Result<()> {
// We need to filter out empty strings to correctly determine the last query for displaying
// results.
info!("Running sqls: {:?}", sqls);
let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
let non_empty_sqls: Vec<String> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
info!("Non empty SQLs: {:?}", non_empty_sqls);
let statement_count = non_empty_sqls.len();
for (i, sql) in non_empty_sqls.into_iter().enumerate() {
info!("Running query {}", i);
let _sender = sender.clone();
let mut query =
Query::new(sql.to_string(), None, None, None, Duration::default(), None);
let start = std::time::Instant::now();
if i == statement_count - 1 {
info!("Executing last query and display results");
sender.send(AppEvent::NewExecution)?;
match self.inner.create_physical_plan(sql).await {
match self.inner.create_physical_plan(&sql).await {
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
Ok(stream) => {
self.set_result_stream(stream).await;
Expand All @@ -105,7 +105,7 @@ impl AppExecution {
}
}
Err(stream_err) => {
error!("Error creating physical plan: {:?}", stream_err);
error!("Error executing stream: {:?}", stream_err);
let elapsed = start.elapsed();
let e = ExecutionError {
query: sql.to_string(),
Expand All @@ -127,11 +127,8 @@ impl AppExecution {
}
}
} else {
match self.inner.execute_sql_and_discard_results(sql).await {
Ok(_) => {
let elapsed = start.elapsed();
query.set_execution_time(elapsed);
}
match self.inner.execute_sql_and_discard_results(&sql).await {
Ok(_) => {}
Err(e) => {
// We only log failed queries, we don't want to stop the execution of the
// remaining queries. Perhaps there should be a configuration option for
Expand All @@ -140,7 +137,6 @@ impl AppExecution {
}
}
}
_sender.send(AppEvent::QueryResult(query))?; // Send the query result to the UI
}
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion src/app/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
})
.collect();
let ctx = app.execution.session_ctx().clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
for q in queries {
info!("Executing DDL: {:?}", q);
match ctx.sql(&q).await {
Expand All @@ -178,6 +178,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> {
}
}
});
app.ddl_task = Some(handle);
}
AppEvent::NewExecution => {
app.state.sql_tab.reset_execution_results();
Expand Down
9 changes: 3 additions & 6 deletions src/app/handlers/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
info!("Running query: {}", sql);
let _event_tx = app.event_tx().clone();
let execution = Arc::clone(&app.execution);
// TODO: Extract this into function to be used in both normal and editable handler.
// Only useful if we get Ctrl / Cmd + Enter to work in editable mode though.
tokio::spawn(async move {
let sqls: Vec<&str> = sql.split(';').collect();
let _ = execution.run_sqls(sqls, _event_tx).await;
});
let sqls: Vec<String> = sql.split(';').map(|s| s.to_string()).collect();
let handle = tokio::spawn(execution.run_sqls(sqls, _event_tx));
app.state.sql_tab.set_execution_task(handle);
}
KeyCode::Right => {
let _event_tx = app.event_tx().clone();
Expand Down
36 changes: 28 additions & 8 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio_util::sync::CancellationToken;

use self::app_execution::AppExecution;
use self::handlers::{app_event_handler, crossterm_event_handler};
use self::state::tabs::sql::Query;
use crate::execution::ExecutionContext;

#[cfg(feature = "flightsql")]
Expand Down Expand Up @@ -121,7 +120,6 @@ pub enum AppEvent {
Resize(u16, u16),
ExecuteDDL(String),
NewExecution,
QueryResult(Query),
ExecutionResultsNextPage(ExecutionResultsBatch),
ExecutionResultsPreviousPage,
ExecutionResultsError(ExecutionError),
Expand All @@ -138,13 +136,15 @@ pub struct App<'app> {
event_rx: UnboundedReceiver<AppEvent>,
cancellation_token: CancellationToken,
task: JoinHandle<()>,
ddl_task: Option<JoinHandle<()>>,
}

impl<'app> App<'app> {
pub fn new(state: state::AppState<'app>, execution: ExecutionContext) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let cancellation_token = CancellationToken::new();
let task = tokio::spawn(async {});
// let ddl_task = tokio::spawn(async {});
let app_execution = Arc::new(AppExecution::new(Arc::new(execution)));

Self {
Expand All @@ -154,13 +154,18 @@ impl<'app> App<'app> {
event_tx,
cancellation_token,
execution: app_execution,
ddl_task: None,
}
}

pub fn event_tx(&self) -> UnboundedSender<AppEvent> {
self.event_tx.clone()
}

pub fn ddl_task(&mut self) -> &mut Option<JoinHandle<()>> {
&mut self.ddl_task
}

pub fn event_rx(&mut self) -> &mut UnboundedReceiver<AppEvent> {
&mut self.event_rx
}
Expand All @@ -181,6 +186,10 @@ impl<'app> App<'app> {
&self.state
}

pub fn state_mut(&mut self) -> &mut state::AppState<'app> {
&mut self.state
}

/// Enter app, optionally setup `crossterm` with UI settings such as alternative screen and
/// mouse capture, then start event loop.
pub fn enter(&mut self, ui: bool) -> Result<()> {
Expand All @@ -194,7 +203,7 @@ impl<'app> App<'app> {
ratatui::crossterm::execute!(std::io::stdout(), event::EnableBracketedPaste)?;
}
}
self.start_event_loop();
self.start_app_event_loop();
Ok(())
}

Expand Down Expand Up @@ -293,8 +302,10 @@ impl<'app> App<'app> {
});
}

/// Execute DDL from users DDL file
pub fn execute_ddl(&mut self) {
if let Some(user_dirs) = directories::UserDirs::new() {
// TODO: Move to ~/.config/ddl
let datafusion_rc_path = user_dirs
.home_dir()
.join(".datafusion")
Expand All @@ -321,11 +332,6 @@ impl<'app> App<'app> {
let _ = self.event_tx().send(AppEvent::EstablishFlightSQLConnection);
}

/// Dispatch to the appropriate event loop based on the command
pub fn start_event_loop(&mut self) {
self.start_app_event_loop()
}

/// Get the next event from event loop
pub async fn next(&mut self) -> Result<AppEvent> {
self.event_rx()
Expand All @@ -349,6 +355,20 @@ impl<'app> App<'app> {
.divider(" ")
.render(area, buf);
}

pub async fn loop_without_render(&mut self) -> Result<()> {
self.enter(true)?;
// Main loop for handling events
loop {
let event = self.next().await?;

self.handle_app_event(event)?;

if self.state.should_quit {
break Ok(());
}
}
}
}

impl Widget for &App<'_> {
Expand Down
98 changes: 12 additions & 86 deletions src/app/state/tabs/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,93 +16,19 @@
// under the License.

use core::cell::RefCell;
use std::time::Duration;

use color_eyre::Result;
use datafusion::arrow::array::RecordBatch;
use datafusion::sql::sqlparser::keywords;
use ratatui::crossterm::event::KeyEvent;
use ratatui::style::palette::tailwind;
use ratatui::style::{Modifier, Style};
use ratatui::widgets::TableState;
use tokio::task::JoinHandle;
use tui_textarea::TextArea;

use crate::app::ExecutionError;
use crate::config::AppConfig;
use crate::execution::ExecutionStats;

#[derive(Clone, Debug)]
pub struct Query {
sql: String,
results: Option<Vec<RecordBatch>>,
num_rows: Option<usize>,
error: Option<String>,
execution_time: Duration,
execution_stats: Option<ExecutionStats>,
}
Comment on lines -34 to -41
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query was no longer used


impl Query {
pub fn new(
sql: String,
results: Option<Vec<RecordBatch>>,
num_rows: Option<usize>,
error: Option<String>,
execution_time: Duration,
execution_stats: Option<ExecutionStats>,
) -> Self {
Self {
sql,
results,
num_rows,
error,
execution_time,
execution_stats,
}
}

pub fn sql(&self) -> &String {
&self.sql
}

pub fn execution_time(&self) -> &Duration {
&self.execution_time
}

pub fn set_results(&mut self, results: Option<Vec<RecordBatch>>) {
self.results = results;
}

pub fn results(&self) -> &Option<Vec<RecordBatch>> {
&self.results
}

pub fn set_num_rows(&mut self, num_rows: Option<usize>) {
self.num_rows = num_rows;
}

pub fn num_rows(&self) -> &Option<usize> {
&self.num_rows
}

pub fn set_error(&mut self, error: Option<String>) {
self.error = error;
}

pub fn error(&self) -> &Option<String> {
&self.error
}

pub fn set_execution_time(&mut self, elapsed_time: Duration) {
self.execution_time = elapsed_time;
}

pub fn execution_stats(&self) -> &Option<ExecutionStats> {
&self.execution_stats
}

pub fn set_execution_stats(&mut self, stats: Option<ExecutionStats>) {
self.execution_stats = stats;
}
}

pub fn get_keywords() -> Vec<String> {
keywords::ALL_KEYWORDS
Expand All @@ -129,11 +55,11 @@ pub fn keyword_style() -> Style {
pub struct SQLTabState<'app> {
editor: TextArea<'app>,
editor_editable: bool,
query: Option<Query>,
query_results_state: Option<RefCell<TableState>>,
result_batches: Option<Vec<RecordBatch>>,
results_page: Option<usize>,
execution_error: Option<ExecutionError>,
execution_task: Option<JoinHandle<Result<()>>>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Playing with the test code made me realize this would be useful to have for long running queries.

}

impl<'app> SQLTabState<'app> {
Expand All @@ -149,11 +75,11 @@ impl<'app> SQLTabState<'app> {
Self {
editor: textarea,
editor_editable: false,
query: None,
query_results_state: None,
result_batches: None,
results_page: None,
execution_error: None,
execution_task: None,
}
}

Expand Down Expand Up @@ -215,14 +141,6 @@ impl<'app> SQLTabState<'app> {
self.editor_editable
}

pub fn set_query(&mut self, query: Query) {
self.query = Some(query);
}

pub fn query(&self) -> &Option<Query> {
&self.query
}

// TODO: Create Editor struct and move this there
pub fn next_word(&mut self) {
self.editor
Expand Down Expand Up @@ -288,4 +206,12 @@ impl<'app> SQLTabState<'app> {
}
}
}

pub fn execution_task(&mut self) -> &mut Option<JoinHandle<Result<()>>> {
&mut self.execution_task
}

pub fn set_execution_task(&mut self, task: JoinHandle<Result<()>>) {
self.execution_task = Some(task);
}
}
Loading
Loading