Skip to content

Commit

Permalink
feat(driver): support flight sql (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Apr 11, 2023
1 parent 6f15e4c commit a8436e9
Show file tree
Hide file tree
Showing 22 changed files with 805 additions and 307 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ jobs:
with:
cache-key: integration
- run: make -C tests
- run: make -C tests test-flight-sql
- run: sudo chown -R runner ~/.cargo/registry
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion core/tests/it/stage.rs → core/tests/core/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn get_dsn(presigned: bool) -> String {
}

async fn upload_to_stage(client: APIClient) {
let mut file = std::fs::File::open("tests/it/data/sample.csv").unwrap();
let mut file = std::fs::File::open("tests/core/data/sample.csv").unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).unwrap();
let path = chrono::Utc::now().format("%Y%m%d%H%M%S").to_string();
Expand Down
20 changes: 19 additions & 1 deletion driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "databend-driver"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
license = "Apache-2.0"
description = "Databend Driver for Rust"
Expand All @@ -16,14 +16,32 @@ rustls = ["databend-client/rustls"]
# Enable native-tls for TLS support
native-tls = ["databend-client/native-tls"]

flight-sql = ["dep:arrow", "dep:arrow-array", "dep:arrow-cast", "dep:arrow-flight", "dep:arrow-schema", "dep:tonic"]

[dependencies]
anyhow = "1.0.70"
async-trait = "0.1.68"
chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
databend-client = { version = "0.1.2", path = "../core" }
dyn-clone = "1.0.11"
http = "0.2.9"
serde = { version = "1.0.156", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.94", default-features = false, features = ["std"] }
tokio = { version = "1.26.0", features = ["macros"] }
tokio-stream = "0.1.12"
url = { version = "2.3.1", default-features = false }

arrow = { version = "36.0.0", optional = true }
arrow-array = { version = "36.0.0", optional = true }
arrow-cast = { version = "36.0.0", features = ["prettyprint"], optional = true }
arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"], optional = true }
arrow-schema = { version = "36.0.0", optional = true }
tonic = { version = "0.8.3", default-features = false, features = [
"transport",
"codegen",
"tls",
"tls-roots",
"prost",
], optional = true }

[dev-dependencies]
65 changes: 18 additions & 47 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,35 @@
use anyhow::Result;
use async_trait::async_trait;
use dyn_clone::DynClone;
use url::Url;

#[cfg(feature = "flight-sql")]
use crate::flight_sql::FlightSQLConnection;

use crate::rest_api::RestAPIConnection;
use crate::rows::{Row, RowIterator};
use crate::rows::{Row, RowIterator, RowProgressIterator};

#[async_trait]
pub trait Connection: DynClone {
async fn exec(&self, sql: &str) -> Result<()>;
async fn query_iter(&self, sql: &str) -> Result<RowIterator>;
async fn query_row(&self, sql: &str) -> Result<Option<Row>>;
async fn exec(&mut self, sql: &str) -> Result<()>;
async fn query_iter(&mut self, sql: &str) -> Result<RowIterator>;
async fn query_iter_with_progress(&mut self, sql: &str) -> Result<RowProgressIterator>;
async fn query_row(&mut self, sql: &str) -> Result<Option<Row>>;
}
dyn_clone::clone_trait_object!(Connection);

pub fn new_connection(dsn: &str) -> Result<Box<dyn Connection>> {
let uri = dsn.parse::<http::Uri>()?;
let scheme = uri.scheme_str();
match scheme {
None => Err(anyhow::anyhow!("Invalid DSN: {}", dsn)),
Some("databend") | Some("databend+http") | Some("databend+https") => {
pub async fn new_connection(dsn: &str) -> Result<Box<dyn Connection>> {
let u = Url::parse(dsn)?;
match u.scheme() {
"databend" | "databend+http" | "databend+https" => {
let conn = RestAPIConnection::try_create(dsn)?;
Ok(Box::new(conn))
}
Some(s) => Err(anyhow::anyhow!("Unsupported scheme: {}", s)),
}
}

pub enum DatabendConnection {
RestAPI(RestAPIConnection),
}

impl DatabendConnection {
pub fn try_create(dsn: &str) -> Result<Self> {
let uri = dsn.parse::<http::Uri>()?;
let scheme = uri.scheme_str();
match scheme {
None => Err(anyhow::anyhow!("Invalid DSN: {}", dsn)),
Some("databend") | Some("databend+http") | Some("databend+https") => {
let conn = RestAPIConnection::try_create(dsn)?;
Ok(Self::RestAPI(conn))
}
Some(s) => Err(anyhow::anyhow!("Unsupported scheme: {}", s)),
}
}

pub async fn exec(&self, sql: &str) -> Result<()> {
match self {
Self::RestAPI(conn) => conn.exec(sql).await,
}
}

pub async fn query_iter(&self, sql: &str) -> Result<RowIterator> {
match self {
Self::RestAPI(conn) => conn.query_iter(sql).await,
}
}

pub async fn query_row(&self, sql: &str) -> Result<Option<Row>> {
match self {
Self::RestAPI(conn) => conn.query_row(sql).await,
#[cfg(feature = "flight-sql")]
"databend+flight" | "databend+grpc" => {
let conn = FlightSQLConnection::try_create(dsn).await?;
Ok(Box::new(conn))
}
_ => Err(anyhow::anyhow!("Unsupported scheme: {}", u.scheme())),
}
}
Loading

0 comments on commit a8436e9

Please sign in to comment.