Skip to content

Commit

Permalink
Update fenix import and models
Browse files Browse the repository at this point in the history
This includes combining rows for histograms and percentiles and adding a
build_date column.
  • Loading branch information
robhudson committed Jul 7, 2020
1 parent d90eac8 commit 9adef25
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 91 deletions.
1 change: 1 addition & 0 deletions glam/api/management/commands/import_desktop_aggs.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def import_file(self, tmp_table, fp, model):
if f not in ["id", "total_users", "histogram", "percentiles"]
]

log(" Importing file into temp table.")
with connection.cursor() as cursor:
with open(fp.name, "r") as tmp_file:
sql = f"""
Expand Down
22 changes: 16 additions & 6 deletions glam/api/management/commands/import_glean_aggs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


GCS_BUCKET = "glam-dev-bespoke-nonprod-dataops-mozgcp-net"
PRODUCT_MODEL_MAP = {"fenix": "api.FenixAggregation"}
PRODUCT_MODEL_MAP = {"org_mozilla_fenix": "api.FenixAggregation"}


def log(message):
Expand All @@ -21,7 +21,7 @@ def log(message):

class Command(BaseCommand):

help = "Imports user counts"
help = "Imports Glean product aggregations"

def add_arguments(self, parser):
parser.add_argument(
Expand Down Expand Up @@ -67,7 +67,7 @@ def handle(self, product, bucket, *args, **options):

# Load CSV into temp table & insert data from temp table into
# aggregation tables, using upserts.
self.import_file(tmp_table, model, fp)
self.import_file(tmp_table, fp, model)

# Drop temp table and remove file.
log("Dropping temp table.")
Expand All @@ -76,13 +76,22 @@ def handle(self, product, bucket, *args, **options):
log(f"Deleting local file: {fp.name}.")
fp.close()

def import_file(self, tmp_table, model, fp):
# Once all files are loaded, refresh the materialized views.

if blobs:
with connection.cursor() as cursor:
view = f"view_{model._meta.db_table}"
log(f"Refreshing materialized view for {view}")
cursor.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view}")
log("Refresh completed.")

def import_file(self, tmp_table, fp, model):

csv_columns = [f.name for f in model._meta.get_fields() if f.name not in ["id"]]
conflict_columns = [
f
for f in model._meta.constraints[0].fields
if f not in ["id", "total_users", "data"]
if f not in ["id", "total_users", "histogram", "percentiles"]
]

log(" Importing file into temp table.")
Expand All @@ -101,6 +110,7 @@ def import_file(self, tmp_table, model, fp):
ON CONFLICT ({", ".join(conflict_columns)})
DO UPDATE SET
total_users = EXCLUDED.total_users,
data = EXCLUDED.data
histogram = EXCLUDED.histogram,
percentiles = EXCLUDED.percentiles
"""
cursor.execute(sql)
67 changes: 67 additions & 0 deletions glam/api/migrations/0011_transpose_fenix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Generated by Django 3.0.6 on 2020-05-22 21:29

from django.db import migrations, models


class Migration(migrations.Migration):
    """Rebuild the Fenix aggregation model with the transposed schema.

    Replaces the old single JSON ``data`` column (one row per agg_type)
    with separate ``histogram`` and ``percentiles`` text columns on a
    single row, and adds a nullable ``build_date`` column.
    NOTE(review): DeleteModel + CreateModel drops any rows already in
    ``glam_fenix_aggregation``; the data must be re-imported afterwards.
    """

    dependencies = [
        ('api', '0010_remove_old_tables'),
    ]

    operations = [
        # The column set changes shape, so drop and recreate the model
        # rather than issuing a long series of AlterField operations.
        migrations.DeleteModel(
            name='FenixAggregation',
        ),

        migrations.CreateModel(
            name='FenixAggregation',
            fields=[
                ('id', models.BigAutoField(primary_key=True, serialize=False)),
                # Dimension columns that identify one aggregation row.
                ('channel', models.CharField(max_length=100)),
                ('version', models.CharField(max_length=100)),
                ('ping_type', models.CharField(max_length=100)),
                ('os', models.CharField(max_length=100)),
                ('build_id', models.CharField(max_length=100)),
                # New in this migration; nullable with no meaningful default.
                ('build_date', models.DateTimeField(default=None, null=True)),
                ('metric', models.CharField(max_length=200)),
                ('metric_type', models.CharField(max_length=100)),
                ('metric_key', models.CharField(blank=True, max_length=200)),
                ('client_agg_type', models.CharField(blank=True, max_length=100)),
                # Aggregated data columns (formerly a single JSON ``data``).
                ('total_users', models.IntegerField()),
                ('histogram', models.TextField(blank=True, null=True)),
                ('percentiles', models.TextField(blank=True, null=True)),
            ],
            options={
                'db_table': 'glam_fenix_aggregation',
                'abstract': False,
            },
        ),
        # Note: unlike the old constraint, ``agg_type`` is no longer part of
        # the unique key — histogram and percentiles now share one row.
        migrations.AddConstraint(
            model_name='fenixaggregation',
            constraint=models.UniqueConstraint(fields=('channel', 'version', 'ping_type', 'os', 'build_id', 'metric', 'metric_type', 'metric_key', 'client_agg_type'), name='fenix_unique_dimensions'),
        ),
        # Unmanaged model mirroring the table's columns; the backing
        # materialized view is created by raw SQL in a separate migration,
        # so Django must emit no DDL here (``managed: False``).
        migrations.CreateModel(
            name='FenixAggregationView',
            fields=[
                ('id', models.BigAutoField(primary_key=True, serialize=False)),
                ('channel', models.CharField(max_length=100)),
                ('version', models.CharField(max_length=100)),
                ('ping_type', models.CharField(max_length=100)),
                ('os', models.CharField(max_length=100)),
                ('build_id', models.CharField(max_length=100)),
                ('build_date', models.DateTimeField(default=None, null=True)),
                ('metric', models.CharField(max_length=200)),
                ('metric_type', models.CharField(max_length=100)),
                ('metric_key', models.CharField(blank=True, max_length=200)),
                ('client_agg_type', models.CharField(blank=True, max_length=100)),
                ('total_users', models.IntegerField()),
                ('histogram', models.TextField(blank=True, null=True)),
                ('percentiles', models.TextField(blank=True, null=True)),
            ],
            options={
                'db_table': 'view_glam_fenix_aggregation',
                'managed': False,
            },
        ),
    ]
21 changes: 21 additions & 0 deletions glam/api/migrations/0012_fenix_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from django.db import migrations


class Migration(migrations.Migration):
    """Create the materialized view backing ``FenixAggregationView``.

    Raw SQL because Django has no built-in support for materialized views;
    the view is a straight copy of ``glam_fenix_aggregation``.
    """

    dependencies = [
        ("api", "0011_transpose_fenix"),
    ]

    operations = [
        migrations.RunSQL(
            [
                "CREATE MATERIALIZED VIEW view_glam_fenix_aggregation AS SELECT * FROM glam_fenix_aggregation",
                # A unique index is required by PostgreSQL for
                # REFRESH MATERIALIZED VIEW CONCURRENTLY.
                "CREATE UNIQUE INDEX ON view_glam_fenix_aggregation (id)",
                "CREATE INDEX ON view_glam_fenix_aggregation (version)",
                # HASH indexes: ``metric`` and ``os`` are only ever filtered
                # by equality in the API queries.
                "CREATE INDEX ON view_glam_fenix_aggregation USING HASH (metric)",
                "CREATE INDEX ON view_glam_fenix_aggregation USING HASH (os)",
            ],
            # Dropping the view also drops its indexes.
            reverse_sql=["DROP MATERIALIZED VIEW view_glam_fenix_aggregation"],
        )
    ]
85 changes: 49 additions & 36 deletions glam/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,55 @@ def populate_labels_cache(cls):
cache.set("__labels__", True)


class AbstractFenixAggregation(models.Model):
    """Common column set for Fenix (Glean) aggregation rows.

    Shared by the writable table model (``FenixAggregation``) and the
    read-only materialized-view model (``FenixAggregationView``); each
    concrete subclass supplies its own ``Meta``.
    """

    id = models.BigAutoField(primary_key=True)
    # Dimensions.
    channel = models.CharField(max_length=100)
    version = models.CharField(max_length=100)
    ping_type = models.CharField(max_length=100)
    os = models.CharField(max_length=100)
    build_id = models.CharField(max_length=100)
    # Nullable — presumably absent for rows aggregated across builds
    # (build_id="*"); TODO confirm against the importer.
    build_date = models.DateTimeField(null=True)
    metric = models.CharField(max_length=200)
    metric_type = models.CharField(max_length=100)
    # Blank when the metric has no key / no client aggregation type.
    metric_key = models.CharField(max_length=200, blank=True)
    client_agg_type = models.CharField(max_length=100, blank=True)
    # Data.
    total_users = models.IntegerField()
    # JSON stored as text; the API layer decodes these with orjson.
    histogram = models.TextField(null=True, blank=True)
    percentiles = models.TextField(null=True, blank=True)

    class Meta:
        abstract = True


class FenixAggregation(AbstractFenixAggregation):
    """Writable Fenix aggregation table — the import target."""

    class Meta(AbstractFenixAggregation.Meta):
        db_table = "glam_fenix_aggregation"
        constraints = [
            # Exactly one row per dimension combination; the import
            # command's ON CONFLICT upsert relies on this constraint.
            models.UniqueConstraint(
                name="fenix_unique_dimensions",
                fields=[
                    "channel",
                    "version",
                    "ping_type",
                    "os",
                    "build_id",
                    "metric",
                    "metric_type",
                    "metric_key",
                    "client_agg_type",
                ],
            )
        ]


class FenixAggregationView(AbstractFenixAggregation):
    """Read-only model over the ``view_glam_fenix_aggregation`` materialized view."""

    class Meta:
        # Unmanaged: the backing relation is a materialized view created
        # via raw SQL in a migration, so Django must not emit DDL for it.
        managed = False
        db_table = "view_glam_fenix_aggregation"


class AbstractDesktopAggregation(models.Model):
id = models.BigAutoField(primary_key=True)
# Dimensions.
Expand Down Expand Up @@ -127,39 +176,3 @@ class Meta:
fields=["channel", "version", "build_id", "os"],
)
]


class FenixAggregation(models.Model):
    """Fenix aggregation row keyed by ``agg_type`` with a JSON payload.

    NOTE(review): this is the pre-transpose schema — one row per
    ``agg_type`` holding its values in a single JSON ``data`` column.
    """

    id = models.BigAutoField(primary_key=True)
    # Dimension columns identifying one aggregation row.
    channel = models.CharField(max_length=100)
    version = models.CharField(max_length=100)
    ping_type = models.CharField(max_length=100)
    os = models.CharField(max_length=100)
    build_id = models.CharField(max_length=100)
    metric = models.CharField(max_length=200)
    metric_type = models.CharField(max_length=100)
    # Blank when the metric has no key / no client aggregation type.
    metric_key = models.CharField(max_length=200, blank=True)
    client_agg_type = models.CharField(max_length=100, blank=True)
    # Kind of aggregation this row holds (part of the unique key below).
    agg_type = models.CharField(max_length=100)
    total_users = models.IntegerField()
    # Aggregation values for this ``agg_type``, stored as JSON.
    data = JSONField()

    class Meta:
        db_table = "glam_fenix_aggregation"
        constraints = [
            # One row per dimension combination *including* agg_type.
            models.UniqueConstraint(
                name="fenix_unique_dimensions",
                fields=[
                    "channel",
                    "version",
                    "ping_type",
                    "os",
                    "build_id",
                    "metric",
                    "metric_type",
                    "metric_key",
                    "client_agg_type",
                    "agg_type",
                ],
            )
        ]
71 changes: 22 additions & 49 deletions glam/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
DesktopBetaAggregationView,
DesktopNightlyAggregationView,
DesktopReleaseAggregationView,
FenixAggregation,
FenixAggregationView,
FirefoxCounts,
Probe,
)
Expand Down Expand Up @@ -145,13 +145,10 @@ def get_glean_aggregations(request, **kwargs):
)

MODEL_MAP = {
"fenix": FenixAggregation,
"fenix": FenixAggregationView,
}

try:
model = MODEL_MAP[kwargs.get("product")]
except KeyError:
raise ValidationError("Product not currently supported.")
model = MODEL_MAP[kwargs.get("product")]

num_versions = kwargs.get("versions", 3)
try:
Expand All @@ -160,74 +157,50 @@ def get_glean_aggregations(request, **kwargs):
raise ValidationError("Query version cannot be determined")
versions = list(map(str, range(max_version, max_version - num_versions, -1)))

probe = kwargs["probe"]
os = kwargs.get("os", "*")

dimensions = [
Q(metric=kwargs["probe"]),
Q(metric=probe),
Q(version__in=versions),
Q(os=kwargs.get("os") or "*"),
Q(os=os),
]

aggregation_level = kwargs["aggregationLevel"]
# Whether to pull aggregations by version or build_id.
if aggregation_level == "version":
dimensions.append(Q(build_id="*"))
# counts = _get_fenix_counts(channel, os, versions, by_build=False)
elif aggregation_level == "build_id":
dimensions.append(~Q(build_id="*"))
# counts = _get_fenix_counts(channel, os, versions, by_build=True)

result = model.objects.filter(*dimensions)

response = {}
response = []

for row in result:

metadata = {
data = {
"channel": row.channel,
"version": row.version,
"ping_type": row.ping_type,
"os": row.os,
"build_id": row.build_id,
"build_date": row.build_date,
"metric": row.metric,
"metric_type": row.metric_type,
"ping_type": row.ping_type,
"metric_key": row.metric_key,
"client_agg_type": row.client_agg_type,
"total_users": row.total_users,
"histogram": row.histogram and orjson.loads(row.histogram) or "",
"percentiles": row.percentiles and orjson.loads(row.percentiles) or "",
}
aggs = {d["key"]: round(d["value"], 4) for d in row.data}

# We use these keys to merge data dictionaries.
key = "{channel}-{version}-{metric}-{os}-{build_id}-{ping_type}".format(
**metadata
)
sub_key = "{key}-{client_agg_type}".format(
key=row.metric_key, client_agg_type=row.client_agg_type
)

record = response.get(key, {})
if "metadata" not in record:
record["metadata"] = metadata

if sub_key not in record:
record[sub_key] = {}

new_data = {}
new_data[row.agg_type] = aggs

if row.agg_type == constants.AGGREGATION_HISTOGRAM:
new_data["total_users"] = row.total_users

if row.metric_key:
new_data["key"] = row.metric_key

if row.client_agg_type:
new_data["client_agg_type"] = row.client_agg_type

data = record[sub_key].get("data", {})
data.update(new_data)

record[sub_key]["data"] = data
response[key] = record
# Get the total distinct client IDs for this set of dimensions.
# data["total_addressable_market"] = counts.get(f"{row.version}-{row.build_id}")

# Restructure data and remove keys only used for merging data.
response = [
{"metadata": r.pop("metadata"), "data": [d["data"] for d in r.values()]}
for r in response.values()
]
response.append(data)

return response

Expand Down

0 comments on commit 9adef25

Please sign in to comment.