Merge remote-tracking branch 'upstream/main' into update_deletion_request_volume
scholtzan committed Nov 22, 2023
2 parents fcc8175 + 57013e2 commit cef4304
Showing 175 changed files with 5,570 additions and 932 deletions.
10 changes: 5 additions & 5 deletions .circleci/config.yml
@@ -598,16 +598,19 @@ jobs:
- run:
name: Generate diff
command: |
diff -bur --no-dereference \
diff -bur --no-dereference --new-file \
/tmp/workspace/main-generated-sql/dags/ /tmp/workspace/generated-sql/dags/ \
> /tmp/workspace/generated-sql/sql.diff || true
diff -bur --no-dereference \
diff -bur --no-dereference --new-file \
/tmp/workspace/main-generated-sql/sql/ /tmp/workspace/generated-sql/sql/ \
>> /tmp/workspace/generated-sql/sql.diff || true
- persist_to_workspace:
root: /tmp/workspace
paths:
- generated-sql
- store_artifacts:
path: /tmp/workspace/generated-sql/sql.diff
destination: sql.diff
post-diff:
docker:
- image: circleci/node:8.10.0
@@ -617,9 +620,6 @@ jobs:
at: /tmp/workspace
- run: npm i circle-github-bot
- run: .circleci/post-diff.js
- store_artifacts:
path: /tmp/integration
destination: /app/integration
reset-stage-env:
docker: *docker
steps:
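The `--new-file` flag added to both diff invocations makes `diff` treat a file that exists on only one side as empty on the other, so DAGs and SQL files that are added or removed between `main` and the branch appear in full in `sql.diff` instead of only as "Only in …" notes, and the new `store_artifacts` step keeps that diff as a downloadable artifact. A minimal Python sketch of the same comparison, assuming the two workspace paths used by this CI job exist locally:

```python
import subprocess
from pathlib import Path

# Compare the main-branch generated SQL against the branch's generated SQL.
# --new-file treats files present on only one side as empty, so new or
# deleted queries show up as full additions/removals in the diff output.
main_sql = Path("/tmp/workspace/main-generated-sql/sql/")
branch_sql = Path("/tmp/workspace/generated-sql/sql/")
diff_path = Path("/tmp/workspace/generated-sql/sql.diff")

result = subprocess.run(
    ["diff", "-bur", "--no-dereference", "--new-file", str(main_sql), str(branch_sql)],
    capture_output=True,
    text=True,
)
# diff exits 1 when differences are found; only exit codes above 1 signal a
# real error (this mirrors the `|| true` in the CircleCI step).
if result.returncode > 1:
    raise RuntimeError(result.stderr)
diff_path.write_text(result.stdout)
```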
2 changes: 2 additions & 0 deletions .circleci/post-diff.js
@@ -42,4 +42,6 @@ ${warnings}
bot.comment(process.env.GH_AUTH_TOKEN, `
### Integration report for "${bot.env.commitMessage}"
${diff()}
[Link to full diff](https://output.circle-artifacts.com/output/job/${process.env.CIRCLE_WORKFLOW_JOB_ID}/artifacts/${process.env.CIRCLE_NODE_INDEX}/sql.diff)
`);
12 changes: 1 addition & 11 deletions bigquery_etl/cli/check.py
@@ -70,17 +70,7 @@ def _parse_check_output(output: str) -> str:
@click.group(
help="""
Commands for managing and running bqetl data checks.
––––––––––––––––––––––––––––––––––––––––––––––
IN ACTIVE DEVELOPMENT
The current progress can be found under:
\thttps://mozilla-hub.atlassian.net/browse/DENG-919
––––––––––––––––––––––––––––––––––––––––––––––
"""
"""
)
@click.pass_context
def check(ctx):
131 changes: 92 additions & 39 deletions bigquery_etl/cli/query.py
@@ -70,6 +70,7 @@
VERSION_RE = re.compile(r"_v[0-9]+")
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
DEFAULT_DAG_NAME = "bqetl_default"
DEFAULT_INIT_PARALLELISM = 10


@click.group(help="Commands for managing queries.")
@@ -339,8 +340,6 @@ def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):

dags = DagCollection.from_file(sql_dir.parent / "dags.yaml")

dags_to_be_generated = set()

for query_file in query_files:
try:
metadata = Metadata.of_query_file(query_file)
@@ -381,21 +380,11 @@ def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):

# update dags since new task has been added
dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
dags_to_be_generated.add(dag)
else:
dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
if metadata.scheduling == {}:
click.echo(f"No scheduling information for: {query_file}", err=True)
sys.exit(1)
else:
dags_to_be_generated.add(metadata.scheduling["dag_name"])

# re-run DAG generation for the affected DAG
for d in dags_to_be_generated:
existing_dag = dags.dag_by_name(d)
logging.info(f"Running DAG generation for {existing_dag.name}")
output_dir = sql_dir.parent / "dags"
dags.dag_to_airflow(output_dir, existing_dag)


@query.command(
@@ -1265,14 +1254,45 @@ def validate(
validate_metadata.validate_datasets(dataset_dir)


@query.command(
help="""Create and initialize the destination table for the query.
Only for queries that have an `init.sql` file.
def _initialize_in_parallel(
project,
table,
dataset,
query_file,
arguments,
parallelism,
sample_ids,
addl_templates,
):
with ThreadPool(parallelism) as pool:
# Process all sample_ids in parallel.
pool.map(
partial(
_run_query,
[query_file],
project,
None,
table,
dataset,
addl_templates=addl_templates,
),
[arguments + [f"--parameter=sample_id:INT64:{i}"] for i in sample_ids],
)

Examples:

./bqetl query initialize telemetry_derived.ssl_ratios_v1
""",
@query.command(
help="""Run a full backfill on the destination table for the query.
Using this command will:
- Create the table if it doesn't exist and run a full backfill.
- Run a full backfill if the table exists and is empty.
- Raise an exception if the table exists and has data, or if the table exists and the schema doesn't match the query.
It supports `query.sql` files that use the is_init() pattern, and `init.sql` files.
To run in parallel per sample_id, include a @sample_id parameter in the query.
Examples:
- For init.sql files: ./bqetl query initialize telemetry_derived.ssl_ratios_v1
- For query.sql files and parallel run: ./bqetl query initialize sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v2/query.sql
""",
)
@click.argument("name")
@sql_dir_option
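The new `_initialize_in_parallel` helper fans a single initialization query out across all `sample_id` values: `functools.partial` pre-binds the query file and destination, and `ThreadPool.map` supplies a per-sample argument list. A small standalone sketch of the same fan-out pattern, using a stand-in `run_query` instead of bqetl's internal `_run_query`:

```python
from functools import partial
from multiprocessing.pool import ThreadPool

def run_query(query_file, destination_table, arguments):
    """Stand-in for bqetl's _run_query; just prints the bq invocation."""
    print(f"bq {' '.join(arguments)}  # {query_file} -> {destination_table}")

def initialize_in_parallel(query_file, destination_table, arguments,
                           parallelism=10, sample_ids=range(100)):
    # Pre-bind everything except the per-sample argument list, then map the
    # per-sample_id parameter lists over a thread pool.
    run = partial(run_query, query_file, destination_table)
    with ThreadPool(parallelism) as pool:
        pool.map(
            run,
            [arguments + [f"--parameter=sample_id:INT64:{i}"] for i in sample_ids],
        )

initialize_in_parallel(
    "sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v2/query.sql",
    "clients_first_seen_v2",
    ["query", "--use_legacy_sql=false", "--append_table", "--noreplace"],
)
```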
@@ -1282,8 +1302,12 @@ def validate(
"--dry-run/--no-dry-run",
help="Dry run the initialization",
)
def initialize(name, sql_dir, project_id, dry_run):
@parallelism_option(default=DEFAULT_INIT_PARALLELISM)
@click.pass_context
def initialize(ctx, name, sql_dir, project_id, dry_run, parallelism):
"""Create the destination table for the provided query."""
client = bigquery.Client()

if not is_authenticated():
click.echo("Authentication required for creating tables.", err=True)
sys.exit(1)
Expand All @@ -1310,33 +1334,62 @@ def initialize(name, sql_dir, project_id, dry_run):
for query_file in query_files:
sql_content = query_file.read_text()

# Enable init from query.sql files
# First deploys the schema, then runs the init
# This does not currently verify the accuracy of the schema
# Enable initialization from query.sql files
# Create the table by deploying the schema and metadata, then run the init.
# This does not currently verify the accuracy of the schema or that it
# matches the query.
if "is_init()" in sql_content:
project = query_file.parent.parent.parent.name
dataset = query_file.parent.parent.name
destination_table = query_file.parent.name
Schema.from_schema_file(query_file.parent / SCHEMA_FILE).deploy(
f"{project}.{dataset}.{destination_table}"
)
full_table_id = f"{project}.{dataset}.{destination_table}"

try:
table = client.get_table(full_table_id)
if table.num_rows > 0:
raise click.ClickException(
f"Table {full_table_id} already exists and contains data. The initialization process is terminated."
)
except NotFound:
ctx.invoke(deploy, name=full_table_id, force=True)

arguments = [
"query",
"--use_legacy_sql=false",
"--replace",
"--format=none",
"--append_table",
"--noreplace",
]
_run_query(
query_files=[query_file],
project_id=project,
public_project_id=None,
destination_table=destination_table,
dataset_id=dataset,
query_arguments=arguments,
addl_templates={
"is_init": lambda: True,
},
)
if dry_run:
arguments += ["--dry_run"]

if "@sample_id" in sql_content:
sample_ids = list(range(0, 100))

_initialize_in_parallel(
project=project,
table=destination_table,
dataset=dataset,
query_file=query_file,
arguments=arguments,
parallelism=parallelism,
sample_ids=sample_ids,
addl_templates={
"is_init": lambda: True,
},
)
else:
_run_query(
query_files=[query_file],
project_id=project,
public_project_id=None,
destination_table=destination_table,
dataset_id=dataset,
query_arguments=arguments,
addl_templates={
"is_init": lambda: True,
},
)
else:
init_files = Path(query_file.parent).rglob("init.sql")

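For `is_init()` queries, the command now guards against double initialization: it deploys the table only when `get_table` raises `NotFound`, and it refuses to run if the table already contains rows. A hedged sketch of that guard using the google-cloud-bigquery client (the table ID is taken from the example in the help text):

```python
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

def ensure_empty_or_missing(full_table_id: str) -> None:
    """Allow initialization only for missing or empty destination tables."""
    client = bigquery.Client()
    try:
        table = client.get_table(full_table_id)
    except NotFound:
        # Table does not exist yet; the caller would deploy the schema here
        # (the bqetl command invokes its `deploy` subcommand with force=True).
        print(f"{full_table_id} not found, deploying schema before init")
        return
    if table.num_rows > 0:
        raise RuntimeError(
            f"{full_table_id} already exists and contains data; refusing to initialize"
        )
    print(f"{full_table_id} exists but is empty, running init against it")

ensure_empty_or_missing("moz-fx-data-shared-prod.telemetry_derived.ssl_ratios_v1")
```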
@@ -1513,7 +1566,7 @@ def schema():
)
@use_cloud_function_option
@respect_dryrun_skip_option(default=True)
@parallelism_option
@parallelism_option()
def update(
name,
sql_dir,
@@ -1892,7 +1945,7 @@ def _update_query_schema(
+ "Must be fully qualified (project.dataset.table)."
),
)
@parallelism_option
@parallelism_option()
@click.pass_context
def deploy(
ctx,
32 changes: 18 additions & 14 deletions bigquery_etl/cli/utils.py
@@ -87,13 +87,14 @@ def paths_matching_checks_pattern(
pattern, sql_path, project_id, ["checks.sql"], CHECKS_FILE_RE
)

for checks_file in checks_files:
match = CHECKS_FILE_RE.match(str(checks_file))
if match:
project = match.group(1)
dataset = match.group(2)
table = match.group(3)
yield checks_file, project, dataset, table
if checks_files:
for checks_file in checks_files:
match = CHECKS_FILE_RE.match(str(checks_file))
if match:
project = match.group(1)
dataset = match.group(2)
table = match.group(3)
yield checks_file, project, dataset, table
else:
print(f"No checks.sql file found in {sql_path}/{project_id}/{pattern}")

@@ -163,13 +164,16 @@ def paths_matching_name_pattern(
default=True,
)

parallelism_option = click.option(
"--parallelism",
"-p",
default=8,
type=int,
help="Number of threads for parallel processing",
)

def parallelism_option(default=8):
"""Generate a parallelism option, with optional default."""
return click.option(
"--parallelism",
"-p",
default=default,
type=int,
help="Number of threads for parallel processing",
)


def project_id_option(default=None, required=False):
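Turning `parallelism_option` into a factory lets each command choose its own default while sharing one flag definition, which is what allows `initialize` to pass `default=DEFAULT_INIT_PARALLELISM` while other commands keep 8. A minimal sketch of the same decorator-factory pattern with click (the command name here is illustrative):

```python
import click

def parallelism_option(default=8):
    """Return a --parallelism/-p option with a per-command default."""
    return click.option(
        "--parallelism",
        "-p",
        default=default,
        type=int,
        help="Number of threads for parallel processing",
    )

@click.command()
@parallelism_option(default=10)  # e.g. a higher default for an init-style command
def initialize(parallelism):
    click.echo(f"running with {parallelism} threads")

if __name__ == "__main__":
    initialize()
```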
8 changes: 4 additions & 4 deletions bigquery_etl/cli/view.py
@@ -101,7 +101,7 @@ def create(name, sql_dir, project_id, owner):
is_flag=True,
default=False,
)
@parallelism_option
@parallelism_option()
@respect_dryrun_skip_option()
@no_dryrun_option(default=False)
@click.pass_context
@@ -170,7 +170,7 @@ def _view_is_valid(view):
),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option
@parallelism_option()
@click.option(
"--dry_run",
"--dry-run",
@@ -311,7 +311,7 @@ def _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized)
),
)
@click.option("--log-level", default="INFO", help="Defaults to INFO")
@parallelism_option
@parallelism_option()
@click.option(
"--dry_run",
"--dry-run",
@@ -426,7 +426,7 @@ def _remove_view(client, view_id, dry_run):
"""
)
@project_id_option()
@parallelism_option
@parallelism_option()
@click.option(
"--only",
"-o",
2 changes: 1 addition & 1 deletion bigquery_etl/copy_deduplicate.py
@@ -255,7 +255,7 @@ def _list_live_tables(client, pool, project_id, only_tables, table_filter):
type=lambda d: datetime.strptime(d, "%Y-%m-%d").date(),
help="One or more days of data to copy, in format 2019-01-01",
)
@parallelism_option
@parallelism_option()
@click.option(
"--dry_run",
"--dry-run",
2 changes: 1 addition & 1 deletion bigquery_etl/dryrun.py
@@ -436,7 +436,7 @@ def validate_schema(self):
if not existing_schema.equal(query_schema):
click.echo(
click.style(
f"Schema defined in {existing_schema_path} "
f"ERROR: Schema defined in {existing_schema_path} "
f"incompatible with query {query_file_path}",
fg="red",
),
13 changes: 10 additions & 3 deletions bigquery_etl/metadata/parse_metadata.py
@@ -3,6 +3,8 @@
import enum
import os
import re
import string
from pathlib import Path
from typing import Any, Dict, List, Optional

import attr
@@ -230,9 +232,14 @@ def from_file(cls, metadata_file):
with open(metadata_file, "r") as yaml_stream:
try:
metadata = yaml.safe_load(yaml_stream)

friendly_name = metadata.get("friendly_name", None)
description = metadata.get("description", None)
table_name = str(Path(metadata_file).parent.name)
friendly_name = metadata.get(
"friendly_name", string.capwords(table_name.replace("_", " "))
)
description = metadata.get(
"description",
"Please provide a description for the query",
)

if "labels" in metadata:
for key, label in metadata["labels"].items():
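With this change, a table whose `metadata.yaml` omits `friendly_name` gets one derived from the table's directory name, plus a placeholder description. A quick illustration of the derivation (the table name is reused from the earlier example):

```python
import string

table_name = "clients_first_seen_v2"
# Replace underscores with spaces, then capitalize each word.
friendly_name = string.capwords(table_name.replace("_", " "))
print(friendly_name)  # "Clients First Seen V2"
```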
1 change: 1 addition & 0 deletions bigquery_etl/query_scheduling/task.py
@@ -488,6 +488,7 @@ def of_dq_check(cls, query_file, is_check_fail, metadata=None, dag_collection=No
task.query_file_path = query_file
task.is_dq_check = True
task.is_dq_check_fail = is_check_fail
task.depends_on_past = False
task.retries = 0
task.depends_on_fivetran = []
if task.is_dq_check_fail: