Support Spatial Types for PostGIS (dlt-hub#1927)
* Add dependencies
* Add shapely dependency
* Move sample geodata to correct folder
* Make smaller
* Enhance PostgresTableBuilder test suite with geometry type handling
* Add tests
* Add geometry columns with default SRID 4326
* resource can't serialize shapely objects
* Expand geom test
* Comments
* Update lock file
* schema
* [fix](database): remove unused hex validation method
* Create custom insert job for geom types
* Remove hanging client parameter
* Add a TODO comment to address the issue in the splitting logic
* Remove unnecessary init override
* Add debugging points
* [test](database): add tests for geometry parsing in Postgres
* Correct row parsing in Postgres destination
* Yield from supermethod
* Add control flow for geom
* Add test
* refactor geo parsing
* [fix](test): correct schema name in PostGIS geometry test
* Remove stale test
* Remove geopandas test until resolution
* Add docs and raise on malformed values
* Add postgis dependency to ci
* fix postgis image repo
* Add postgis to dbt runner
* Change snippet to py instead of python
* add postgis
* Remove unused geodata file
* Remove unnecessary INSERT class
* Add WKB format handling
* Packaging
* Move import to local
* Comment
* postgis docs
* Update lockfile
* fix(deps): remove shapely dependency from postgis extra
* format
* feat(postgres): add support for CSV loading of geometry columns
* Remove wkb examples in docs
* format

---------

Signed-off-by: Marcel Coetzee <marcel@mooncoon.com>
Pipboyguy authored Nov 30, 2024
1 parent eefe77b commit 09914a3
Showing 13 changed files with 547 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -60,7 +60,7 @@ jobs:

- name: Install dependencies
# install dlt with postgres support
-run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt
+run: poetry install --no-interaction -E postgres -E postgis --with sentry-sdk,dbt

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -78,7 +78,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
-run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake
+run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
@@ -48,7 +48,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
-image: postgres
+image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
@@ -95,7 +95,7 @@
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
-run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake
+run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
4 changes: 2 additions & 2 deletions .github/workflows/test_local_sources.yml
@@ -43,7 +43,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
-image: postgres
+image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
@@ -83,7 +83,7 @@

# TODO: which deps should we enable?
- name: Install dependencies
-run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources
+run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources

# run sources tests in load against configured destinations
- run: poetry run pytest tests/load/sources
9 changes: 8 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -628,7 +628,14 @@ def row_tuples_to_arrow(
" extracting an SQL VIEW that selects with cast."
)
json_str_array = pa.array(
-    [None if s is None else json.dumps(s) if not issubclass(type(s), set) else json.dumps(list(s)) for s in columnar_known_types[field.name]]
+    [
+        (
+            None
+            if s is None
+            else json.dumps(s) if not issubclass(type(s), set) else json.dumps(list(s))
+        )
+        for s in columnar_known_types[field.name]
+    ]
)
columnar_known_types[field.name] = json_str_array

22 changes: 15 additions & 7 deletions dlt/destinations/impl/postgres/factory.py
@@ -1,19 +1,19 @@
import typing as t

+from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.configuration import CsvFormatConfiguration
-from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_postgres_literal
-from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
+from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema.typing import TColumnSchema, TColumnType
from dlt.common.wei import EVM_DECIMAL_PRECISION

-from dlt.destinations.type_mapping import TypeMapperImpl
from dlt.destinations.impl.postgres.configuration import (
    PostgresCredentials,
    PostgresClientConfiguration,
)
+from dlt.destinations.impl.postgres.postgres_adapter import GEOMETRY_HINT, SRID_HINT
+from dlt.destinations.type_mapping import TypeMapperImpl

if t.TYPE_CHECKING:
from dlt.destinations.impl.postgres.postgres import PostgresClient
@@ -55,6 +55,7 @@ class PostgresTypeMapper(TypeMapperImpl):
"character varying": "text",
"smallint": "bigint",
"integer": "bigint",
"geometry": "text",
}

def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
@@ -108,11 +109,18 @@ def to_db_datetime_type(
def from_destination_type(
self, db_type: str, precision: t.Optional[int] = None, scale: t.Optional[int] = None
) -> TColumnType:
if db_type == "numeric":
if (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
if db_type == "numeric" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
if db_type.startswith("geometry"):
return dict(data_type="text")
return super().from_destination_type(db_type, precision, scale)

+def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
+    if column.get(GEOMETRY_HINT):
+        srid = column.get(SRID_HINT, 4326)
+        return f"geometry(Geometry, {srid})"
+    return super().to_destination_type(column, table)


class postgres(Destination[PostgresClientConfiguration, "PostgresClient"]):
spec = PostgresClientConfiguration
54 changes: 22 additions & 32 deletions dlt/destinations/impl/postgres/postgres.py
@@ -2,30 +2,26 @@

from dlt.common import logger
from dlt.common.data_writers.configuration import CsvFormatConfiguration
+from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
DestinationInvalidFileFormat,
DestinationTerminalException,
)
from dlt.common.destination.reference import (
HasFollowupJobs,
PreparedTableSchema,
RunnableLoadJob,
FollowupJobRequest,
LoadJob,
TLoadJobState,
)
-from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
-from dlt.common.schema.typing import TColumnType, TTableFormat
+from dlt.common.schema.typing import TColumnType
from dlt.common.schema.utils import is_nullable_column
from dlt.common.storages.file_storage import FileStorage

-from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams
-from dlt.destinations.insert_job_client import InsertValuesJobClient
-from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration
+from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
+from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.sql_client import SqlClientBase
+from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams

HINT_TO_POSTGRES_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}

@@ -43,15 +39,16 @@ def generate_sql(
with sql_client.with_staging_dataset():
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
-# drop destination table
-sql.append(f"DROP TABLE IF EXISTS {table_name};")
-# moving staging table to destination schema
-sql.append(
-    f"ALTER TABLE {staging_table_name} SET SCHEMA"
-    f" {sql_client.fully_qualified_dataset_name()};"
+sql.extend(
+    (
+        f"DROP TABLE IF EXISTS {table_name};",
+        (
+            f"ALTER TABLE {staging_table_name} SET SCHEMA"
+            f" {sql_client.fully_qualified_dataset_name()};"
+        ),
+        f"CREATE TABLE {staging_table_name} (like {table_name} including all);",
+    )
)
-# recreate staging table
-sql.append(f"CREATE TABLE {staging_table_name} (like {table_name} including all);")
return sql


@@ -111,8 +108,7 @@ def run(self) -> None:
split_columns.append(norm_col)
if norm_col in split_headers and is_nullable_column(col):
split_null_headers.append(norm_col)
-split_unknown_headers = set(split_headers).difference(split_columns)
-if split_unknown_headers:
+if split_unknown_headers := set(split_headers).difference(split_columns):
raise DestinationInvalidFileFormat(
"postgres",
"csv",
@@ -130,15 +126,8 @@ def run(self) -> None:

qualified_table_name = sql_client.make_qualified_table_name(table_name)
copy_sql = (
"COPY %s (%s) FROM STDIN WITH (FORMAT CSV, DELIMITER '%s', NULL '',"
" %s ENCODING '%s')"
% (
qualified_table_name,
headers,
sep,
null_headers,
csv_format.encoding,
)
f"COPY {qualified_table_name} ({headers}) FROM STDIN WITH (FORMAT CSV, DELIMITER"
f" '{sep}', NULL '', {null_headers} ENCODING '{csv_format.encoding}')"
)
with sql_client.begin_transaction():
with sql_client.native_connection.cursor() as cursor:
@@ -173,15 +162,16 @@ def create_load_job(
return job

def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str:
hints_str = " ".join(
hints_ = " ".join(
self.active_hints.get(h, "")
for h in self.active_hints.keys()
if c.get(h, False) is True
)
column_name = self.sql_client.escape_column_name(c["name"])
-return (
-    f"{column_name} {self.type_mapper.to_destination_type(c,table)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
-)
+nullability = self._gen_not_null(c.get("nullable", True))
+column_type = self.type_mapper.to_destination_type(c, table)
+
+return f"{column_name} {column_type} {hints_} {nullability}"

def _create_replace_followup_jobs(
self, table_chain: Sequence[PreparedTableSchema]
63 changes: 63 additions & 0 deletions dlt/destinations/impl/postgres/postgres_adapter.py
@@ -0,0 +1,63 @@
from typing import Any, Optional

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource

GEOMETRY_HINT = "x-postgres-geometry"
SRID_HINT = "x-postgres-srid"


def postgres_adapter(
    data: Any,
    geometry: TColumnNames = None,
    srid: Optional[int] = 4326,
) -> DltResource:
    """Prepares data for the postgres destination by specifying which columns should
    be cast to PostGIS geometry types.
    Args:
        data (Any): The data to be transformed. It can be raw data or an instance
            of DltResource. If raw data, the function wraps it into a DltResource
            object.
        geometry (TColumnNames, optional): Specify columns to cast to geometries.
            It can be a single column name as a string, or a list of column names.
        srid (int, optional): The Spatial Reference System Identifier (SRID) to be
            used for the geometry columns. If not provided, SRID 4326 will be used.
    Returns:
        DltResource: A resource with applied postgres-specific hints.
    Raises:
        ValueError: If input for `geometry` is invalid, or if no geometry columns are specified.
    Examples:
        >>> data = [{"town": "Null Island", "loc": "POINT(0 0)"}]
        >>> postgres_adapter(data, geometry="loc", srid=4326)
        [DltResource with hints applied]
    """
    resource = get_resource_for_adapter(data)

    column_hints: TTableSchemaColumns = {}

    if geometry:
        if isinstance(geometry, str):
            geometry = [geometry]
        if not isinstance(geometry, list):
            raise ValueError(
                "'geometry' must be a list of column names or a single column name as a string."
            )

        for column_name in geometry:
            column_hints[column_name] = {
                "name": column_name,
                GEOMETRY_HINT: True,  # type: ignore[misc]
            }
            if srid is not None:
                column_hints[column_name][SRID_HINT] = srid  # type: ignore

    if not column_hints:
        raise ValueError("A value for 'geometry' must be specified.")
    else:
        resource.apply_hints(columns=column_hints)
    return resource
52 changes: 51 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/postgres.md
@@ -117,7 +117,57 @@ In the example above, `arrow_table` will be converted to CSV with **pyarrow** an
## Supported column hints
`postgres` will create unique indexes for all columns with `unique` hints. This behavior **may be disabled**.

-### Table and column identifiers
+### Spatial Types

To enable GIS capabilities in your Postgres destination, use the `x-postgres-geometry` and `x-postgres-srid` hints for columns containing geometric data.
The `postgres_adapter` facilitates applying these hints conveniently, with a default SRID of `4326`.

**Supported geometry formats:**

- WKT (Well-Known Text)
- Hex-encoded WKB (Well-Known Binary)

If you have geometry data in binary format, you will need to convert it to hexadecimal representation before loading.
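
A minimal sketch of that conversion (assuming the bytes are already valid WKB; the `geom_wkb` field and sample row are illustrative):

```py
# Sketch only: turn raw WKB bytes into the hex string the loader accepts.
wkb_point = bytes.fromhex("0101000000" + "00" * 16)  # WKB for POINT (0 0)
rows = [{"town": "Null Island", "geom_wkb": wkb_point}]

for row in rows:
    # bytes.hex() returns the hex-encoded WKB representation
    row["geom"] = row.pop("geom_wkb").hex()
```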

**Example:** Using `postgres_adapter` with different geometry formats

```py
from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter

# Sample data with various geometry formats
data_wkt = [
    {"type": "Point_wkt", "geom": "POINT (1 1)"},
    {"type": "Polygon_wkt", "geom": "POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))"},
]

data_wkb_hex = [
    {"type": "Point_wkb_hex", "geom": "0101000000000000000000F03F000000000000F03F"},
    {"type": "Linestring_wkb_hex", "geom": "01020000000300000000000000000000000000000000000000000000000000F03F000000000000F03F00000000000000400000000000000040"},
]

# Apply postgres_adapter to the 'geom' column with default SRID 4326
resource_wkt = postgres_adapter(data_wkt, geometry="geom")
resource_wkb_hex = postgres_adapter(data_wkb_hex, geometry="geom")

# If you need a different SRID
resource_wkt = postgres_adapter(data_wkt, geometry="geom", srid=3242)
```
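
The adapted resource then loads like any other (a sketch; the pipeline, dataset, and table names are hypothetical):

```py
import dlt

pipeline = dlt.pipeline(
    pipeline_name="geo_demo", destination="postgres", dataset_name="geodata"
)
# resource_wkt as created in the snippet above
pipeline.run(resource_wkt, table_name="geom_table")
```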

Ensure that the PostGIS extension is enabled in your Postgres database:

```sql
CREATE EXTENSION postgis;
```

This configuration allows `dlt` to map the `geom` column to the PostGIS `geometry` type for spatial queries and analyses.
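
For the example above, the generated column type takes the form `geometry(Geometry, <srid>)`. Roughly, as illustrative DDL (`dlt` emits and runs this for you, alongside its metadata columns):

```sql
CREATE TABLE geom_table (
    type varchar,
    geom geometry(Geometry, 4326)
);
```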

:::warning
`LinearRing` geometry type isn't supported.
:::

+## Table and column identifiers
Postgres supports both case-sensitive and case-insensitive identifiers. All unquoted and lowercase identifiers resolve case-insensitively in SQL statements. Case insensitive [naming conventions](../../general-usage/naming-convention.md#case-sensitive-and-insensitive-destinations) like the default **snake_case** will generate case-insensitive identifiers. Case sensitive (like **sql_cs_v1**) will generate case-sensitive identifiers that must be quoted in SQL statements.

## Additional destination options
Expand Down