Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Apr 16, 2024
1 parent f1fa3bd commit eacb74b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 40 deletions.
4 changes: 2 additions & 2 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
Expand All @@ -9,8 +11,6 @@ use pyo3::types::{IntoPyDict, PyBytes};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;
use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};

const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;

Expand Down
60 changes: 25 additions & 35 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use crate::filesystem::FsConfig;
use crate::schema::schema_to_pyobject;
use crate::utils::rt;


#[derive(FromPyObject)]
enum PartitionFilterValue<'a> {
Single(&'a str),
Expand Down Expand Up @@ -114,9 +113,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;
}

let table = rt()
.block_on(builder.load())
.map_err(PythonError::from)?;
let table = rt().block_on(builder.load()).map_err(PythonError::from)?;
Ok(RawDeltaTable {
_table: table,
_config: FsConfig {
Expand Down Expand Up @@ -987,26 +984,25 @@ impl RawDeltaTable {
predicate: None,
};

rt()
.block_on(
CommitBuilder::from(
CommitProperties::default().with_metadata(
custom_metadata
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into())),
),
)
.with_actions(actions)
.build(
Some(self._table.snapshot().map_err(PythonError::from)?),
self._table.log_store(),
operation,
)
.map_err(|err| PythonError::from(DeltaTableError::from(err)))?
.into_future(),
rt().block_on(
CommitBuilder::from(
CommitProperties::default().with_metadata(
custom_metadata
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into())),
),
)
.map_err(PythonError::from)?;
.with_actions(actions)
.build(
Some(self._table.snapshot().map_err(PythonError::from)?),
self._table.log_store(),
operation,
)
.map_err(|err| PythonError::from(DeltaTableError::from(err)))?
.into_future(),
)
.map_err(PythonError::from)?;

Ok(())
}
Expand All @@ -1021,16 +1017,14 @@ impl RawDeltaTable {
}

pub fn create_checkpoint(&self) -> PyResult<()> {
rt()
.block_on(create_checkpoint(&self._table))
rt().block_on(create_checkpoint(&self._table))
.map_err(PythonError::from)?;

Ok(())
}

pub fn cleanup_metadata(&self) -> PyResult<()> {
rt()
.block_on(cleanup_metadata(&self._table))
rt().block_on(cleanup_metadata(&self._table))
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1464,8 +1458,7 @@ fn write_to_deltalake(
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

rt()
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1517,8 +1510,7 @@ fn create_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1569,8 +1561,7 @@ fn write_new_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1622,8 +1613,7 @@ fn convert_to_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;
Ok(())
}
Expand Down
4 changes: 1 addition & 3 deletions python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use tokio::runtime::Runtime;
#[inline]
pub fn rt() -> &'static Runtime {
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
TOKIO_RT.get_or_init(|| {
Runtime::new().expect("Failed to create a tokio runtime.")
})
TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
}

/// walk the "directory" tree along common prefixes in object store
Expand Down

0 comments on commit eacb74b

Please sign in to comment.