Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for iceberg table with snowflake catalog #539

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Source code is also available at:
- Fixed SAWarning when registering functions with existing name in default namespace
- Update options to be defined in key arguments instead of arguments.
- Add support for refresh_mode option in DynamicTable
- Add support for iceberg table with Snowflake Catalog
- Fix cluster by option to support explicit expressions

- v1.6.1(July 9, 2024)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ This example shows how to create a table with two columns, `id` and `name`, as t
t = Table('myuser', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
snowflake_clusterby=['id', 'name'], ...
snowflake_clusterby=['id', 'name', text('id > 5')], ...
)
metadata.create_all(engine)
```
Expand Down
48 changes: 33 additions & 15 deletions src/snowflake/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
else:
import importlib.metadata as importlib_metadata

from sqlalchemy.types import (
from sqlalchemy.types import ( # noqa
BIGINT,
BINARY,
BOOLEAN,
Expand All @@ -27,8 +27,8 @@
VARCHAR,
)

from . import base, snowdialect
from .custom_commands import (
from . import base, snowdialect # noqa
from .custom_commands import ( # noqa
AWSBucket,
AzureContainer,
CopyFormatter,
Expand All @@ -41,7 +41,7 @@
MergeInto,
PARQUETFormatter,
)
from .custom_types import (
from .custom_types import ( # noqa
ARRAY,
BYTEINT,
CHARACTER,
Expand All @@ -61,9 +61,15 @@
VARBINARY,
VARIANT,
)
from .sql.custom_schema import DynamicTable, HybridTable
from .sql.custom_schema.options import (
from .sql.custom_schema import ( # noqa
DynamicTable,
HybridTable,
IcebergTable,
SnowflakeTable,
)
from .sql.custom_schema.options import ( # noqa
AsQueryOption,
ClusterByOption,
IdentifierOption,
KeywordOption,
LiteralOption,
Expand All @@ -72,14 +78,13 @@
TargetLagOption,
TimeUnit,
)
from .util import _url as URL
from .util import _url as URL # noqa

base.dialect = dialect = snowdialect.dialect

__version__ = importlib_metadata.version("snowflake-sqlalchemy")

__all__ = (
# Custom Types
_custom_types = (
"BIGINT",
"BINARY",
"BOOLEAN",
Expand Down Expand Up @@ -114,7 +119,9 @@
"TINYINT",
"VARBINARY",
"VARIANT",
# Custom Commands
)

_custom_commands = (
"MergeInto",
"CSVFormatter",
"JSONFormatter",
Expand All @@ -126,17 +133,28 @@
"ExternalStage",
"CreateStage",
"CreateFileFormat",
# Custom Tables
"HybridTable",
"DynamicTable",
# Custom Table Options
)

_custom_tables = ("HybridTable", "DynamicTable", "IcebergTable", "SnowflakeTable")

_custom_table_options = (
"AsQueryOption",
"TargetLagOption",
"LiteralOption",
"IdentifierOption",
"KeywordOption",
# Enums
"ClusterByOption",
)

_enums = (
"TimeUnit",
"TableOptionKey",
"SnowflakeKeyword",
)
__all__ = (
*_custom_types,
*_custom_commands,
*_custom_tables,
*_custom_table_options,
*_enums,
)
13 changes: 10 additions & 3 deletions src/snowflake/sqlalchemy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,15 @@ def handle_cluster_by(self, table):
... metadata,
... sa.Column('id', sa.Integer, primary_key=True),
... sa.Column('name', sa.String),
... snowflake_clusterby=['id', 'name']
... snowflake_clusterby=['id', 'name', text("id > 5")]
... )
>>> print(CreateTable(user).compile(engine))
<BLANKLINE>
CREATE TABLE "user" (
id INTEGER NOT NULL AUTOINCREMENT,
name VARCHAR,
PRIMARY KEY (id)
) CLUSTER BY (id, name)
) CLUSTER BY (id, name, id > 5)
<BLANKLINE>
<BLANKLINE>
"""
Expand All @@ -925,7 +925,14 @@ def handle_cluster_by(self, table):
cluster = info.get("clusterby")
if cluster:
text += " CLUSTER BY ({})".format(
", ".join(self.denormalize_column_name(key) for key in cluster)
", ".join(
(
self.denormalize_column_name(key)
if isinstance(key, str)
else str(key)
)
for key in cluster
)
)
return text

Expand Down
4 changes: 3 additions & 1 deletion src/snowflake/sqlalchemy/sql/custom_schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
#
from .dynamic_table import DynamicTable
from .hybrid_table import HybridTable
from .iceberg_table import IcebergTable
from .snowflake_table import SnowflakeTable

__all__ = ["DynamicTable", "HybridTable"]
__all__ = ["DynamicTable", "HybridTable", "IcebergTable", "SnowflakeTable"]
37 changes: 37 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/clustered_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from typing import Any, Optional

from sqlalchemy.sql.schema import MetaData, SchemaItem

from .custom_table_base import CustomTableBase
from .options.as_query_option import AsQueryOption
from .options.cluster_by_option import ClusterByOption, ClusterByOptionType
from .options.table_option import TableOptionKey


class ClusteredTableBase(CustomTableBase):

@property
def cluster_by(self) -> Optional[AsQueryOption]:
return self._get_dialect_option(TableOptionKey.CLUSTER_BY)

def __init__(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
cluster_by: ClusterByOptionType = None,
**kw: Any,
) -> None:
if kw.get("_no_init", True):
return

options = [
ClusterByOption.create(cluster_by),
]

kw.update(self._as_dialect_options(options))
super().__init__(name, metadata, *args, **kw)
6 changes: 3 additions & 3 deletions src/snowflake/sqlalchemy/sql/custom_schema/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
IdentifierOption,
IdentifierOptionType,
KeywordOptionType,
LiteralOption,
TableOptionKey,
TargetLagOption,
TargetLagOptionType,
Expand Down Expand Up @@ -45,7 +44,7 @@ class DynamicTable(TableFromQueryBase):
as_query="SELECT id, name from test_table_1;"
)

Example using full options:
Example using explicit options:
DynamicTable(
"dynamic_test_table_1",
metadata,
Expand All @@ -67,7 +66,7 @@ class DynamicTable(TableFromQueryBase):
]

@property
def warehouse(self) -> typing.Optional[LiteralOption]:
def warehouse(self) -> typing.Optional[IdentifierOption]:
return self._get_dialect_option(TableOptionKey.WAREHOUSE)

@property
Expand Down Expand Up @@ -112,6 +111,7 @@ def __repr__(self) -> str:
+ [repr(x) for x in self.columns]
+ [repr(self.target_lag)]
+ [repr(self.warehouse)]
+ [repr(self.cluster_by)]
+ [repr(self.as_query)]
+ [f"{k}={repr(getattr(self, k))}" for k in ["schema"]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class HybridTable(CustomTableBase):
The `HybridTable` class allows for the creation and querying of OLTP Snowflake Tables .

While it does not support reflection at this time, it provides a flexible
interface for creating dynamic tables and management.
interface for creating hybrid tables and management.

For further information on this clause, please refer to: https://docs.snowflake.com/en/sql-reference/sql/create-hybrid-table

Expand Down
101 changes: 101 additions & 0 deletions src/snowflake/sqlalchemy/sql/custom_schema/iceberg_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

import typing
from typing import Any

from sqlalchemy.sql.schema import MetaData, SchemaItem

from .custom_table_prefix import CustomTablePrefix
from .options import LiteralOption, LiteralOptionType, TableOptionKey
from .table_from_query import TableFromQueryBase


class IcebergTable(TableFromQueryBase):
"""
A class representing an iceberg table with configurable options and settings.

While it does not support reflection at this time, it provides a flexible
interface for creating iceberg tables and management.

For further information on this clause, please refer to: https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table

Example using option values:

IcebergTable(
"dynamic_test_table_1",
metadata,
Column("id", Integer),
Column("name", String),
external_volume='my_external_volume',
base_location='my_iceberg_table'"
)

Example using explicit options:
DynamicTable(
"dynamic_test_table_1",
metadata,
Column("id", Integer),
Column("name", String),
external_volume=LiteralOption('my_external_volume')
base_location=LiteralOption('my_iceberg_table')
)
"""

__table_prefixes__ = [CustomTablePrefix.ICEBERG]

@property
def external_volume(self) -> typing.Optional[LiteralOption]:
return self._get_dialect_option(TableOptionKey.EXTERNAL_VOLUME)

@property
def base_location(self) -> typing.Optional[LiteralOption]:
return self._get_dialect_option(TableOptionKey.BASE_LOCATION)

@property
def catalog(self) -> typing.Optional[LiteralOption]:
return self._get_dialect_option(TableOptionKey.CATALOG)

def __init__(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
external_volume: LiteralOptionType = None,
base_location: LiteralOptionType = None,
**kw: Any,
) -> None:
if kw.get("_no_init", True):
return

options = [
LiteralOption.create(TableOptionKey.EXTERNAL_VOLUME, external_volume),
LiteralOption.create(TableOptionKey.BASE_LOCATION, base_location),
LiteralOption.create(TableOptionKey.CATALOG, "SNOWFLAKE"),
]

kw.update(self._as_dialect_options(options))
super().__init__(name, metadata, *args, **kw)

def _init(
self,
name: str,
metadata: MetaData,
*args: SchemaItem,
**kw: Any,
) -> None:
self.__init__(name, metadata, *args, _no_init=False, **kw)

def __repr__(self) -> str:
return "IcebergTable(%s)" % ", ".join(
[repr(self.name)]
+ [repr(self.metadata)]
+ [repr(x) for x in self.columns]
+ [repr(self.external_volume)]
+ [repr(self.base_location)]
+ [repr(self.catalog)]
+ [repr(self.cluster_by)]
+ [repr(self.as_query)]
+ [f"{k}={repr(getattr(self, k))}" for k in ["schema"]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from .as_query_option import AsQueryOption, AsQueryOptionType
from .cluster_by_option import ClusterByOption, ClusterByOptionType
from .identifier_option import IdentifierOption, IdentifierOptionType
from .keyword_option import KeywordOption, KeywordOptionType
from .keywords import SnowflakeKeyword
Expand All @@ -17,6 +18,7 @@
"KeywordOption",
"AsQueryOption",
"TargetLagOption",
"ClusterByOption",
# Enums
"TimeUnit",
"SnowflakeKeyword",
Expand All @@ -27,4 +29,5 @@
"AsQueryOptionType",
"TargetLagOptionType",
"KeywordOptionType",
"ClusterByOptionType",
]
Loading
Loading