Add richer metadata to the analysis aggregation query #922

Draft: wants to merge 9 commits into base branch master
ooniapi/services/oonimeasurements/Makefile (1 addition & 1 deletion)
@@ -44,7 +44,7 @@ imagedefinitions.json:
echo '[{"name":"${ECS_CONTAINER_NAME}","imageUri":"${IMAGE_NAME}:${BUILD_LABEL}"}]' > imagedefinitions.json

test:
hatch run test
hatch run test $(ARGS)

test-cov:
hatch run test-cov
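
The new $(ARGS) pass-through lets a caller forward extra flags to the test runner from the make command line. An illustrative invocation, assuming hatch's test script relays trailing arguments to pytest:

    make test ARGS="-k test_aggregation -x"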
@@ -5,7 +5,8 @@
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel

from .utils import get_measurement_start_day_agg, TimeGrains
from .utils import get_measurement_start_day_agg, TimeGrains, parse_probe_asn
from ...sql import format_aggregate_query
from ...dependencies import (
get_clickhouse_session,
)
@@ -25,11 +26,7 @@


AggregationKeys = Literal[
"measurement_start_day",
"domain",
"probe_cc",
"probe_asn",
"test_name",
"measurement_start_day", "domain", "probe_cc", "probe_asn", "test_name", "input"
]


@@ -40,20 +37,43 @@ class DBStats(BaseModel):
total_row_count: int


class Loni(BaseModel):
dns_isp_blocked: float
dns_isp_down: float
dns_isp_ok: float
dns_other_blocked: float
dns_other_down: float
dns_other_ok: float
tls_blocked: float
tls_down: float
tls_ok: float
tcp_blocked: float
tcp_down: float
tcp_ok: float

dns_isp_outcome: str
dns_other_outcome: str
tcp_outcome: str
tls_outcome: str

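The Loni model (presumably short for "likelihood of network interference") splits each protocol step's probability mass across blocked/down/ok and records the dominant outcome label per step, using the dotted labels the aggregate query builds (e.g. dns_isp.blocked.nxdomain). A hypothetical entry, with illustrative values only:

    loni = Loni(
        dns_isp_blocked=0.7, dns_isp_down=0.1, dns_isp_ok=0.2,
        dns_other_blocked=0.0, dns_other_down=0.0, dns_other_ok=1.0,
        tcp_blocked=0.0, tcp_down=0.0, tcp_ok=1.0,
        tls_blocked=0.6, tls_down=0.1, tls_ok=0.3,
        dns_isp_outcome="dns_isp.blocked.nxdomain",
        dns_other_outcome="dns_other.ok",
        tcp_outcome="tcp.ok",
        tls_outcome="tls.blocked.ssl_error",
    )
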

class AggregationEntry(BaseModel):
anomaly_count: float
confirmed_count: float
failure_count: float
ok_count: float
measurement_count: float
count: float

measurement_start_day: Optional[datetime] = None

measurement_start_day: Optional[date] = None
outcome_label: str
outcome_value: float
outcome_ok: float
outcome_blocked: float
outcome_down: float

loni: Loni

domain: Optional[str] = None
probe_cc: Optional[str] = None
probe_asn: Optional[int] = None
test_name: Optional[str] = None
input: Optional[str] = None


class AggregationResponse(BaseModel):
@@ -67,7 +87,6 @@ class AggregationResponse(BaseModel):
async def get_aggregation_analysis(
axis_x: Annotated[AggregationKeys, Query()] = "measurement_start_day",
axis_y: Annotated[Optional[AggregationKeys], Query()] = None,
category_code: Annotated[Optional[str], Query()] = None,
test_name: Annotated[Optional[str], Query()] = None,
domain: Annotated[Optional[str], Query()] = None,
input: Annotated[Optional[str], Query()] = None,
@@ -97,8 +116,7 @@ async def get_aggregation_analysis(
extra_cols[axis_x] = axis_x

if probe_asn is not None:
if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
probe_asn = int(probe_asn[2:])
probe_asn = parse_probe_asn(probe_asn)
q_args["probe_asn"] = probe_asn
and_clauses.append("probe_asn = %(probe_asn)d")
extra_cols["probe_asn"] = "probe_asn"
@@ -147,150 +165,77 @@ async def get_aggregation_analysis(
where += " WHERE "
where += " AND ".join(and_clauses)

q = f"""
WITH
mapFilter((k, v) -> v != 0, dns_nok_outcomes) as dns_outcomes,
mapFilter((k, v) -> v != 0, tcp_nok_outcomes) as tcp_outcomes,
mapFilter((k, v) -> v != 0, tls_nok_outcomes) as tls_outcomes,

arrayZip(mapKeys(dns_outcomes), mapValues(dns_outcomes)) as dns_outcome_list,
arraySum((v) -> v.2, dns_outcome_list) as dns_nok_sum,
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/dns_nok_sum), dns_outcome_list)) as dns_outcomes_norm,

arrayZip(mapKeys(tcp_outcomes), mapValues(tcp_outcomes)) as tcp_outcome_list,
arraySum((v) -> v.2, tcp_outcome_list) as tcp_nok_sum,
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tcp_nok_sum), tcp_outcome_list)) as tcp_outcomes_norm,

arrayZip(mapKeys(tls_outcomes), mapValues(tls_outcomes)) as tls_outcome_list,
arraySum((v) -> v.2, tls_outcome_list) as tls_nok_sum,
arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tls_nok_sum), tls_outcome_list)) as tls_outcomes_norm,

arraySort(
(v) -> -v.2,
[
(dns_outcome_nok_label, dns_outcome_nok_value),
(tcp_outcome_nok_label, tcp_outcome_nok_value),
(tls_outcome_nok_label, tls_outcome_nok_value),
IF(
tls_ok_sum = 0 AND tls_outcome_nok_value = 0,
-- Special case for when the tested target was not supporting HTTPS and hence the TLS outcome is not so relevant
('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value])),
('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value, tls_outcome_ok_value]))
)
]
) as all_outcomes_sorted,

arrayConcat(dns_outcomes_norm, tcp_outcomes_norm, tls_outcomes_norm) as all_nok_outcomes,

dns_outcomes_norm[1].1 as dns_outcome_nok_label,
dns_outcomes_norm[1].2 as dns_outcome_nok_value,

tcp_outcomes_norm[1].1 as tcp_outcome_nok_label,
tcp_outcomes_norm[1].2 as tcp_outcome_nok_value,

tls_outcomes_norm[1].1 as tls_outcome_nok_label,
tls_outcomes_norm[1].2 as tls_outcome_nok_value,

IF(dns_ok_sum > 0, 1 - dns_outcome_nok_value, 0) as dns_outcome_ok_value,
IF(tcp_ok_sum > 0, 1 - tcp_outcome_nok_value, 0) as tcp_outcome_ok_value,
IF(tls_ok_sum > 0, 1 - tls_outcome_nok_value, 0) as tls_outcome_ok_value,

all_outcomes_sorted[1].1 as final_outcome_label,
IF(final_outcome_label = 'ok', all_outcomes_sorted[1].2, all_outcomes_sorted[1].2) as final_outcome_value

SELECT

{",".join(extra_cols.keys())},
probe_analysis,
all_nok_outcomes as all_outcomes,
final_outcome_label as outcome_label,
final_outcome_value as outcome_value

FROM (
WITH
IF(resolver_asn = probe_asn, 1, 0) as is_isp_resolver,
multiIf(
top_dns_failure IN ('android_dns_cache_no_data', 'dns_nxdomain_error'),
'nxdomain',
coalesce(top_dns_failure, 'got_answer')
) as dns_failure
SELECT
{",".join(extra_cols.values())},

anyHeavy(top_probe_analysis) as probe_analysis,

sumMap(
map(
CONCAT(IF(is_isp_resolver, 'dns_isp.blocked.', 'dns_other.blocked.'), dns_failure), dns_blocked,
CONCAT(IF(is_isp_resolver, 'dns_isp.down.', 'dns_other.down.'), dns_failure), dns_down
)
) as dns_nok_outcomes,
sum(dns_ok) as dns_ok_sum,

sumMap(
map(
CONCAT('tcp.blocked.', coalesce(top_tcp_failure, '')), tcp_blocked,
CONCAT('tcp.down.', coalesce(top_tcp_failure, '')), tcp_down
)
) as tcp_nok_outcomes,
sum(tcp_ok) as tcp_ok_sum,

sumMap(
map(
CONCAT('tls.blocked.', coalesce(top_tls_failure, '')), tls_blocked,
CONCAT('tls.down.', coalesce(top_tls_failure, '')), tls_down
)
) as tls_nok_outcomes,
sum(tls_ok) as tls_ok_sum

FROM analysis_web_measurement
{where}
GROUP BY {", ".join(extra_cols.keys())}
ORDER BY {", ".join(extra_cols.keys())}
)
"""
q = format_aggregate_query(extra_cols, where)
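    # format_aggregate_query (defined in ...sql, not shown in this diff) now
    # builds the ClickHouse query that previously lived inline above: it selects
    # the extra_cols aliases plus the per-protocol loni columns enumerated in
    # fixed_cols below, grouped and ordered by the extra_cols keys.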

t = time.perf_counter()
log.info(f"running query {q} with {q_args}")
rows = db.execute(q, q_args)

fixed_cols = ["probe_analysis", "all_outcomes", "outcome_label", "outcome_value"]
fixed_cols = [
"probe_analysis",
"count",
"dns_isp_blocked",
"dns_isp_down",
"dns_isp_ok",
"dns_other_blocked",
"dns_other_down",
"dns_other_ok",
"tls_blocked",
"tls_down",
"tls_ok",
"tcp_blocked",
"tcp_down",
"tcp_ok",
"dns_isp_outcome",
"dns_other_outcome",
"tcp_outcome",
"tls_outcome",
"most_likely_ok",
"most_likely_down",
"most_likely_blocked",
"most_likely_label",
]
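    # NOTE: the order of fixed_cols must match the column order of the SELECT
    # produced by format_aggregate_query, since each row is positionally zipped
    # against extra_cols.keys() + fixed_cols below.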

results: List[AggregationEntry] = []
if rows and isinstance(rows, list):
for row in rows:
log.debug(row)
d = dict(zip(list(extra_cols.keys()) + fixed_cols, row))
outcome_value = d["outcome_value"]
outcome_label = d["outcome_label"]
anomaly_count = 0
confirmed_count = 0
failure_count = 0
ok_count = 0
if outcome_label == "ok":
ok_count = outcome_value
elif "blocked." in outcome_label:
if outcome_value >= anomaly_sensitivity:
confirmed_count = outcome_value
else:
anomaly_count = outcome_value

# Map "down" to failures
else:
failure_count = outcome_value
loni = Loni(
dns_isp_blocked=d.get("dns_isp_blocked", 0.0),
dns_isp_down=d.get("dns_isp_down", 0.0),
dns_isp_ok=d.get("dns_isp_ok", 0.0),
dns_other_blocked=d.get("dns_other_blocked", 0.0),
dns_other_down=d.get("dns_other_down", 0.0),
dns_other_ok=d.get("dns_other_ok", 0.0),
tls_blocked=d.get("tls_blocked", 0.0),
tls_down=d.get("tls_down", 0.0),
tls_ok=d.get("tls_ok", 0.0),
tcp_blocked=d.get("tcp_blocked", 0.0),
tcp_down=d.get("tcp_down", 0.0),
tcp_ok=d.get("tcp_ok", 0.0),
dns_isp_outcome=d.get("dns_isp_outcome", ""),
dns_other_outcome=d.get("dns_other_outcome", ""),
tcp_outcome=d.get("tcp_outcome", ""),
tls_outcome=d.get("tls_outcome", ""),
)
outcome_label = d["most_likely_label"]
outcome_blocked = d["most_likely_blocked"]
outcome_down = d["most_likely_down"]
outcome_ok = d["most_likely_ok"]

entry = AggregationEntry(
anomaly_count=anomaly_count,
confirmed_count=confirmed_count,
failure_count=failure_count,
ok_count=ok_count,
measurement_count=1.0,
measurement_start_day=d["measurement_start_day"],
count=d["count"],
measurement_start_day=d.get("measurement_start_day"),
loni=loni,
outcome_label=outcome_label,
outcome_value=outcome_value,
outcome_blocked=outcome_blocked,
outcome_down=outcome_down,
outcome_ok=outcome_ok,
domain=d.get("domain"),
probe_cc=d.get("probe_cc"),
probe_asn=d.get("probe_asn"),
test_name=d.get("test_name"),
input=d.get("input"),
)
results.append(entry)
return AggregationResponse(
@@ -10,7 +10,7 @@

from ...common.dependencies import get_settings
from ...dependencies import get_clickhouse_session
from .utils import SinceUntil, test_name_to_group, utc_30_days_ago, utc_today
from .utils import SinceUntil, parse_probe_asn, test_name_to_group, utc_30_days_ago, utc_today

log = logging.getLogger(__name__)

@@ -92,8 +92,7 @@ async def list_measurements(
q_args["measurement_uid"] = measurement_uid
and_clauses.append("measurement_uid = %(measurement_uid)s")
if probe_asn is not None:
if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
probe_asn = int(probe_asn[2:])
probe_asn = parse_probe_asn(probe_asn)
q_args["probe_asn"] = probe_asn
and_clauses.append("probe_asn = %(probe_asn)d")
if probe_cc is not None:
@@ -9,6 +9,7 @@

from ...common.dependencies import get_settings
from ...dependencies import get_clickhouse_session
from .utils import parse_probe_asn

from fastapi import APIRouter

@@ -204,16 +205,15 @@ async def list_observations(
q_args["report_id"] = report_id
and_clauses.append("report_id = %(report_id)s")
if probe_asn is not None:
if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
probe_asn = int(probe_asn[2:])
probe_asn = parse_probe_asn(probe_asn)
q_args["probe_asn"] = probe_asn
and_clauses.append("probe_asn = %(probe_asn)d")
if probe_cc is not None:
q_args["probe_cc"] = probe_cc
and_clauses.append("probe_cc = %(probe_cc)s")

if software_name is not None:
q_args["software_name"] = software_version
q_args["software_name"] = software_name
and_clauses.append("software_name = %(software_name)s")
if software_version is not None:
q_args["software_version"] = software_version
@@ -19,6 +19,8 @@ def get_measurement_start_day_agg(
return f"toStartOfWeek({column_name})"
if time_grain == "month":
return f"toStartOfMonth({column_name})"
if time_grain == "year":
return f"toStartOfYear({column_name})"
return f"toStartOfDay({column_name})"
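
The added branch extends the helper to the year grain, following the existing pattern of mapping each TimeGrains value to a ClickHouse toStartOf* truncation function. Illustrative behaviour (the keyword usage and column name are assumptions; only the column_name parameter itself is visible in the truncated signature above):

    get_measurement_start_day_agg("year", column_name="measurement_start_time")
    # -> "toStartOfYear(measurement_start_time)"
    get_measurement_start_day_agg("week", column_name="measurement_start_time")
    # -> "toStartOfWeek(measurement_start_time)"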


@@ -50,3 +52,11 @@ def test_name_to_group(tn):
return "websites"
# TODO(arturo): currently we only support websites
return ""


def parse_probe_asn(probe_asn):
    # Accept both "AS30722"-style strings and bare integers; check the type
    # first, since int has no startswith and ints should pass through as-is.
    if isinstance(probe_asn, str):
        if probe_asn.startswith("AS"):
            probe_asn = probe_asn[2:]
        probe_asn = int(probe_asn)
    return probe_asn
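
A few calls the helper should now handle, consistent with the inline logic it replaces (the integer pass-through is what the isinstance guard above preserves):

    parse_probe_asn("AS30722")  # -> 30722
    parse_probe_asn("30722")    # -> 30722
    parse_probe_asn(30722)      # -> 30722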