Skip to content

Commit

Permalink
feat(python): expose create to DeltaTable class (#1932)
Browse files Browse the repository at this point in the history
Allows one to create a table without writing. 

(created a new one, I keep messing up these rebases..)

- closes #1892
  • Loading branch information
ion-elgreco authored Dec 2, 2023
1 parent 18c4834 commit 20214a5
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 2 deletions.
10 changes: 10 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ def convert_to_deltalake(
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
# Stub for the native `create_deltalake` function exported by the `_internal`
# extension module (implemented in python/src/lib.rs).
# Creates a Delta table at `table_uri` from `schema` without writing any data.
# `partition_by` is a (possibly empty) list of partition column names and
# `mode` is a save-mode string that is parsed on the Rust side.
def create_deltalake(
table_uri: str,
schema: pyarrow.Schema,
partition_by: List[str],
mode: str,
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

# Can't implement inheritance (see note in src/schema.rs), so this is next
Expand Down
74 changes: 72 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Generator,
Iterable,
List,
Literal,
Mapping,
NamedTuple,
Optional,
Tuple,
Expand All @@ -37,11 +39,12 @@

from deltalake._internal import DeltaDataChecker as _DeltaDataChecker
from deltalake._internal import RawDeltaTable
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
from deltalake.exceptions import DeltaProtocolError
from deltalake.fs import DeltaStorageHandler
from deltalake.schema import Schema
from deltalake.schema import Schema as DeltaSchema

MAX_SUPPORTED_READER_VERSION = 1
MAX_SUPPORTED_WRITER_VERSION = 2
Expand Down Expand Up @@ -297,6 +300,73 @@ def from_data_catalog(
table_uri=table_uri, version=version, log_buffer_size=log_buffer_size
)

@classmethod
def create(
    cls,
    table_uri: Union[str, Path],
    schema: Union[pyarrow.Schema, DeltaSchema],
    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
    partition_by: Optional[Union[List[str], str]] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    configuration: Optional[Mapping[str, Optional[str]]] = None,
    storage_options: Optional[Dict[str, str]] = None,
) -> "DeltaTable":
    """`CREATE` or `CREATE_OR_REPLACE` a delta table at the given table_uri.

    The table is created from the schema alone; no data is written.

    Args:
        table_uri: URI of the table, given as a string or a `Path`.
        schema: Table schema, either a pyarrow schema or a deltalake schema.
        mode: How to handle an already existing table. Defaults to "error".
            If 'error', raises an error if the table already exists.
            If 'append', raises a not-supported error if the table already exists.
            If 'overwrite', will `CREATE_OR_REPLACE` the table.
            If 'ignore', will not do anything if the table already exists.
        partition_by: Column name, or list of column names, to partition the table by.
        name: User-provided identifier for this table.
        description: User-provided description for this table.
        configuration: A map containing configuration options for the metadata action.
        storage_options: Options passed to the object store crate.

    Returns:
        DeltaTable: the created delta table

    Example:
        ```python
        import pyarrow as pa

        from deltalake import DeltaTable

        dt = DeltaTable.create(
            table_uri="my_local_table",
            schema=pa.schema(
                [pa.field("foo", pa.string()), pa.field("bar", pa.string())]
            ),
            mode="error",
            partition_by="bar",
        )
        ```
    """
    # The native function only understands pyarrow schemas.
    arrow_schema = schema.to_pyarrow() if isinstance(schema, DeltaSchema) else schema

    # Normalise the partition columns to a list: a single name is wrapped,
    # a missing value becomes an empty list.
    if isinstance(partition_by, str):
        partition_columns = [partition_by]
    else:
        partition_columns = partition_by or []

    # The native function expects the location as a plain string.
    uri = str(table_uri) if isinstance(table_uri, Path) else table_uri

    _create_deltalake(
        uri,
        arrow_schema,
        partition_columns,
        mode,
        name,
        description,
        configuration,
        storage_options,
    )

    # Load and return the table that now exists at the given location.
    return cls(table_uri=uri, storage_options=storage_options)

def version(self) -> int:
"""
Get the version of the DeltaTable.
Expand Down Expand Up @@ -412,7 +482,7 @@ def load_with_datetime(self, datetime_string: str) -> None:
def table_uri(self) -> str:
return self._table.table_uri()

def schema(self) -> Schema:
def schema(self) -> DeltaSchema:
"""
Get the current schema of the DeltaTable.
Expand Down
46 changes: 46 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,51 @@ fn write_to_deltalake(
Ok(())
}

/// Create a Delta table at `table_uri` from an Arrow schema without writing data.
///
/// `mode` is parsed into a save mode, `partition_by` lists the partition
/// columns, and `name`, `description` and `configuration` are applied to the
/// create operation only when provided. Missing `storage_options` are treated
/// as an empty map.
///
/// Any failure (building the table handle, parsing the mode, converting the
/// schema, or running the create operation) is converted through
/// `PythonError` and surfaced to the caller as a Python exception.
#[pyfunction]
// One flat argument per Python-side parameter.
#[allow(clippy::too_many_arguments)]
fn create_deltalake(
    table_uri: String,
    schema: PyArrowType<ArrowSchema>,
    partition_by: Vec<String>,
    mode: String,
    name: Option<String>,
    description: Option<String>,
    configuration: Option<HashMap<String, Option<String>>>,
    storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
    // Build the table handle first so that URI / storage problems are reported
    // before any argument parsing errors.
    let table = DeltaTableBuilder::from_uri(table_uri)
        .with_storage_options(storage_options.unwrap_or_default())
        .build()
        .map_err(PythonError::from)?;

    let save_mode = mode.parse().map_err(PythonError::from)?;
    let delta_schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

    let mut create_op = DeltaOps(table)
        .create()
        .with_columns(delta_schema.fields().clone())
        .with_save_mode(save_mode)
        .with_partition_columns(partition_by);

    // Optional metadata is only set when the caller supplied it.
    if let Some(table_name) = name.as_ref() {
        create_op = create_op.with_table_name(table_name);
    }

    if let Some(comment) = description.as_ref() {
        create_op = create_op.with_comment(comment);
    }

    if let Some(table_config) = configuration {
        create_op = create_op.with_configuration(table_config);
    }

    // Drive the async create operation to completion on the shared runtime.
    rt()?
        .block_on(create_op.into_future())
        .map_err(PythonError::from)?;

    Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn write_new_deltalake(
Expand Down Expand Up @@ -1325,6 +1370,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(create_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
Expand Down
54 changes: 54 additions & 0 deletions python/tests/test_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable
from deltalake.exceptions import DeltaError


def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Name, description and configuration given to `create` are readable from the table metadata."""
    created_table = DeltaTable.create(
        tmp_path,
        sample_data.schema,
        name="test_name",
        description="test_desc",
        configuration={"delta.appendOnly": "false", "foo": "bar"},
    )

    table_metadata = created_table.metadata()

    assert table_metadata.name == "test_name"
    assert table_metadata.description == "test_desc"
    assert table_metadata.configuration == {
        "delta.appendOnly": "false",
        "foo": "bar",
    }


def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Each save mode behaves as expected when the table already exists."""
    table_schema = sample_data.schema

    # First creation succeeds and is recorded as a plain CREATE TABLE commit.
    created = DeltaTable.create(tmp_path, table_schema, mode="error")
    creation_commit = created.history(1)[0]

    # Creating again with "error" must fail because the table exists.
    with pytest.raises(DeltaError):
        DeltaTable.create(tmp_path, table_schema, mode="error")

    assert creation_commit["operation"] == "CREATE TABLE"

    # "append" is rejected for an existing table.
    with pytest.raises(DeltaError):
        DeltaTable.create(tmp_path, table_schema, mode="append")

    # "ignore" leaves the existing table untouched.
    ignored = DeltaTable.create(tmp_path, table_schema, mode="ignore")
    assert ignored.version() == 0

    # "overwrite" replaces the table, producing a new version.
    replaced = DeltaTable.create(tmp_path, table_schema, mode="overwrite")
    assert replaced.version() == 1

    replace_commit = replaced.history(1)[0]

    assert replace_commit["operation"] == "CREATE OR REPLACE TABLE"


def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table):
    """The schema of a freshly created table round-trips back to the pyarrow schema it was created from."""
    source_schema = sample_data.schema
    created_table = DeltaTable.create(tmp_path, source_schema)

    assert created_table.schema().to_pyarrow() == sample_data.schema

0 comments on commit 20214a5

Please sign in to comment.