Skip to content

Commit

Permalink
Don't make an async runtime within the API
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Sep 28, 2022
1 parent c91d58e commit 6ef1086
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
11 changes: 8 additions & 3 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ fn write_new_deltalake(
// Python-facing wrapper exported under the Python class name "DeltaDataChecker".
// It pairs the Rust checker with a runtime so that the synchronous `check_batch`
// method below can drive the checker's async API to completion.
#[pyclass(name = "DeltaDataChecker", text_signature = "(invariants)")]
struct PyDeltaDataChecker {
// The wrapped Rust checker that `check_batch` delegates to.
inner: DeltaDataChecker,
// Tokio runtime created once in the constructor; `check_batch` blocks on it
// while awaiting `inner.check_batch`.
rt: tokio::runtime::Runtime,
}

#[pymethods]
Expand All @@ -605,13 +606,17 @@ impl PyDeltaDataChecker {
.collect();
Self {
inner: DeltaDataChecker::new(invariants),
rt: tokio::runtime::Runtime::new().unwrap(),
}
}

/// Check that `batch` satisfies the invariants held by the wrapped checker.
///
/// The wrapped `DeltaDataChecker::check_batch` is async, so this method drives
/// it to completion on the runtime owned by this object and blocks the calling
/// thread until the check finishes.
///
/// # Errors
///
/// Any error returned by the wrapped checker is converted with
/// `PyDeltaTableError::from_raw` and surfaced to the caller.
fn check_batch(&self, batch: RecordBatch) -> PyResult<()> {
    // Single code path: block on the async check, then map the error outside
    // the future so no extra `async` block is needed.
    self.rt
        .block_on(self.inner.check_batch(&batch))
        .map_err(PyDeltaTableError::from_raw)
}
}

Expand Down
12 changes: 4 additions & 8 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool>
// State used to validate record batches against a table's invariants.
pub struct DeltaDataChecker {
/// Invariants to enforce; when this list is empty the invariant check
/// returns early without doing any work.
invariants: Vec<Invariant>,
/// Session context used to run the SQL query built for each invariant.
ctx: SessionContext,
/// Runtime used to block on the invariant query from synchronous code.
/// NOTE(review): the async `check_batch`/`enforce_invariants` path awaits the
/// query directly and does not need this field — confirm whether it should
/// still exist once the synchronous path is gone.
rt: tokio::runtime::Runtime,
}

impl DeltaDataChecker {
Expand All @@ -619,20 +618,19 @@ impl DeltaDataChecker {
Self {
invariants,
ctx: SessionContext::new(),
rt: tokio::runtime::Runtime::new().unwrap(),
}
}

/// Check that a record batch conforms to table's invariants.
///
/// If it does not, it will return [DeltaTableError::InvalidData] with a list
/// of values that violated each invariant.
pub fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.enforce_invariants(record_batch)
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.enforce_invariants(record_batch).await
// TODO: for support for Protocol V3, check constraints
}

fn enforce_invariants(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
async fn enforce_invariants(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
// Invariants are deprecated, so let's not pay the overhead for any of this
// if we can avoid it.
if self.invariants.is_empty() {
Expand All @@ -656,9 +654,7 @@ impl DeltaDataChecker {
invariant.field_name, invariant.invariant_sql
);

let dfs: Vec<RecordBatch> = self
.rt
.block_on(async { self.ctx.sql(&sql).await?.collect().await })?;
let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
if !dfs.is_empty() && dfs[0].num_rows() > 0 {
let value = format!("{:?}", dfs[0].column(0));
let msg = format!(
Expand Down

0 comments on commit 6ef1086

Please sign in to comment.