Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/python): add blocking client #308

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions bindings/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,34 @@ maturin develop

## Usage


### Blocking

```python
from databend_driver import BlockingDatabendClient

client = BlockingDatabendClient('databend+http://root:root@localhost:8000/?sslmode=disable')
conn = client.get_conn()
conn.exec(
"""
CREATE TABLE test (
i64 Int64,
u64 UInt64,
f64 Float64,
s String,
s2 String,
d Date,
t DateTime
)
"""
)
rows = conn.query_iter("SELECT * FROM test")
for row in rows:
print(row.values())
```

### Asyncio

```python
import asyncio
from databend_driver import AsyncDatabendClient
Expand Down Expand Up @@ -58,6 +86,27 @@ class AsyncDatabendConnection:
async def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ...
```

### BlockingDatabendClient

```python
class BlockingDatabendClient:
def __init__(self, dsn: str): ...
def get_conn(self) -> BlockingDatabendConnection: ...
```

### BlockingDatabendConnection

```python
class BlockingDatabendConnection:
def info(self) -> ConnectionInfo: ...
def version(self) -> str: ...
def exec(self, sql: str) -> int: ...
def query_row(self, sql: str) -> Row: ...
def query_iter(self, sql: str) -> RowIterator: ...
def stream_load(self, sql: str, data: list[list[str]]) -> ServerStats: ...
```


### Row

```python
Expand All @@ -69,9 +118,13 @@ class Row:

```python
class RowIterator:
def schema(self) -> Schema: ...

def __iter__(self) -> RowIterator: ...
def __next__(self) -> Row: ...

def __aiter__(self) -> RowIterator: ...
async def __anext__(self) -> Row: ...
def schema(self) -> Schema: ...
```

### Field
Expand Down Expand Up @@ -131,8 +184,14 @@ class ConnectionInfo:

## Development

```
cd tests
make up
```

```shell
cd bindings/python
pipenv install --dev
maturin develop
pipenv run behave tests
pipenv run maturin develop
pipenv run behave tests/*
```
106 changes: 106 additions & 0 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;

use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};

#[pyclass(module = "databend_driver")]
pub struct AsyncDatabendClient(databend_driver::Client);

#[pymethods]
impl AsyncDatabendClient {
#[new]
#[pyo3(signature = (dsn))]
pub fn new(dsn: String) -> PyResult<Self> {
let client = databend_driver::Client::new(dsn);
Ok(Self(client))
}

pub fn get_conn<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let conn = this.get_conn().await.unwrap();
Ok(AsyncDatabendConnection(conn))
})
}
}

#[pyclass(module = "databend_driver")]
pub struct AsyncDatabendConnection(Box<dyn databend_driver::Connection>);

#[pymethods]
impl AsyncDatabendConnection {
pub fn info<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let info = this.info().await;
Ok(ConnectionInfo::new(info))
})
}

pub fn version<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let version = this.version().await.unwrap();
Ok(version)
})
}

pub fn exec<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let res = this.exec(&sql).await.unwrap();
Ok(res)
})
}

pub fn query_row<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let row = this.query_row(&sql).await.unwrap();
Ok(Row::new(row.unwrap()))
})
}

pub fn query_iter<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let streamer = this.query_iter(&sql).await.unwrap();
Ok(RowIterator::new(streamer))
})
}

pub fn stream_load<'p>(
&self,
py: Python<'p>,
sql: String,
data: Vec<Vec<String>>,
) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let data = data
.iter()
.map(|v| v.iter().map(|s| s.as_ref()).collect())
.collect();
let ss = this
.stream_load(&sql, data)
.await
.map_err(|e| PyException::new_err(format!("{}", e)))?;
Ok(ServerStats::new(ss))
})
}
}
114 changes: 114 additions & 0 deletions bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::prelude::*;

use crate::types::{ConnectionInfo, Row, RowIterator, ServerStats};

#[pyclass(module = "databend_driver")]
pub struct BlockingDatabendClient(databend_driver::Client);

#[pymethods]
impl BlockingDatabendClient {
#[new]
#[pyo3(signature = (dsn))]
pub fn new(dsn: String) -> PyResult<Self> {
let client = databend_driver::Client::new(dsn);
Ok(Self(client))
}

pub fn get_conn(&self) -> PyResult<BlockingDatabendConnection> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let conn = this.get_conn().await.unwrap();
conn
});
Ok(BlockingDatabendConnection(ret))
}
}

#[pyclass(module = "databend_driver")]
pub struct BlockingDatabendConnection(Box<dyn databend_driver::Connection>);

#[pymethods]
impl BlockingDatabendConnection {
pub fn info(&self) -> PyResult<ConnectionInfo> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let info = this.info().await;
info
});
Ok(ConnectionInfo::new(ret))
}

pub fn version(&self) -> PyResult<String> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let version = this.version().await.unwrap();
version
});
Ok(ret)
}

pub fn exec(&self, sql: String) -> PyResult<i64> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let res = this.exec(&sql).await.unwrap();
res
});
Ok(ret)
}

pub fn query_row(&self, sql: String) -> PyResult<Option<Row>> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let row = this.query_row(&sql).await.unwrap();
row
});
Ok(ret.map(Row::new))
}

pub fn query_iter(&self, sql: String) -> PyResult<RowIterator> {
let this = self.0.clone();
// future_into_py(py, async move {
// let streamer = this.query_iter(&sql).await.unwrap();
// Ok(RowIterator::new(streamer))
// })
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let streamer = this.query_iter(&sql).await.unwrap();
streamer
});
Ok(RowIterator::new(ret))
}

pub fn stream_load(&self, sql: String, data: Vec<Vec<String>>) -> PyResult<ServerStats> {
let this = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let data = data
.iter()
.map(|v| v.iter().map(|s| s.as_ref()).collect())
.collect();
let ss = this.stream_load(&sql, data).await.unwrap();
ss
});
Ok(ServerStats::new(ret))
}
}
Loading