Add Support for Multiple Pageview Sources (#11)
* refactor models, update readme

refactoring the models, adding the ability to support multiple segment data sources

* allowing backwards compat, update readme

* testing artifacts config

* use update postgres image

* add pg_user

* simplify for loop, update readme

---------

Co-authored-by: John Mizerany <jmizerany@fleetio.com>
0adamjones and jmmizerany authored Nov 1, 2023
1 parent 1c2c1b2 commit 68b5545
Showing 5 changed files with 104 additions and 13 deletions.
18 changes: 17 additions & 1 deletion .circleci/config.yml
@@ -5,7 +5,9 @@ jobs:
   build:
     docker:
       - image: cimg/python:3.9.9
-      - image: circleci/postgres:9.6.5-alpine-ram
+      - image: cimg/postgres:16.0
+        environment:
+          POSTGRES_USER: root

     steps:
       - checkout
@@ -52,6 +54,20 @@ jobs:
             dbt --warn-error seed --target postgres
             dbt --warn-error run --target postgres --full-refresh
             dbt --warn-error run --target postgres
+      - run:
+          name: Creating Dummy Artifacts
+          command: |
+            echo "my artifact file" > /tmp/artifact-1;
+            mkdir /tmp/artifacts;
+            echo "my artifact files in a dir" > /tmp/artifacts/artifact-2;
+      - store_artifacts:
+          path: /tmp/artifact-1
+          destination: artifact-file
+
+      - store_artifacts:
+          path: /tmp/artifacts

       # - run:
       #     name: "Run Tests - Redshift"
47 changes: 45 additions & 2 deletions README.md
@@ -12,6 +12,7 @@ New to dbt packages? Read more about them [here](https://docs.getdbt.com/docs/bu
 2. Run `dbt deps`
 3. Include the following in your `dbt_project.yml` directly within your `vars:` block (making sure to handle indenting appropriately). **Update the value to point to your segment page views table**.

+### Option 1 (Sessionize pageviews from a single source)
 ```YAML
 # dbt_project.yml
 config-version: 2
@@ -22,12 +23,41 @@ vars:
     segment_page_views_table: "{{ source('segment', 'pages') }}"

 ```
+OR
+```YAML
+# dbt_project.yml
+config-version: 2
+...
+
+vars:
+  dbt_segment:
+    segment_page_views_table:
+      - upstream_model_with_formatted_pageview_data
+
+```
+### Option 2 (Sessionize pageviews from multiple sources, only accepts model names--no source data)
+```YAML
+# dbt_project.yml
+config-version: 2
+...
+
+vars:
+  dbt_segment:
+    segment_page_views_table:
+      - segment_marketing_site_page_views
+      - segment_web_app_page_views
+
+```
+
 This package assumes that your data is in a structure similar to the test
 file included in [example_segment_pages](integration_tests/seeds/example_segment_pages.csv).
 You may have to do some pre-processing in an upstream model to get it into this shape.
 Similarly, if you need to union multiple sources, de-duplicate records, or filter
 out bad records, do this in an upstream model.

+This package previously supported the ability to directly reference source data to use with the package. However, by introducing
+the ability to support multiple segment data sources, only models that can be called with a `ref()` function will be supported. If you were using source data before, simply create a new model called `segment_pages` for example, select * from your source data in that model, and list the name of that model under the segment_page_views_table variable.
+
 4. Optionally configure extra parameters by adding them to your own `dbt_project.yml` file – see [dbt_project.yml](dbt_project.yml)
 for more details:

@@ -39,7 +69,9 @@ config-version: 2

 vars:
   dbt_segment:
-    segment_page_views_table: "{{ source('segment', 'pages') }}"
+    segment_page_views_table:
+      - segment_marketing_site_pages
+      - segment_web_app_pages
     segment_sessionization_trailing_window: 3
     segment_inactivity_cutoff: 30 * 60
     segment_pass_through_columns: []
@@ -50,8 +82,19 @@ vars:
 the package to run successfully.
 6. Execute `dbt run` – the Segment models will get built as part of your run!

+## Using Multiple Segment Data Sources
+As of November 2023, this package supports the ability to sessionize data from multiple Segment data sources. If you're listing more than one Segment data source in the `segment_page_views_table` variable, you'll need to indicate which `source_name` you wish to reference when querying any of the models.
+
+For example:
+```
+select * from segment_web_page_views where source_name = 'segment_marketing_site_pages'
+select * from segment_web_sessions where source_name = 'segment_web_app_pages'
+```
+
+Additionally, if the Segment tables you're using don't have the same column count/order, you will need to do some re-factoring in an upstream model to get them into a format where they can be unioned together.
+
 ## Database support
-This package has been tested on Redshift, Snowflake, BigQuery, and Postgres.
+This package should work with Redshift, BigQuery, and Postgres. However, it is only being tested for compatibility with Snowflake.

 ### Contributing
 Additional contributions to this repo are very welcome! Check out [this post](https://discourse.getdbt.com/t/contributing-to-a-dbt-package/657) on the best workflow for contributing to a package. All PRs should only include functionality that is contained within all Segment deployments; no implementation-specific details should be included. CI jobs run on PRs will only test for postgres compatibility.
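
The README's note that sources must share the same column count and order before they can be unioned could be checked ahead of a run. Below is a minimal Python sketch of such a check, not part of the package: the function name `check_union_compatible` and the idea of passing in column lists by model name are assumptions made for illustration. It flags the two cases that break a positional `select *` union.

```python
from typing import Dict, List


def check_union_compatible(columns_by_model: Dict[str, List[str]]) -> List[str]:
    """Return descriptions of mismatches that would break a positional `select *` union.

    The first model in the mapping is used as the reference layout.
    """
    problems: List[str] = []
    models = list(columns_by_model)
    if not models:
        return problems

    reference_name = models[0]
    reference_cols = columns_by_model[reference_name]

    for name in models[1:]:
        cols = columns_by_model[name]
        if len(cols) != len(reference_cols):
            # A differing column count makes UNION ALL fail outright.
            problems.append(
                f"{name}: {len(cols)} columns, {reference_name} has {len(reference_cols)}"
            )
        elif cols != reference_cols:
            # Same count but different order silently misaligns values.
            problems.append(f"{name}: column order differs from {reference_name}")
    return problems
```

Any model reported here would need the upstream re-factoring the README describes, for example an explicit column list in place of `select *`.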
41 changes: 36 additions & 5 deletions models/base/segment_web_page_views.sql
@@ -1,15 +1,45 @@
-with source as (
+with

-    select * from {{var('segment_page_views_table')}}
+{#
+The if statement below checks to see if segment_page_views_table is a string or a list, and then builds the model accordingly
+#}
+
+{% if var('segment_page_views_table') is string %}
+
+unioned_sources AS (
+    select 'segment_page_views_table' as source_name, * from {{var('segment_page_views_table')}}
+),
+
+
+{% elif var('segment_page_views_table') is iterable %}
+
+{#
+The section below takes each of the items listed for the segment_page_views_table variable, creates CTEs for them,
+and then adds a field to note the name of the source table that the records are related to.
+#}
+
+unioned_sources as (
+    {% for table_ref in var('segment_page_views_table', default=[]) %}
+    SELECT
+        '{{ table_ref }}' as source_name
+        , *
+    FROM
+        {{ ref(table_ref) }}
+    {%- if not loop.last %}
+    UNION ALL
+    {%- endif %}
+    {% endfor %}
+),
+
+{% endif %}

-),

 row_numbering as (

     select
         *,
-        row_number() over (partition by id order by received_at asc) as row_num
-    from source
+        row_number() over (partition by source_name, id order by received_at asc) as row_num
+    from unioned_sources

 ),

@@ -26,6 +56,7 @@ renamed as (

     select

+        source_name,
         id as page_view_id,
         anonymous_id,
         user_id,
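
The branching in `segment_web_page_views.sql` (a plain string for backwards compatibility, or a list of model names that get unioned) can be restated outside Jinja. Below is a rough Python sketch of the same decision, not the package's code: the function name `build_unioned_sources` is hypothetical, and in the list branch the bare model name stands in for whatever relation `ref()` would resolve to.

```python
from typing import List, Union


def build_unioned_sources(page_views_var: Union[str, List[str]]) -> str:
    """Build the body of the `unioned_sources` CTE for either shape of the variable."""
    # Check for a string first: a str is also iterable, which is why the
    # template tests `is string` before `is iterable`.
    if isinstance(page_views_var, str):
        # Backwards-compatible path: the value is already a rendered relation,
        # and every row gets the fixed label 'segment_page_views_table'.
        return (
            "select 'segment_page_views_table' as source_name, * "
            f"from {page_views_var}"
        )

    # List path: one select per model, labelled with the model's own name.
    # Assumption: the bare name is used where the template calls ref().
    selects = [
        f"select '{name}' as source_name, * from {name}"
        for name in page_views_var
    ]
    # Joining with the separator emits `union all` only between selects,
    # which is what the `if not loop.last` guard does in the template.
    return "\nunion all\n".join(selects)
```

For example, `build_unioned_sources(["segment_marketing_site_pages", "segment_web_app_pages"])` yields two labelled selects joined by a single `union all`.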
6 changes: 3 additions & 3 deletions models/sessionization/segment_web_page_views__sessionized.sql
@@ -44,7 +44,7 @@ numbered as (
         *,

         row_number() over (
-            partition by anonymous_id
+            partition by source_name, anonymous_id
             order by tstamp
         ) as page_view_number

@@ -63,7 +63,7 @@ lagged as (
         *,

         lag(tstamp) over (
-            partition by anonymous_id
+            partition by source_name, anonymous_id
             order by page_view_number
         ) as previous_tstamp

@@ -109,7 +109,7 @@ session_numbers as (
         *,

         sum(new_session) over (
-            partition by anonymous_id
+            partition by source_name, anonymous_id
             order by page_view_number
             rows between unbounded preceding and current row
         ) as session_number
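
The three window functions changed above (`row_number`, `lag`, and the running `sum(new_session)`) now all partition by `source_name, anonymous_id`. Below is a small Python sketch of the combined effect, under stated assumptions: `tstamp` is treated as integer epoch seconds, the function name `assign_session_numbers` is hypothetical, and the rule for `new_session` (first view, or a gap longer than the inactivity cutoff) is assumed because the diff does not show that part of the model.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Mirrors `segment_inactivity_cutoff: 30 * 60` from the README example.
INACTIVITY_CUTOFF_SECONDS = 30 * 60


def assign_session_numbers(page_views: List[dict]) -> List[dict]:
    """Number page views and sessions within each (source_name, anonymous_id) partition."""
    partitions: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    for view in page_views:
        partitions[(view["source_name"], view["anonymous_id"])].append(view)

    result: List[dict] = []
    for rows in partitions.values():
        ordered = sorted(rows, key=lambda r: r["tstamp"])
        session_number = 0
        previous_tstamp = None
        for page_view_number, row in enumerate(ordered, start=1):
            # Assumed rule: the first view, or a gap longer than the cutoff,
            # opens a new session.
            gap = None if previous_tstamp is None else row["tstamp"] - previous_tstamp
            new_session = 1 if gap is None or gap > INACTIVITY_CUTOFF_SECONDS else 0
            # Running total, like sum(new_session) over (... current row).
            session_number += new_session
            result.append(
                {
                    **row,
                    "page_view_number": page_view_number,
                    "previous_tstamp": previous_tstamp,
                    "session_number": session_number,
                }
            )
            previous_tstamp = row["tstamp"]
    return result
```

Because the partition key includes `source_name`, the same `anonymous_id` seen on two sources is numbered independently in each, rather than being interleaved into one sequence.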
5 changes: 3 additions & 2 deletions models/sessionization/segment_web_sessions__initial.sql
@@ -7,10 +7,10 @@
     cluster_by = 'session_id'
     )}}

-{% set partition_by = "partition by session_id" %}
+{% set partition_by = "partition by source_name, session_id" %}

 {% set window_clause = "
-    partition by session_id
+    partition by source_name, session_id
     order by page_view_number
     rows between unbounded preceding and unbounded following
     " %}
@@ -66,6 +66,7 @@ agg as (

     select distinct

+        source_name,
         session_id,
         anonymous_id,
         min(tstamp) over ( {{partition_by}} ) as session_start_tstamp,
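
The change to `partition_by` means session-level aggregates such as `session_start_tstamp` are computed per `(source_name, session_id)` pair. Below is a minimal Python sketch of that grouping, assuming page views arrive as dictionaries with integer `tstamp` values. The function name `session_start_tstamps` is hypothetical and only the `min(tstamp)` aggregate shown in the diff is reproduced.

```python
from typing import Dict, List, Tuple


def session_start_tstamps(page_views: List[dict]) -> Dict[Tuple[str, str], int]:
    """Earliest tstamp per (source_name, session_id).

    Plays the role of min(tstamp) over (partition by source_name, session_id).
    """
    starts: Dict[Tuple[str, str], int] = {}
    for view in page_views:
        key = (view["source_name"], view["session_id"])
        if key not in starts or view["tstamp"] < starts[key]:
            starts[key] = view["tstamp"]
    return starts
```

With `source_name` in the key, two sources that happen to produce the same `session_id` value keep separate start timestamps instead of being merged into one session.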
