Skip to content

Commit

Permalink
Materialized views and aggregated tables for event monitoring (#4478)
Browse files Browse the repository at this point in the history
* WIP event monitoring

* Add FxA custom events to view definition (#4483)

* Add FxA custom events to view definition

* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql

* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql

* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql

* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql

---------

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Move event monitoring to glean_usage generator

* Add cross-app event monitoring view

* Generate cross app monitoring

* Simplyfy event monitoring aggregation

---------

Co-authored-by: akkomar <akkomar@users.noreply.github.com>
  • Loading branch information
2 people authored and irrationalagent committed Dec 11, 2023
1 parent 54dada9 commit 5712712
Show file tree
Hide file tree
Showing 16 changed files with 578 additions and 10 deletions.
8 changes: 7 additions & 1 deletion bigquery_etl/cli/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,13 @@ def initialize(name, sql_dir, project_id, dry_run):
# allow name to be a path
query_files = [Path(name)]
else:
query_files = paths_matching_name_pattern(name, sql_dir, project_id)
file_regex = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|init\.sql)$"
)
query_files = paths_matching_name_pattern(
name, sql_dir, project_id, file_regex=file_regex
)

if not query_files:
click.echo(
Expand Down
3 changes: 1 addition & 2 deletions bigquery_etl/cli/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

VIEW_FILE = "view.sql"
QUERY_FILE = "query.sql"
INIT_FILE = "init.sql"
QUERY_SCRIPT = "query.py"
ROOT = Path(__file__).parent.parent.parent
TEST_DIR = ROOT / "tests" / "sql"
Expand Down Expand Up @@ -356,7 +355,7 @@ def _deploy_artifacts(ctx, artifact_files, project_id, dataset_suffix, sql_dir):
query_files = [
file
for file in artifact_files
if file.name in [INIT_FILE, QUERY_FILE, QUERY_SCRIPT]
if file.name in [QUERY_FILE, QUERY_SCRIPT]
# don't attempt to deploy wildcard or metadata tables
and "*" not in file.parent.name and file.parent.name != "INFORMATION_SCHEMA"
]
Expand Down
7 changes: 7 additions & 0 deletions bqetl_project.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ dry_run:
- sql/moz-fx-data-shared-prod/org_mozilla_firefox_beta_derived/experiment_events_live_v1/init.sql
- sql/moz-fx-data-shared-prod/telemetry_derived/experiment_enrollment_cumulative_population_estimate_v1/view.sql
- sql/moz-fx-data-shared-prod/telemetry/experiment_enrollment_cumulative_population_estimate/view.sql
- sql/moz-fx-data-shared-prod/**/event_monitoring_live_v1/init.sql
- sql/moz-fx-data-shared-prod/monitoring/event_monitoring_live/view.sql
# Already exists (and lacks an "OR REPLACE" clause)
- sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql
- sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql
Expand Down Expand Up @@ -426,3 +428,8 @@ generate:
- sql/moz-fx-data-shared-prod/mozilla_vpn/events/**
- sql/moz-fx-data-shared-prod/mozilla_vpn/main/**
- sql/moz-fx-data-shared-prod/fenix/client_deduplication/**
event_monitoring:
skip_apps:
- mlhackweek_search
- regrets_reporter
- regrets_reporter_ucs
15 changes: 15 additions & 0 deletions dags/bqetl_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@
email=["ascholtz@mozilla.com"],
)

monitoring_derived__event_monitoring_aggregates__v1 = bigquery_etl_query(
task_id="monitoring_derived__event_monitoring_aggregates__v1",
destination_table="event_monitoring_aggregates_v1",
dataset_id="monitoring_derived",
project_id="moz-fx-data-shared-prod",
owner="ascholtz@mozilla.com",
email=["akomar@mozilla.com", "ascholtz@mozilla.com"],
date_partition_parameter="submission_date",
depends_on_past=False,
)

monitoring_derived__jobs_by_organization__v1 = gke_command(
task_id="monitoring_derived__jobs_by_organization__v1",
command=[
Expand Down Expand Up @@ -323,6 +334,10 @@
wait_for_copy_deduplicate_main_ping
)

monitoring_derived__event_monitoring_aggregates__v1.set_upstream(
wait_for_copy_deduplicate_all
)

monitoring_derived__stable_and_derived_table_sizes__v1.set_upstream(
wait_for_copy_deduplicate_all
)
Expand Down
21 changes: 20 additions & 1 deletion sql_generators/glean_usage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
baseline_clients_first_seen,
baseline_clients_last_seen,
clients_last_seen_joined,
event_monitoring_live,
events_unnested,
glean_app_ping_views,
metrics_clients_daily,
Expand All @@ -32,6 +33,7 @@
metrics_clients_daily.MetricsClientsDaily(),
metrics_clients_last_seen.MetricsClientsLastSeen(),
clients_last_seen_joined.ClientsLastSeenJoined(),
event_monitoring_live.EventMonitoringLive(),
]

# * mlhackweek_search was an experiment that we don't want to generate tables
Expand Down Expand Up @@ -152,5 +154,22 @@ def generate(
for table in GLEAN_TABLES
]

# Parameters to generate datasets that union all app datasets
generate_across_apps = [
(
partial(
table.generate_across_apps,
target_project,
output_dir=output_dir,
use_cloud_function=use_cloud_function,
),
app_info,
)
for table in GLEAN_TABLES
]

with ProcessingPool(parallelism) as pool:
pool.map(lambda f: f[0](f[1]), generate_per_app_id + generate_per_app)
pool.map(
lambda f: f[0](f[1]),
generate_per_app_id + generate_per_app + generate_across_apps,
)
13 changes: 11 additions & 2 deletions sql_generators/glean_usage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def table_names_from_baseline(baseline_table, include_project_id=True):
daily_view=f"{prefix}.baseline_clients_daily",
last_seen_view=f"{prefix}.baseline_clients_last_seen",
first_seen_view=f"{prefix}.baseline_clients_first_seen",
event_monitoring=f"{prefix}_derived.event_monitoring_live_v1",
)


Expand Down Expand Up @@ -160,6 +161,7 @@ def __init__(self):
self.no_init = True
self.per_app_id_enabled = True
self.per_app_enabled = True
self.across_apps_enabled = True
self.cross_channel_template = "cross_channel.view.sql"

def skip_existing(self, output_dir="sql/", project_id="moz-fx-data-shared-prod"):
Expand Down Expand Up @@ -255,8 +257,7 @@ def generate_per_app_id(
if not (referenced_table_exists(view_sql)):
logging.info("Skipping view for table which doesn't exist:" f" {table}")
else:
artifacts.append(
Artifact(view, "view.sql", view_sql))
artifacts.append(Artifact(view, "view.sql", view_sql))

skip_existing_artifact = self.skip_existing(output_dir, project_id)

Expand Down Expand Up @@ -394,3 +395,11 @@ def generate_per_app(

write_dataset_metadata(output_dir, view)
write_dataset_metadata(output_dir, table, derived_dataset_metadata=True)

def generate_across_apps(
self, project_id, apps, output_dir=None, use_cloud_function=True
):
"""Generate a query across all apps."""
# logic for implementing cross-app queries needs to be implemented in the
# individual classes
return
180 changes: 180 additions & 0 deletions sql_generators/glean_usage/event_monitoring_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Generate Materialized Views and aggregate queries for event monitoring."""

import os
from collections import namedtuple
from datetime import datetime
from pathlib import Path

from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
from sql_generators.glean_usage.common import (
GleanTable,
get_app_info,
get_table_dir,
render,
table_names_from_baseline,
write_sql,
)

TARGET_TABLE_ID = "event_monitoring_live_v1"
TARGET_DATASET_CROSS_APP = "monitoring"
PREFIX = "event_monitoring"
PATH = Path(os.path.dirname(__file__))


class EventMonitoringLive(GleanTable):
"""Represents the generated materialized view for event monitoring."""

def __init__(self) -> None:
"""Initialize materialized view generation."""
self.no_init = False
self.per_app_id_enabled = True
self.per_app_enabled = False
self.across_apps_enabled = True
self.prefix = PREFIX
self.target_table_id = TARGET_TABLE_ID
self.custom_render_kwargs = {}

def generate_per_app_id(
self, project_id, baseline_table, output_dir=None, use_cloud_function=True
):
tables = table_names_from_baseline(baseline_table, include_project_id=False)

init_filename = f"{self.target_table_id}.init.sql"
metadata_filename = f"{self.target_table_id}.metadata.yaml"

table = tables[f"{self.prefix}"]
dataset = tables[self.prefix].split(".")[-2].replace("_derived", "")

render_kwargs = dict(
header="-- Generated via bigquery_etl.glean_usage\n",
header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
project_id=project_id,
derived_dataset=tables[self.prefix].split(".")[-2],
dataset=dataset,
current_date=datetime.today().strftime("%Y-%m-%d"),
app_name=[
app_dataset["canonical_app_name"]
for _, app in get_app_info().items()
for app_dataset in app
if dataset == app_dataset["bq_dataset_family"]
][0],
)

render_kwargs.update(self.custom_render_kwargs)
render_kwargs.update(tables)

# generated files to update
Artifact = namedtuple("Artifact", "table_id basename sql")
artifacts = []

if not self.no_init:
init_sql = render(
init_filename, template_folder=PATH / "templates", **render_kwargs
)
metadata = render(
metadata_filename,
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)
artifacts.append(Artifact(table, "metadata.yaml", metadata))

skip_existing_artifact = self.skip_existing(output_dir, project_id)

if output_dir:
if not self.no_init:
artifacts.append(Artifact(table, "init.sql", init_sql))

for artifact in artifacts:
destination = (
get_table_dir(output_dir, artifact.table_id) / artifact.basename
)
skip_existing = str(destination) in skip_existing_artifact

write_sql(
output_dir,
artifact.table_id,
artifact.basename,
artifact.sql,
skip_existing=skip_existing,
)

def generate_across_apps(
self, project_id, apps, output_dir=None, use_cloud_function=True
):
"""Generate a query across all apps."""
if not self.across_apps_enabled:
return

prod_datasets_with_baseline = [
s.bq_dataset_family
for s in get_stable_table_schemas()
if s.schema_id == "moz://mozilla.org/schemas/glean/ping/1"
and s.bq_table == "baseline_v1"
]

aggregate_table = "event_monitoring_aggregates_v1"
target_view_name = "_".join(self.target_table_id.split("_")[:-1])

render_kwargs = dict(
header="-- Generated via bigquery_etl.glean_usage\n",
header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
project_id=project_id,
target_view=f"{TARGET_DATASET_CROSS_APP}.{target_view_name}",
table=target_view_name,
target_table=f"{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}",
apps=apps,
prod_datasets=prod_datasets_with_baseline,
)
render_kwargs.update(self.custom_render_kwargs)

skip_existing_artifacts = self.skip_existing(output_dir, project_id)

Artifact = namedtuple("Artifact", "table_id basename sql")

query_filename = f"{aggregate_table}.query.sql"
query_sql = render(
query_filename, template_folder=PATH / "templates", **render_kwargs
)
metadata = render(
f"{aggregate_table}.metadata.yaml",
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)
table = f"{project_id}.{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}"

view_sql = render(
"event_monitoring_live.view.sql",
template_folder=PATH / "templates",
**render_kwargs,
)
view_metadata = render(
"event_monitoring_live.metadata.yaml",
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)

view = f"{project_id}.{TARGET_DATASET_CROSS_APP}.{target_view_name}"
if output_dir:
artifacts = [
Artifact(table, "metadata.yaml", metadata),
Artifact(table, "query.sql", query_sql),
Artifact(view, "metadata.yaml", view_metadata),
Artifact(view, "view.sql", view_sql),
]

for artifact in artifacts:
destination = (
get_table_dir(output_dir, artifact.table_id) / artifact.basename
)
skip_existing = destination in skip_existing_artifacts

write_sql(
output_dir,
artifact.table_id,
artifact.basename,
artifact.sql,
skip_existing=skip_existing,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Generated via ./bqetl generate glean_usage
CREATE OR REPLACE VIEW
`{{ project_id }}.{{ target_view }}`
AS
{% for (dataset, channel) in datasets -%}
{% if not loop.first -%}
UNION ALL
{% endif -%}
SELECT
{% if app_name == "fenix" -%}
mozfun.norm.fenix_app_info("{{ dataset }}", client_info.app_build).channel AS normalized_channel,
{% elif datasets|length > 1 -%}
"{{ channel }}" AS normalized_channel,
{% endif -%}
normalized_app_name,
window_start,
window_end,
event_category,
event_name,
event_extra_key,
country,
version,
total_events
FROM
`{{ project_id }}.{{ dataset }}_derived.event_monitoring_live_v1`
{% endfor %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
friendly_name: Event Monitoring Aggregates
description: |-
Materialized view of experimentation related events
coming from all Glean apps.
owners:
- ascholtz@mozilla.com
- akomar@mozilla.com
labels:
incremental: true
scheduling:
dag_name: bqetl_monitoring
bigquery:
time_partitioning:
type: day
field: submission_date
require_partitions_filter: false
clustering:
fields:
- event_name
- normalized_channel
- normalized_app_name
Loading

0 comments on commit 5712712

Please sign in to comment.