Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add log for query & page #302

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
clap = { version = "4.3", features = ["derive", "env"] }
comfy-table = "7.0"
csv = "1.2"
fern = "0.6"
indicatif = "0.17"
log = "0.4"
logos = "0.13"
once_cell = "1.18"
rustyline = "12.0"
Expand All @@ -37,6 +39,7 @@ tokio = { version = "1.28", features = [
] }
tokio-stream = "0.1"
toml = "0.8"
tracing-appender = "0.2"
unicode-segmentation = "1.10"
url = { version = "2.4", default-features = false }

Expand Down
13 changes: 13 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod config;
mod display;
mod helper;
mod session;
mod trace;

use std::{
collections::BTreeMap,
Expand All @@ -29,6 +30,7 @@ use crate::config::OutputQuoteStyle;
use anyhow::{anyhow, Result};
use clap::{ArgAction, CommandFactory, Parser, ValueEnum};
use config::{Config, OutputFormat, Settings, TimeOption};
use log::info;
use once_cell::sync::Lazy;

static VERSION: Lazy<String> = Lazy::new(|| {
Expand Down Expand Up @@ -172,6 +174,9 @@ struct Args {
help = "Only show execution time without results, will implicitly set output format to `null`."
)]
time: Option<TimeOption>,

#[clap(short = 'l', default_value = "info", long)]
log_level: String,
}

/// Parse a single key-value pair
Expand Down Expand Up @@ -334,6 +339,14 @@ pub async fn main() -> Result<()> {

let mut session = session::Session::try_new(dsn, settings, is_repl).await?;

let log_dir = format!(
"{}/.bendsql",
std::env::var("HOME").unwrap_or_else(|_| ".".to_string())
);

let _guards = trace::init_logging(&log_dir, &args.log_level).await?;
info!("bendsql version: {:?}", VERSION);

if is_repl {
session.handle_repl().await;
return Ok(());
Expand Down
58 changes: 58 additions & 0 deletions cli/src/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use log::LevelFilter;
use std::io::BufWriter;
use std::io::Write;
use std::str::FromStr;

use anyhow::Result;
use tracing_appender::rolling::RollingFileAppender;
use tracing_appender::rolling::Rotation;

#[allow(dyn_drop)]
pub async fn init_logging(
dir: &str,
level: &str,
) -> Result<Vec<Box<dyn Drop + Send + Sync + 'static>>> {
let mut guards: Vec<Box<dyn Drop + Send + Sync + 'static>> = Vec::new();
let mut logger = fern::Dispatch::new();

let rolling = RollingFileAppender::new(Rotation::DAILY, dir, "bendsql");
let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling);
let buffered_non_blocking = BufWriter::with_capacity(64 * 1024 * 1024, non_blocking);

guards.push(Box::new(flush_guard));
logger = logger.chain(
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{}] - {} - [{}] {}",
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
record.level(),
record.target(),
message
))
})
.level(LevelFilter::from_str(level)?)
.chain(Box::new(buffered_non_blocking) as Box<dyn Write + Send>),
);

if logger.apply().is_err() {
eprintln!("logger has already been set");
return Ok(Vec::new());
}

Ok(guards)
}
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ native-tls = ["reqwest/native-tls"]

[dependencies]
http = "0.2"
log = "0.4"
once_cell = "1.18"
percent-encoding = "2.3"
reqwest = { version = "0.11", default-features = false, features = ["json", "multipart", "stream"] }
Expand Down
12 changes: 12 additions & 0 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use http::StatusCode;
use log::info;
use once_cell::sync::Lazy;
use percent_encoding::percent_decode_str;
use reqwest::header::HeaderMap;
Expand Down Expand Up @@ -206,6 +207,7 @@ impl APIClient {
}

pub async fn start_query(&self, sql: &str) -> Result<QueryResponse> {
info!("start query: {}", sql);
let session_settings = self.make_session().await;
let req = QueryRequest::new(sql)
.with_pagination(self.make_pagination())
Expand Down Expand Up @@ -253,6 +255,7 @@ impl APIClient {
}

pub async fn query_page(&self, query_id: &str, next_uri: &str) -> Result<QueryResponse> {
info!("query page: {}", next_uri);
let endpoint = self.endpoint.join(next_uri)?;
let headers = self.make_headers(query_id).await?;
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(3);
Expand Down Expand Up @@ -286,6 +289,7 @@ impl APIClient {
}

pub async fn kill_query(&self, query_id: &str, kill_uri: &str) -> Result<()> {
info!("kill query: {}", kill_uri);
let endpoint = self.endpoint.join(kill_uri)?;
let headers = self.make_headers(query_id).await?;
let resp = self
Expand All @@ -307,6 +311,7 @@ impl APIClient {
}

pub async fn wait_for_query(&self, resp: QueryResponse) -> Result<QueryResponse> {
info!("wait for query: {}", resp.id);
if let Some(next_uri) = &resp.next_uri {
let schema = resp.schema;
let mut data = resp.data;
Expand All @@ -324,6 +329,7 @@ impl APIClient {
}

pub async fn query(&self, sql: &str) -> Result<QueryResponse> {
info!("query: {}", sql);
let resp = self.start_query(sql).await?;
self.wait_for_query(resp).await
}
Expand Down Expand Up @@ -388,6 +394,10 @@ impl APIClient {
file_format_options: BTreeMap<&str, &str>,
copy_options: BTreeMap<&str, &str>,
) -> Result<QueryResponse> {
info!(
"insert with stage: {}, format: {:?}, copy: {:?}",
sql, file_format_options, copy_options
);
let session_settings = self.make_session().await;
let stage_attachment = Some(StageAttachmentConfig {
location: stage,
Expand Down Expand Up @@ -440,6 +450,7 @@ impl APIClient {
}

async fn get_presigned_upload_url(&self, stage: &str) -> Result<PresignedResponse> {
info!("get presigned upload url: {}", stage);
let sql = format!("PRESIGN UPLOAD {}", stage);
let resp = self.query(&sql).await?;
if resp.data.len() != 1 {
Expand Down Expand Up @@ -486,6 +497,7 @@ impl APIClient {
data: Reader,
size: u64,
) -> Result<()> {
info!("upload to stage with stream: {}, size: {}", stage, size);
let endpoint = self.endpoint.join("v1/upload_to_stage")?;
let location = StageLocation::try_from(stage)?;
let query_id = self.gen_query_id();
Expand Down
1 change: 1 addition & 0 deletions driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
csv = "1.3"
dyn-clone = "1.0"
glob = "0.3"
log = "0.4"
percent-encoding = "2.3"
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.28", features = ["macros"] }
Expand Down
15 changes: 15 additions & 0 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use log::info;
use tokio::fs::File;
use tokio_stream::Stream;

Expand Down Expand Up @@ -51,6 +52,7 @@ impl Connection for RestAPIConnection {
}

async fn exec(&self, sql: &str) -> Result<i64> {
info!("exec: {}", sql);
let mut resp = self.client.start_query(sql).await?;
while let Some(next_uri) = resp.next_uri {
resp = self.client.query_page(&resp.id, &next_uri).await?;
Expand All @@ -59,18 +61,21 @@ impl Connection for RestAPIConnection {
}

async fn query_iter(&self, sql: &str) -> Result<RowIterator> {
info!("query iter: {}", sql);
let rows_with_progress = self.query_iter_ext(sql).await?;
let rows = rows_with_progress.filter_rows().await;
Ok(rows)
}

async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator> {
info!("query iter ext: {}", sql);
let resp = self.client.start_query(sql).await?;
let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?;
Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows)))
}

async fn query_row(&self, sql: &str) -> Result<Option<Row>> {
info!("query row: {}", sql);
let resp = self.client.start_query(sql).await?;
let resp = self.wait_for_data(resp).await?;
match resp.kill_uri {
Expand All @@ -91,6 +96,7 @@ impl Connection for RestAPIConnection {
}

async fn get_presigned_url(&self, operation: &str, stage: &str) -> Result<PresignedResponse> {
info!("get presigned url: {} {}", operation, stage);
let sql = format!("PRESIGN {} {}", operation, stage);
let row = self.query_row(&sql).await?.ok_or(Error::InvalidResponse(
"Empty response from server for presigned request".to_string(),
Expand Down Expand Up @@ -118,6 +124,10 @@ impl Connection for RestAPIConnection {
file_format_options: Option<BTreeMap<&str, &str>>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
info!(
"load data: {}, size: {}, format: {:?}, copy: {:?}",
sql, size, file_format_options, copy_options
);
let now = chrono::Utc::now()
.timestamp_nanos_opt()
.ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?;
Expand All @@ -140,6 +150,10 @@ impl Connection for RestAPIConnection {
mut format_options: BTreeMap<&str, &str>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
info!(
"load file: {}, file: {:?}, format: {:?}, copy: {:?}",
sql, fp, format_options, copy_options
);
let file = File::open(fp).await?;
let metadata = file.metadata().await?;
let data = Box::new(file);
Expand All @@ -157,6 +171,7 @@ impl Connection for RestAPIConnection {
}

async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats> {
info!("stream load: {}, length: {:?}", sql, data.len());
let mut wtr = csv::WriterBuilder::new().from_writer(vec![]);
for row in data {
wtr.write_record(row)
Expand Down