Showing 2 changed files with 93 additions and 0 deletions.
@@ -0,0 +1,30 @@
import geopandas as gpd  # noqa: F401
import pandas as pd  # noqa: F401
import pyspark.sql.functions as F
import pyspark.sql.types as T
from shapely import wkt
from shapely.geometry import LineString


@F.udf(returnType=T.FloatType())
def point_array_to_linestring(arr):
    if len(arr) == 1:
        return 0.0  # return a float so the FloatType UDF doesn't yield null
    return LineString([wkt.loads(p) for p in arr]).length


def model(dbt, session):
    dbt.config(materialized="incremental", packages=["pandas", "geopandas"])

    df = dbt.ref("dim_shapes_arrays_wkt")

    if dbt.is_incremental and session.catalog.tableExists(repr(dbt.this)):
        # only new rows compared to max in current table
        max_from_this = f"select max(_feed_valid_from) from `{dbt.this}`"
        df = df.filter(df._feed_valid_from > session.sql(max_from_this).collect()[0][0])

    df = df.withColumn(
        "length_meters", point_array_to_linestring(F.col("pt_array_wkt"))
    )

    return df
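As a sanity check, here is a minimal local sketch (an illustration only, not part of this commit; the sample WKT points are invented) of what the body of point_array_to_linestring does when run as plain Python outside Spark:

from shapely import wkt
from shapely.geometry import LineString

# hypothetical sample points, in the WKT form ST_ASTEXT would produce
pt_array_wkt = [
    "POINT(-122.4194 37.7749)",
    "POINT(-122.4180 37.7760)",
    "POINT(-122.4170 37.7772)",
]

if len(pt_array_wkt) == 1:
    length = 0.0  # a single point has no line to measure
else:
    # shapely computes planar length in the units of the input coordinates
    length = LineString([wkt.loads(p) for p in pt_array_wkt]).length

print(length)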
@@ -0,0 +1,63 @@
{{
    config(
        materialized='table',
        cluster_by='feed_key',
    )
}}

WITH dim_shapes AS (
    SELECT * FROM {{ ref('dim_shapes') }}
),

-- first, cast lat/long to geography
lat_long AS (
    SELECT
        feed_key,
        base64_url,
        shape_id,
        shape_pt_sequence,
        ST_GEOGPOINT(
            shape_pt_lon,
            shape_pt_lat
        ) AS pt_geom,
        _feed_valid_from,
    FROM dim_shapes
),

-- collect points into an array
initial_pt_array AS (
    SELECT
        feed_key,
        base64_url,
        shape_id,
        _feed_valid_from,
        -- don't try to make LINESTRING because of this issue:
        -- https://stackoverflow.com/questions/58234223/st-makeline-discarding-duplicate-points-even-if-not-consecutive
        -- also: https://gis.stackexchange.com/questions/426188/can-i-represent-a-route-that-doubles-back-on-itself-in-bigquery-with-a-linestrin
        -- so instead this is just an array of WKT points
        ARRAY_AGG(
            -- ignore nulls so it doesn't error out if there's a null point
            ST_ASTEXT(pt_geom) IGNORE NULLS
            ORDER BY shape_pt_sequence
        ) AS pt_array_wkt,
        -- count number of rows so we can check for nulls (drops) later
        COUNT(*) AS ct
    FROM lat_long
    GROUP BY feed_key, base64_url, shape_id, _feed_valid_from
),

dim_shapes_arrays_wkt AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['feed_key', 'shape_id']) }} AS key,
        feed_key,
        shape_id,
        pt_array_wkt,
        base64_url,
        _feed_valid_from,
    FROM initial_pt_array
    -- drop shapes that had nulls
    WHERE ARRAY_LENGTH(pt_array_wkt) = ct
)

SELECT * FROM dim_shapes_arrays_wkt
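For illustration, a small pandas sketch (not from this commit; the sample shapes are invented) of the null-drop check the model relies on: ARRAY_AGG with IGNORE NULLS silently skips null points, so keeping only groups where the array length equals COUNT(*) filters out shapes that lost a point.

import pandas as pd

# hypothetical shape points; shape "b" has a null point that would be dropped
rows = pd.DataFrame(
    {
        "shape_id": ["a", "a", "b", "b"],
        "pt_wkt": ["POINT(0 0)", "POINT(1 1)", "POINT(2 2)", None],
    }
)

grouped = rows.groupby("shape_id").agg(
    # mimic ARRAY_AGG(... IGNORE NULLS): keep only non-null points
    pt_array_wkt=("pt_wkt", lambda s: [p for p in s if pd.notna(p)]),
    # mimic COUNT(*): total rows per shape, nulls included
    ct=("pt_wkt", "size"),
)

# mimic WHERE ARRAY_LENGTH(pt_array_wkt) = ct: drop shapes that had nulls
clean = grouped[grouped["pt_array_wkt"].apply(len) == grouped["ct"]]
print(clean)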