Skip to content

Commit

Permalink
feat(bindings/python): support pickle [de]serialization for Operator (#…
Browse files Browse the repository at this point in the history
…5324)

* feat(bindings/python): support pickle [de]serialization for Operator

* feat(core): add new cap shared

* add a missing file

* refine tests

* fix C binding

* fix java binding

* refine tests

* fix nodejs binding
  • Loading branch information
TennyZhuang authored Nov 16, 2024
1 parent a2db7f2 commit 73bbb85
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 39 deletions.
1 change: 1 addition & 0 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ impl AsyncFile {
#[pymethods]
impl AsyncFile {
/// Read and return at most size bytes, or if size is not given, until EOF.
#[pyo3(signature = (size=None))]
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

Expand Down
132 changes: 93 additions & 39 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::time::Duration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
use pyo3::types::PyTuple;
use pyo3_async_runtimes::tokio::future_into_py;

use crate::*;
Expand All @@ -45,7 +46,11 @@ fn build_operator(
///
/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct Operator(ocore::BlockingOperator);
pub struct Operator {
/// Underlying blocking operator that the methods of this type call into.
core: ocore::BlockingOperator,
/// Scheme this operator was built from; kept so `__getnewargs_ex__` can
/// hand it back as the positional constructor argument.
__scheme: ocore::Scheme,
/// Option map this operator was built from; kept so `__getnewargs_ex__`
/// can hand it back as the keyword constructor arguments.
__map: HashMap<String, String>,
}

#[pymethods]
impl Operator {
Expand All @@ -65,18 +70,26 @@ impl Operator {
})
.unwrap_or_default();

Ok(Operator(build_operator(scheme, map)?.blocking()))
Ok(Operator {
core: build_operator(scheme.clone(), map.clone())?.blocking(),
__scheme: scheme,
__map: map,
})
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone().into());
Ok(Self(op.blocking()))
let op = layer.0.layer(self.core.clone().into());
Ok(Self {
core: op.blocking(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

/// Open a file-like reader for the given path.
pub fn open(&self, path: String, mode: String) -> PyResult<File> {
let this = self.0.clone();
let this = self.core.clone();
if mode == "rb" {
let r = this
.reader(&path)
Expand All @@ -96,15 +109,15 @@ impl Operator {

/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<Bound<PyAny>> {
let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
let buffer = self.core.read(path).map_err(format_pyerr)?.to_vec();
Buffer::new(buffer).into_bytes_ref(py)
}

/// Write bytes into given path.
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
let kwargs = kwargs.unwrap_or_default();
let mut write = self.0.write_with(path, bs).append(kwargs.append);
let mut write = self.core.write_with(path, bs).append(kwargs.append);
if let Some(chunk) = kwargs.chunk {
write = write.chunk(chunk);
}
Expand All @@ -123,22 +136,25 @@ impl Operator {

/// Get current path's metadata **without cache** directly.
pub fn stat(&self, path: &str) -> PyResult<Metadata> {
self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
self.core
.stat(path)
.map_err(format_pyerr)
.map(Metadata::new)
}

/// Copy source to target.
pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
self.0.copy(source, target).map_err(format_pyerr)
self.core.copy(source, target).map_err(format_pyerr)
}

/// Rename filename.
pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
self.0.rename(source, target).map_err(format_pyerr)
self.core.rename(source, target).map_err(format_pyerr)
}

/// Remove all files
pub fn remove_all(&self, path: &str) -> PyResult<()> {
self.0.remove_all(path).map_err(format_pyerr)
self.core.remove_all(path).map_err(format_pyerr)
}

/// Create a dir at given path.
Expand All @@ -154,7 +170,7 @@ impl Operator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir(&self, path: &str) -> PyResult<()> {
self.0.create_dir(path).map_err(format_pyerr)
self.core.create_dir(path).map_err(format_pyerr)
}

/// Delete given path.
Expand All @@ -163,19 +179,19 @@ impl Operator {
///
/// - Deleting a path that does not exist won't return an error.
pub fn delete(&self, path: &str) -> PyResult<()> {
self.0.delete(path).map_err(format_pyerr)
self.core.delete(path).map_err(format_pyerr)
}

/// List current dir path.
pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
let l = self.0.lister(path).map_err(format_pyerr)?;
let l = self.core.lister(path).map_err(format_pyerr)?;
Ok(BlockingLister::new(l))
}

/// List dir in a flat way.
pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
let l = self
.0
.core
.lister_with(path)
.recursive(true)
.call()
Expand All @@ -184,15 +200,21 @@ impl Operator {
}

pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(self.0.info().full_capability()))
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}

pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
Ok(AsyncOperator(self.0.clone().into()))
Ok(AsyncOperator {
core: self.core.clone().into(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

fn __repr__(&self) -> String {
let info = self.0.info();
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root())
Expand All @@ -204,13 +226,24 @@ impl Operator {
)
}
}

/// Pickle support: return the `(args, kwargs)` pair used to re-create
/// this operator.
///
/// `args` is a one-element tuple holding the scheme rendered as a string,
/// and `kwargs` is a copy of the stored option map. Only these two stored
/// values are returned; layers applied afterwards through `layer` are not
/// part of them.
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
    // Positional part: `(scheme,)`.
    let scheme = self.__scheme.to_string().to_object(py);
    let positional = PyTuple::new_bound(py, [scheme]);
    // Keyword part: the option map converted into a Python object.
    let keyword = self.__map.clone().into_py(py);
    let pair = [positional.to_object(py), keyword.to_object(py)];
    Ok(PyTuple::new_bound(py, pair).to_object(py))
}
}

/// `AsyncOperator` is the entry for all public async APIs
///
/// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct AsyncOperator(ocore::Operator);
pub struct AsyncOperator {
/// Underlying operator; the async methods clone it into the futures they
/// return.
core: ocore::Operator,
/// Scheme this operator was built from; kept so `__getnewargs_ex__` can
/// hand it back as the positional constructor argument.
__scheme: ocore::Scheme,
/// Option map this operator was built from; kept so `__getnewargs_ex__`
/// can hand it back as the keyword constructor arguments.
__map: HashMap<String, String>,
}

#[pymethods]
impl AsyncOperator {
Expand All @@ -230,13 +263,21 @@ impl AsyncOperator {
})
.unwrap_or_default();

Ok(AsyncOperator(build_operator(scheme, map)?))
Ok(AsyncOperator {
core: build_operator(scheme.clone(), map.clone())?.into(),
__scheme: scheme,
__map: map,
})
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone());
Ok(Self(op))
let op = layer.0.layer(self.core.clone());
Ok(Self {
core: op,
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

/// Open a file-like reader for the given path.
Expand All @@ -246,7 +287,7 @@ impl AsyncOperator {
path: String,
mode: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();

future_into_py(py, async move {
if mode == "rb" {
Expand All @@ -271,7 +312,7 @@ impl AsyncOperator {

/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
Python::with_gil(|py| Buffer::new(res).into_bytes(py))
Expand All @@ -288,7 +329,7 @@ impl AsyncOperator {
kwargs: Option<WriteOptions>,
) -> PyResult<Bound<PyAny>> {
let kwargs = kwargs.unwrap_or_default();
let this = self.0.clone();
let this = self.core.clone();
let bs = bs.as_bytes().to_vec();
future_into_py(py, async move {
let mut write = this.write_with(&path, bs).append(kwargs.append);
Expand All @@ -310,7 +351,7 @@ impl AsyncOperator {

/// Get current path's metadata **without cache** directly.
pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res: Metadata = this
.stat(&path)
Expand All @@ -329,7 +370,7 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.copy(&source, &target).await.map_err(format_pyerr)
})
Expand All @@ -342,15 +383,15 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.rename(&source, &target).await.map_err(format_pyerr)
})
}

/// Remove all files
pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.remove_all(&path).await.map_err(format_pyerr)
})
Expand All @@ -369,7 +410,7 @@ impl AsyncOperator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.create_dir(&path).await.map_err(format_pyerr)
})
Expand All @@ -381,7 +422,7 @@ impl AsyncOperator {
///
/// - Deleting a path that does not exist won't return an error.
pub fn delete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(
py,
async move { this.delete(&path).await.map_err(format_pyerr) },
Expand All @@ -390,7 +431,7 @@ impl AsyncOperator {

/// List current dir path.
pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let lister = this.lister(&path).await.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Expand All @@ -400,7 +441,7 @@ impl AsyncOperator {

/// List dir in a flat way.
pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let lister = this
.lister_with(&path)
Expand All @@ -419,7 +460,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_stat(&path, Duration::from_secs(expire_second))
Expand All @@ -438,7 +479,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_read(&path, Duration::from_secs(expire_second))
Expand All @@ -457,7 +498,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_write(&path, Duration::from_secs(expire_second))
Expand All @@ -470,15 +511,21 @@ impl AsyncOperator {
}

pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(self.0.info().full_capability()))
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}

pub fn to_operator(&self) -> PyResult<Operator> {
Ok(Operator(self.0.clone().blocking()))
Ok(Operator {
core: self.core.clone().blocking(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

fn __repr__(&self) -> String {
let info = self.0.info();
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!(
Expand All @@ -494,6 +541,13 @@ impl AsyncOperator {
)
}
}

/// Pickle support: return the `(args, kwargs)` pair used to re-create
/// this async operator.
///
/// `args` is a one-element tuple holding the scheme rendered as a string,
/// and `kwargs` is a copy of the stored option map. Only these two stored
/// values are returned; layers applied afterwards through `layer` are not
/// part of them.
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
    // Positional part: `(scheme,)`.
    let scheme = self.__scheme.to_string().to_object(py);
    let positional = PyTuple::new_bound(py, [scheme]);
    // Keyword part: the option map converted into a Python object.
    let keyword = self.__map.clone().into_py(py);
    let pair = [positional.to_object(py), keyword.to_object(py)];
    Ok(PyTuple::new_bound(py, pair).to_object(py))
}
}

#[pyclass(module = "opendal")]
Expand Down
Loading

0 comments on commit 73bbb85

Please sign in to comment.