diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index e1f5288b81..228488d91a 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -163,6 +163,16 @@ def convert_to_deltalake( storage_options: Optional[Dict[str, str]], custom_metadata: Optional[Dict[str, str]], ) -> None: ... +def create_deltalake( + table_uri: str, + schema: pyarrow.Schema, + partition_by: List[str], + mode: str, + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], +) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... # Can't implement inheritance (see note in src/schema.rs), so this is next diff --git a/python/deltalake/table.py b/python/deltalake/table.py index adf3ca92af..e7b7613599 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -12,6 +12,8 @@ Generator, Iterable, List, + Literal, + Mapping, NamedTuple, Optional, Tuple, @@ -37,11 +39,12 @@ from deltalake._internal import DeltaDataChecker as _DeltaDataChecker from deltalake._internal import RawDeltaTable +from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog from deltalake.exceptions import DeltaProtocolError from deltalake.fs import DeltaStorageHandler -from deltalake.schema import Schema +from deltalake.schema import Schema as DeltaSchema MAX_SUPPORTED_READER_VERSION = 1 MAX_SUPPORTED_WRITER_VERSION = 2 @@ -297,6 +300,73 @@ def from_data_catalog( table_uri=table_uri, version=version, log_buffer_size=log_buffer_size ) + @classmethod + def create( + cls, + table_uri: Union[str, Path], + schema: Union[pyarrow.Schema, DeltaSchema], + mode: Literal["error", "append", "overwrite", "ignore"] = "error", + partition_by: Optional[Union[List[str], str]] = None, + name: Optional[str] = None, + description: Optional[str] = None, + configuration: Optional[Mapping[str, Optional[str]]] = None, + storage_options: Optional[Dict[str, str]] = None, + ) -> "DeltaTable": + """`CREATE` or `CREATE_OR_REPLACE` a delta table given a table_uri. + + Args: + table_uri: URI of a table + schema: Table schema + mode: How to handle existing data. Default is to error if table already exists. + If 'append', returns not support error if table exists. + If 'overwrite', will `CREATE_OR_REPLACE` table. + If 'ignore', will not do anything if table already exists. Defaults to "error". + partition_by: List of columns to partition the table by. + name: User-provided identifier for this table. + description: User-provided description for this table. + configuration: A map containing configuration options for the metadata action. + storage_options: options passed to the object store crate. + + Returns: + DeltaTable: created delta table + + Example: + ```python + import pyarrow as pa + + from deltalake import DeltaTable + + dt = DeltaTable.create( + table_uri="my_local_table", + schema=pa.schema( + [pa.field("foo", pa.string()), pa.field("bar", pa.string())] + ), + mode="error", + partition_by="bar", + ) + ``` + """ + if isinstance(schema, DeltaSchema): + schema = schema.to_pyarrow() + if isinstance(partition_by, str): + partition_by = [partition_by] + + if isinstance(table_uri, Path): + table_uri = str(table_uri) + + _create_deltalake( + table_uri, + schema, + partition_by or [], + mode, + name, + description, + configuration, + storage_options, + ) + + return cls(table_uri=table_uri, storage_options=storage_options) + def version(self) -> int: """ Get the version of the DeltaTable. @@ -412,7 +482,7 @@ def load_with_datetime(self, datetime_string: str) -> None: def table_uri(self) -> str: return self._table.table_uri() - def schema(self) -> Schema: + def schema(self) -> DeltaSchema: """ Get the current schema of the DeltaTable. diff --git a/python/src/lib.rs b/python/src/lib.rs index e7d5ec818d..9da42f1170 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1184,6 +1184,51 @@ fn write_to_deltalake( Ok(()) } +#[pyfunction] +#[allow(clippy::too_many_arguments)] +fn create_deltalake( + table_uri: String, + schema: PyArrowType, + partition_by: Vec, + mode: String, + name: Option, + description: Option, + configuration: Option>>, + storage_options: Option>, +) -> PyResult<()> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_storage_options(storage_options.unwrap_or_default()) + .build() + .map_err(PythonError::from)?; + + let mode = mode.parse().map_err(PythonError::from)?; + let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?; + + let mut builder = DeltaOps(table) + .create() + .with_columns(schema.fields().clone()) + .with_save_mode(mode) + .with_partition_columns(partition_by); + + if let Some(name) = &name { + builder = builder.with_table_name(name); + }; + + if let Some(description) = &description { + builder = builder.with_comment(description); + }; + + if let Some(config) = configuration { + builder = builder.with_configuration(config); + }; + + rt()? + .block_on(builder.into_future()) + .map_err(PythonError::from)?; + + Ok(()) +} + #[pyfunction] #[allow(clippy::too_many_arguments)] fn write_new_deltalake( @@ -1325,6 +1370,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(create_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; diff --git a/python/tests/test_create.py b/python/tests/test_create.py new file mode 100644 index 0000000000..a618d741a1 --- /dev/null +++ b/python/tests/test_create.py @@ -0,0 +1,54 @@ +import pathlib + +import pyarrow as pa +import pytest + +from deltalake import DeltaTable +from deltalake.exceptions import DeltaError + + +def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + name="test_name", + description="test_desc", + configuration={"delta.appendOnly": "false", "foo": "bar"}, + ) + + metadata = dt.metadata() + + assert metadata.name == "test_name" + assert metadata.description == "test_desc" + assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} + + +def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + last_action = dt.history(1)[0] + + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + + assert last_action["operation"] == "CREATE TABLE" + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="append") + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="ignore") + assert dt.version() == 0 + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="overwrite") + assert dt.version() == 1 + + last_action = dt.history(1)[0] + + assert last_action["operation"] == "CREATE OR REPLACE TABLE" + + +def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + ) + + assert dt.schema().to_pyarrow() == sample_data.schema