Skip to content

Commit

Permalink
APIv2: visit:country_name, visit:region_name, visit:city_name dimensi…
Browse files Browse the repository at this point in the history
…ons (#4328)

* Add data migration for creating and syncing location_data table and dictionary

* Migration to populate location data

* Daily cron to refresh location dataset if changed

* Add support for visit:country_name, visit:region_name and visit:city_name dimensions

Under the hood this relies on a `location_data` table in clickhouse being regularly synced with
plausible/location repo and dictionary lookups used in ALIAS columns

* Update queue name

* Update documentation

* Explicit structs

* Improve docs further

* Migration comment

* Add queues

* Add error when already loaded

* Test for filtering by new dimensions

* Update deps

* dimension -> select_dimension

* Update a test
  • Loading branch information
macobo authored Aug 13, 2024
1 parent 6a7fab6 commit ee3d1e7
Show file tree
Hide file tree
Showing 21 changed files with 376 additions and 19 deletions.
8 changes: 6 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ base_cron = [
# Every day at 1am
{"0 1 * * *", Plausible.Workers.CleanInvitations},
# Every 2 hours
{"0 */2 * * *", Plausible.Workers.ExpireDomainChangeTransitions}
{"0 */2 * * *", Plausible.Workers.ExpireDomainChangeTransitions},
# Daily at midnight
{"0 0 * * *", Plausible.Workers.LocationsSync}
]

cloud_cron = [
Expand Down Expand Up @@ -621,7 +623,9 @@ base_queues = [
analytics_exports: 1,
notify_exported_analytics: 1,
domain_change_transition: 1,
check_accept_traffic_until: 1
check_accept_traffic_until: 1,
clickhouse_clean_sites: 1,
locations_sync: 1
]

cloud_queues = [
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/clean_clickhouse.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Mix.Tasks.CleanClickhouse do
def run(_) do
%{rows: rows} = IngestRepo.query!("show tables")
tables = Enum.map(rows, fn [table] -> table end)
to_truncate = tables -- ["schema_migrations"]
to_truncate = tables -- ["schema_migrations", "location_data", "location_data_dict"]

Enum.each(to_truncate, fn table ->
IngestRepo.query!("truncate #{table}")
Expand Down
16 changes: 16 additions & 0 deletions lib/plausible/clickhouse_location_data.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Plausible.ClickhouseLocationData do
  @moduledoc """
  Schema for storing location id <-> name mappings in the ClickHouse `location_data` table.

  Each row holds a `type`, an `id` and the `name` that the `{type, id}` pair translates to.

  The table is read indirectly via the `location_data_dict` dictionary in ALIAS columns in
  the `events_v2`, `sessions_v2` and `imported_locations` tables.
  """
  use Ecto.Schema

  # The schema declares no primary key field.
  @primary_key false
  schema "location_data" do
    field :type, Ch, type: "LowCardinality(String)"
    field :id, :string
    field :name, :string
  end
end
140 changes: 140 additions & 0 deletions lib/plausible/data_migration/locations_sync.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Plausible.DataMigration.LocationsSync do
  @moduledoc """
  ClickHouse data migration that stores location names from the `Location` module in ClickHouse.

  It should only run when `Location.version()` changes: either as a migration or in cron.

  `run/0` performs the following steps:
  1. Truncates the existing `location_data` table (if it exists)
  2. Creates the table (if needed)
  3. Inserts fresh data built from the `Location` module
  4. (Re-)creates the dictionary used to read location data from the table
  5. Creates ALIAS columns in the `events_v2`, `sessions_v2` and `imported_locations` tables to
     make reading location names easy
  6. Updates the table comment of `location_data` to indicate the last version synced

  Note that the dictionary is large enough to cache the whole dataset in memory, making
  lookups fast.

  This migration is intended to be idempotent and rerunnable - if run multiple times, it should
  always set things to the same result as if run once.

  SQL files available at: priv/data_migrations/LocationsSync/sql
  """
  alias Plausible.ClickhouseLocationData

  use Plausible.DataMigration, dir: "LocationsSync", repo: Plausible.IngestRepo

  # One entry per ALIAS column to add: {table, column name, location type, source column}.
  # Expanded at compile time into the maps consumed by `run/0`.
  @columns Enum.map(
             [
               {"events_v2", "country_name", "country", "country_code"},
               {"events_v2", "region_name", "subdivision", "subdivision1_code"},
               {"events_v2", "city_name", "city", "city_geoname_id"},
               {"sessions_v2", "country_name", "country", "country_code"},
               {"sessions_v2", "region_name", "subdivision", "subdivision1_code"},
               {"sessions_v2", "city_name", "city", "city_geoname_id"},
               {"imported_locations", "country_name", "country", "country"},
               {"imported_locations", "region_name", "subdivision", "region"},
               {"imported_locations", "city_name", "city", "city"}
             ],
             fn {table, column_name, type, input_column} ->
               %{table: table, column_name: column_name, type: type, input_column: input_column}
             end
           )

  @doc """
  Returns `true` when the version stored in the `location_data` table comment differs from
  `Location.version()`, or when the stored version cannot be read as a single value.
  """
  def out_of_date?() do
    with {:ok, %{rows: [[synced_version]]}} <- run_sql("get-location-data-table-comment") do
      synced_version != Location.version()
    else
      _ -> true
    end
  end

  @doc """
  Rebuilds the `location_data` table, its dictionary and the ALIAS columns, then records
  `Location.version()` in the table comment.

  Every SQL step is asserted to return `{:ok, _}`; a failing step raises a `MatchError`.
  """
  def run() do
    clustered? = Plausible.MigrationUtils.clustered_table?("sessions_v2")

    {:ok, _} = run_sql("truncate-location-data-table", cluster?: clustered?)
    {:ok, _} = run_sql("create-location-data-table", cluster?: clustered?)

    @repo.insert_all(ClickhouseLocationData, location_rows())

    {:ok, _} =
      run_sql("update-location-data-dictionary",
        cluster?: clustered?,
        dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params()
      )

    Enum.each(@columns, fn %{
                             table: table,
                             column_name: column_name,
                             type: type,
                             input_column: input_column
                           } ->
      {:ok, _} =
        run_sql("add-alias-column",
          cluster?: clustered?,
          table: table,
          column_name: column_name,
          type: type,
          input_column: input_column
        )
    end)

    {:ok, _} =
      run_sql("update-location-data-table-comment",
        cluster?: clustered?,
        version: Location.version()
      )
  end

  # All rows to insert: countries first, then subdivisions, then cities.
  defp location_rows() do
    country_rows() ++ subdivision_rows() ++ city_rows()
  end

  # Countries are keyed by their alpha-2 code.
  defp country_rows() do
    Enum.map(Location.Country.all(), fn %Location.Country{alpha_2: alpha_2, name: name} ->
      %{type: "country", id: alpha_2, name: name}
    end)
  end

  # Subdivisions are keyed by their code.
  defp subdivision_rows() do
    Enum.map(Location.Subdivision.all(), fn %Location.Subdivision{code: code, name: name} ->
      %{type: "subdivision", id: code, name: name}
    end)
  end

  # City ids are integers in `Location` but stored as strings in `location_data`.
  defp city_rows() do
    Enum.map(Location.City.all(), fn %Location.City{id: id, name: name} ->
      %{type: "city", id: Integer.to_string(id), name: name}
    end)
  end
end
19 changes: 5 additions & 14 deletions lib/plausible/data_migration/populate_event_session_columns.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ defmodule Plausible.DataMigration.PopulateEventSessionColumns do
run_sql("create-sessions-dictionary",
cluster?: cluster?,
dictionary_connection_params:
Keyword.get(opts, :dictionary_connection_string, dictionary_connection_params()),
Keyword.get(
opts,
:dictionary_connection_string,
Plausible.MigrationUtils.dictionary_connection_params()
),
dictionary_config: dictionary_config(opts)
)

Expand Down Expand Up @@ -136,19 +140,6 @@ defmodule Plausible.DataMigration.PopulateEventSessionColumns do
|> Map.merge(Keyword.get(opts, :dictionary_config, %{}))
end

# See https://clickhouse.com/docs/en/sql-reference/dictionaries#clickhouse for context
defp dictionary_connection_params() do
Plausible.IngestRepo.config()
|> Enum.map(fn
{:database, database} -> "DB '#{database}'"
{:username, username} -> "USER '#{username}'"
{:password, password} -> "PASSWORD '#{password}'"
_ -> nil
end)
|> Enum.reject(&is_nil/1)
|> Enum.join(" ")
end

defp get_partitions(opts) do
[min_partition, max_partition] = Keyword.get(opts, :partition_range, ["0", "999999"])

Expand Down
13 changes: 13 additions & 0 deletions lib/plausible/migration_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,17 @@ defmodule Plausible.MigrationUtils do
{:ok, _} -> true
end
end

# Builds the connection clause (`DB '...' USER '...' PASSWORD '...'`) for a ClickHouse-sourced
# dictionary from the IngestRepo configuration. Only the `:database`, `:username` and
# `:password` entries are used, in the order they appear in the config.
# See https://clickhouse.com/docs/en/sql-reference/dictionaries#clickhouse for context
def dictionary_connection_params() do
  repo_config = Plausible.IngestRepo.config()

  clauses =
    Enum.flat_map(repo_config, fn
      {:database, database} -> ["DB '#{database}'"]
      {:username, username} -> ["USER '#{username}'"]
      {:password, password} -> ["PASSWORD '#{password}'"]
      _ -> []
    end)

  Enum.join(clauses, " ")
end
end
3 changes: 3 additions & 0 deletions lib/plausible/stats/filters/filters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ defmodule Plausible.Stats.Filters do
:country,
:region,
:city,
:country_name,
:region_name,
:city_name,
:entry_page,
:exit_page,
:entry_page_hostname,
Expand Down
3 changes: 3 additions & 0 deletions lib/plausible/stats/imported/base.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ defmodule Plausible.Stats.Imported.Base do
"visit:country" => "imported_locations",
"visit:region" => "imported_locations",
"visit:city" => "imported_locations",
"visit:country_name" => "imported_locations",
"visit:region_name" => "imported_locations",
"visit:city_name" => "imported_locations",
"visit:device" => "imported_devices",
"visit:browser" => "imported_browsers",
"visit:browser_version" => "imported_browsers",
Expand Down
4 changes: 4 additions & 0 deletions lib/plausible/stats/imported/sql/builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ defmodule Plausible.Stats.Imported.SQL.Builder do
defp filter_group_values(q, "visit:region"), do: where(q, [i], i.region != "")
defp filter_group_values(q, "visit:city"), do: where(q, [i], i.city != 0 and not is_nil(i.city))

# When grouping imported data by a location *name* dimension, drop rows whose name column
# equals the excluded value below.
# NOTE(review): "ZZ" looks like a country-code placeholder rather than a country name -
# confirm what `country_name` actually holds for unknown countries.
defp filter_group_values(q, "visit:country_name"), do: where(q, [i], i.country_name != "ZZ")
defp filter_group_values(q, "visit:region_name"), do: where(q, [i], i.region_name != "")
defp filter_group_values(q, "visit:city_name"), do: where(q, [i], i.city_name != "")

defp filter_group_values(q, _dimension), do: q

def select_joined_dimensions(q, query) do
Expand Down
9 changes: 9 additions & 0 deletions lib/plausible/stats/sql/expression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ defmodule Plausible.Stats.SQL.Expression do
def select_dimension(q, key, "visit:city", _table, _query),
do: select_merge_as(q, [t], %{key => t.city})

# Location name dimensions: each selects the matching `*_name` column of the bound table
# under `key`.
def select_dimension(q, key, "visit:country_name", _table, _query),
  do: select_merge_as(q, [t], %{key => t.country_name})

def select_dimension(q, key, "visit:region_name", _table, _query),
  do: select_merge_as(q, [t], %{key => t.region_name})

def select_dimension(q, key, "visit:city_name", _table, _query),
  do: select_merge_as(q, [t], %{key => t.city_name})

def event_metric(:pageviews) do
wrap_alias([e], %{
pageviews:
Expand Down
15 changes: 15 additions & 0 deletions lib/workers/locations_sync.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Plausible.Workers.LocationsSync do
  @moduledoc false

  use Plausible.Repo
  use Oban.Worker, queue: :locations_sync

  alias Plausible.DataMigration.LocationsSync, as: LocationsMigration

  # Re-runs the locations data migration only when it reports being out of date.
  # The job itself always returns `:ok`.
  @impl Oban.Worker
  def perform(_job) do
    if LocationsMigration.out_of_date?(), do: LocationsMigration.run()

    :ok
  end
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"joken": {:hex, :joken, "2.6.0", "b9dd9b6d52e3e6fcb6c65e151ad38bf4bc286382b5b6f97079c47ade6b1bcc6a", [:mix], [{:jose, "~> 1.11.5", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "5a95b05a71cd0b54abd35378aeb1d487a23a52c324fa7efdffc512b655b5aaa7"},
"jose": {:hex, :jose, "1.11.6", "613fda82552128aa6fb804682e3a616f4bc15565a048dabd05b1ebd5827ed965", [:mix, :rebar3], [], "hexpm", "6275cb75504f9c1e60eeacb771adfeee4905a9e182103aa59b53fed651ff9738"},
"kaffy": {:hex, :kaffy, "0.10.2", "72e807c525323bd0cbc3ac0c127b7bde61caffdc576fb6554964d3fe6a2a6100", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0.2", [hex: :phoenix_view, repo: "hexpm", optional: false]}], "hexpm", "651cad5f3bcc91510a671c13c7a273b8b8195fdf2d809208708baecbb77300bf"},
"location": {:git, "https://github.com/plausible/location.git", "eddd52590f2423cd677d9206787468e1fb018668", []},
"location": {:git, "https://github.com/plausible/location.git", "a89bf79985c3c3d0830477ae587001156a646ce8", []},
"locus": {:hex, :locus, "2.3.6", "c9f53fd5df872fca66a54dc0aa2f8b2d3640388e56a0c39a741be0df6d8854bf", [:rebar3], [{:tls_certificate_check, "~> 1.9", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "6087aa9a69673e7011837fb4b3d7f756560adde76892c32f5f93904ee30064e2"},
"mail": {:hex, :mail, "0.3.1", "cb0a14e4ed8904e4e5a08214e686ccf6f9099346885db17d8c309381f865cc5c", [:mix], [], "hexpm", "1db701e89865c1d5fa296b2b57b1cd587587cca8d8a1a22892b35ef5a8e352a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Adds (if missing) a String ALIAS column to the given table. The column resolves a location
-- name through the location_data_dict dictionary, keyed by the location type and the value
-- of the input column.
ALTER TABLE <%= @table %>
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
ADD COLUMN IF NOT EXISTS
<%= @column_name %> String
ALIAS dictGet('location_data_dict', 'name', tuple('<%= @type %>', <%= @input_column %>))
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Creates the location_data table (type, id) -> name if it does not exist yet.
-- Clustered setups use the replicated engine variant; single-node setups use plain MergeTree.
CREATE TABLE IF NOT EXISTS location_data <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
    `type` LowCardinality(String),
    `id` String,
    `name` String
)
<%= if @cluster? do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/location_data', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY (type, id)
SETTINGS index_granularity = 128
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reads the table comment of location_data in the current database.
select comment from system.tables where database = currentDatabase() and table = 'location_data'
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Empties location_data when the table exists (on the whole cluster when clustered).
TRUNCATE TABLE IF EXISTS location_data <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Creates or replaces the location_data_dict dictionary, which is sourced from the
-- location_data table and keyed by the composite key (type, id), exposing name.
CREATE OR REPLACE DICTIONARY location_data_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
(
    `type` String,
    `id` String,
    `name` String
)
PRIMARY KEY type, id
SOURCE(CLICKHOUSE(TABLE location_data <%= @dictionary_connection_params %>))
LIFETIME(0)
LAYOUT(complex_key_cache(size_in_cells 500000))
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Sets the table comment of location_data to the given version string.
ALTER TABLE location_data
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
MODIFY COMMENT '<%= @version %>'
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Plausible.IngestRepo.Migrations.PopulateLocationData do
  use Ecto.Migration

  # Loads the location dataset (tolerating the "already loaded" case) and then runs the
  # locations sync data migration.
  def up do
    load_location_data()

    Plausible.DataMigration.LocationsSync.run()
  end

  # This migration cannot be rolled back.
  def down do
    raise "Irreversible"
  end

  # Only the loading call is guarded: an ArgumentError raised by `Location.load_all/0`
  # is ignored, anything else propagates.
  defp load_location_data do
    Location.load_all()
  rescue
    # Already loaded
    ArgumentError -> nil
  end
end
Loading

0 comments on commit ee3d1e7

Please sign in to comment.