diff --git a/ooniapi/services/oonifindings/Makefile b/ooniapi/services/oonifindings/Makefile
index 53fc7db3..ac9bd5fb 100644
--- a/ooniapi/services/oonifindings/Makefile
+++ b/ooniapi/services/oonifindings/Makefile
@@ -61,4 +61,7 @@ clean:
 run:
 	hatch run uvicorn $(SERVICE_NAME).main:app
 
+apidocs:
+	hatch run python -m oonifindings.mkapidocs apidocs.json
+
 .PHONY: init test build clean docker print-labels
diff --git a/ooniapi/services/oonifindings/pyproject.toml b/ooniapi/services/oonifindings/pyproject.toml
index ac444d23..eb123e18 100644
--- a/ooniapi/services/oonifindings/pyproject.toml
+++ b/ooniapi/services/oonifindings/pyproject.toml
@@ -19,6 +19,7 @@ dependencies = [
   "psycopg2 ~= 2.9.9",
   "pyjwt ~= 2.8.0",
   "alembic ~= 1.13.1",
+  "python-dateutil ~= 2.9.0",
   "prometheus-fastapi-instrumentator ~= 6.1.0",
   "prometheus-client",
 ]
@@ -65,6 +66,8 @@ dependencies = [
   "black",
   "pytest-asyncio",
   "pytest-postgresql",
+  "pytest-docker",
+  "requests"
 ]
 path = ".venv/"
diff --git a/ooniapi/services/oonifindings/src/oonifindings/dependencies.py b/ooniapi/services/oonifindings/src/oonifindings/dependencies.py
index 8e23f796..21eff244 100644
--- a/ooniapi/services/oonifindings/src/oonifindings/dependencies.py
+++ b/ooniapi/services/oonifindings/src/oonifindings/dependencies.py
@@ -1,5 +1,7 @@
 from typing import Annotated
 
+from clickhouse_driver import Client as Clickhouse
+
 from fastapi import Depends
 from sqlalchemy import create_engine
 
@@ -18,3 +20,11 @@ def get_postgresql_session(settings: Annotated[Settings, Depends(get_settings)])
         yield db
     finally:
         db.close()
+
+
+def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]):
+    db = Clickhouse.from_url(settings.clickhouse_url)
+    try:
+        yield db
+    finally:
+        db.disconnect()
diff --git a/ooniapi/services/oonifindings/src/oonifindings/main.py b/ooniapi/services/oonifindings/src/oonifindings/main.py
index 935b469e..4511c4f5 100644
--- a/ooniapi/services/oonifindings/src/oonifindings/main.py
+++ b/ooniapi/services/oonifindings/src/oonifindings/main.py
@@ -10,6 +10,12 @@
 from . import models
 from .routers import v1
+from .routers.data import (
+    list_analysis,
+    list_observations,
+    aggregate_observations,
+    aggregate_analysis,
+)
 from .dependencies import get_settings, get_postgresql_session
 from .common.version import get_build_label, get_pkg_version
@@ -46,6 +52,10 @@ async def lifespan(app: FastAPI):
 )
 
 app.include_router(v1.router, prefix="/api")
+app.include_router(list_analysis.router, prefix="/api")
+app.include_router(list_observations.router, prefix="/api")
+app.include_router(aggregate_observations.router, prefix="/api")
+app.include_router(aggregate_analysis.router, prefix="/api")
 
 
 @app.get("/version")
diff --git a/ooniapi/services/oonifindings/src/oonifindings/mkapidocs.py b/ooniapi/services/oonifindings/src/oonifindings/mkapidocs.py
new file mode 100644
index 00000000..f809bdae
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/mkapidocs.py
@@ -0,0 +1,13 @@
+import sys
+import json
+
+from fastapi.openapi.utils import get_openapi
+from .main import app
+from .__about__ import VERSION
+
+if __name__ == "__main__":
+    openapi = get_openapi(title="OONI Findings", version=VERSION, routes=app.routes)
+    assert len(sys.argv) == 2, "must specify output file"
+    with open(sys.argv[1], "w") as out_file:
+        out_file.write(json.dumps(openapi))
+        out_file.write("\n")
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/__init__.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_analysis.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_analysis.py
new file mode 100644
index 00000000..4c16a4e2
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_analysis.py
@@ -0,0 +1,302 @@
+import logging
+import time
+from datetime import date
+from typing import List, Literal, Optional, Union
+from typing_extensions import Annotated
+from fastapi import APIRouter, Depends, Query
+from pydantic import BaseModel
+
+from .utils import (
+    SinceUntil,
+    TimeGrains,
+    get_measurement_start_day_agg,
+    utc_30_days_ago,
+    utc_today,
+)
+from ...dependencies import (
+    get_clickhouse_session,
+)
+
+router = APIRouter()
+
+log = logging.getLogger(__name__)
+
+
+AggregationKeys = Literal[
+    "measurement_start_day",
+    "domain",
+    "probe_cc",
+    "probe_asn",
+    "test_name",
+]
+
+
+class DBStats(BaseModel):
+    bytes: int
+    elapsed_seconds: float
+    row_count: int
+    total_row_count: int
+
+
+class AggregationEntry(BaseModel):
+    anomaly_count: float
+    confirmed_count: float
+    failure_count: float
+    ok_count: float
+    measurement_count: float
+
+    measurement_start_day: Optional[date] = None
+    outcome_label: str
+    outcome_value: float
+
+    domain: Optional[str] = None
+    probe_cc: Optional[str] = None
+    probe_asn: Optional[int] = None
+
+
+class AggregationResponse(BaseModel):
+    # TODO(arturo): these keys are inconsistent with the other APIs
+    db_stats: DBStats
+    dimension_count: int
+    results: List[AggregationEntry]
+
+
+@router.get("/v1/aggregation/analysis", tags=["aggregation", "analysis"])
+async def get_aggregation_analysis(
+    axis_x: Annotated[AggregationKeys, Query()] = "measurement_start_day",
+    axis_y: Annotated[Optional[AggregationKeys], Query()] = None,
+    category_code: Annotated[Optional[str], Query()] = None,
+    test_name: Annotated[Optional[str], Query()] = None,
+    domain: Annotated[Optional[str], Query()] = None,
+    input: Annotated[Optional[str], Query()] = None,
+    probe_asn: Annotated[Union[int, str, None], Query()] = None,
+    probe_cc: Annotated[Optional[str], Query(min_length=2, max_length=2)] = None,
+    ooni_run_link_id: Annotated[Optional[str], Query()] = None,
+    since: SinceUntil = utc_30_days_ago(),
+    until: SinceUntil = utc_today(),
+    time_grain: Annotated[TimeGrains, Query()] = "day",
+    anomaly_sensitivity: Annotated[float, Query()] = 0.9,
+    format: Annotated[Literal["JSON", "CSV"], Query()] = "JSON",
+    download: Annotated[bool, Query()] = False,
+    db=Depends(get_clickhouse_session),
+) -> AggregationResponse:
+    q_args = {}
+    and_clauses = []
+    extra_cols = {}
+    dimension_count = 1
+    if axis_x == "measurement_start_day":
+        # TODO(arturo): wouldn't it be nicer if we dropped the time_grain
+        # argument and instead used axis_x IN (measurement_start_day,
+        # measurement_start_hour, ..)?
+        extra_cols["measurement_start_day"] = (
+            f"{get_measurement_start_day_agg(time_grain)} as measurement_start_day"
+        )
+    elif axis_x:
+        extra_cols[axis_x] = axis_x
+
+    if probe_asn is not None:
+        if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
+            probe_asn = int(probe_asn[2:])
+        q_args["probe_asn"] = probe_asn
+        and_clauses.append("probe_asn = %(probe_asn)d")
+        extra_cols["probe_asn"] = "probe_asn"
+    if probe_cc is not None:
+        q_args["probe_cc"] = probe_cc
+        and_clauses.append("probe_cc = %(probe_cc)s")
+        extra_cols["probe_cc"] = "probe_cc"
+    if test_name is not None:
+        q_args["test_name"] = test_name
+        and_clauses.append("test_name = %(test_name)s")
+        extra_cols["test_name"] = "test_name"
+    if ooni_run_link_id is not None:
+        q_args["ooni_run_link_id"] = ooni_run_link_id
+        and_clauses.append("ooni_run_link_id = %(ooni_run_link_id)s")
+        extra_cols["ooni_run_link_id"] = "ooni_run_link_id"
+    if domain is not None:
+        q_args["domain"] = domain
+        and_clauses.append("domain = %(domain)s")
+        extra_cols["domain"] = "domain"
+    if input is not None:
+        q_args["input"] = input
+        and_clauses.append("input = %(input)s")
+        extra_cols["input"] = "input"
+
+    if axis_y:
+        dimension_count += 1
+        if axis_y == "measurement_start_day":
+            # TODO(arturo): wouldn't it be nicer if we dropped the time_grain
+            # argument and instead used axis_x IN (measurement_start_day,
+            # measurement_start_hour, ..)?
+            extra_cols["measurement_start_day"] = (
+                f"{get_measurement_start_day_agg(time_grain)} as measurement_start_day"
+            )
+        else:
+            extra_cols[axis_y] = axis_y
+
+    if since is not None:
+        q_args["since"] = since
+        and_clauses.append("measurement_start_time >= %(since)s")
+    if until is not None:
+        and_clauses.append("measurement_start_time <= %(until)s")
+        q_args["until"] = until
+
+    where = ""
+    if len(and_clauses) > 0:
+        where += " WHERE "
+        where += " AND ".join(and_clauses)
+
+    q = f"""
+    WITH
+    mapFilter((k, v) -> v != 0, dns_nok_outcomes) as dns_outcomes,
+    mapFilter((k, v) -> v != 0, tcp_nok_outcomes) as tcp_outcomes,
+    mapFilter((k, v) -> v != 0, tls_nok_outcomes) as tls_outcomes,
+
+    arrayZip(mapKeys(dns_outcomes), mapValues(dns_outcomes)) as dns_outcome_list,
+    arraySum((v) -> v.2, dns_outcome_list) as dns_nok_sum,
+    arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/dns_nok_sum), dns_outcome_list)) as dns_outcomes_norm,
+
+    arrayZip(mapKeys(tcp_outcomes), mapValues(tcp_outcomes)) as tcp_outcome_list,
+    arraySum((v) -> v.2, tcp_outcome_list) as tcp_nok_sum,
+    arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tcp_nok_sum), tcp_outcome_list)) as tcp_outcomes_norm,
+
+    arrayZip(mapKeys(tls_outcomes), mapValues(tls_outcomes)) as tls_outcome_list,
+    arraySum((v) -> v.2, tls_outcome_list) as tls_nok_sum,
+    arraySort((v) -> -v.2, arrayMap((v) -> (v.1, v.2/tls_nok_sum), tls_outcome_list)) as tls_outcomes_norm,
+
+    arraySort(
+        (v) -> -v.2,
+        [
+            (dns_outcome_nok_label, dns_outcome_nok_value),
+            (tcp_outcome_nok_label, tcp_outcome_nok_value),
+            (tls_outcome_nok_label, tls_outcome_nok_value),
+            IF(
+                tls_ok_sum = 0 AND tls_outcome_nok_value = 0,
+                -- Special case for when the tested target was not supporting HTTPS and hence the TLS outcome is not so relevant
+                ('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value])),
+                ('ok', arrayMin([dns_outcome_ok_value, tcp_outcome_ok_value, tls_outcome_ok_value]))
+            )
+        ]
+    ) as all_outcomes_sorted,
+
+    arrayConcat(dns_outcomes_norm, tcp_outcomes_norm, tls_outcomes_norm) as all_nok_outcomes,
+
+    dns_outcomes_norm[1].1 as dns_outcome_nok_label,
+    dns_outcomes_norm[1].2 as dns_outcome_nok_value,
+
+    tcp_outcomes_norm[1].1 as tcp_outcome_nok_label,
+    tcp_outcomes_norm[1].2 as tcp_outcome_nok_value,
+
+    tls_outcomes_norm[1].1 as tls_outcome_nok_label,
+    tls_outcomes_norm[1].2 as tls_outcome_nok_value,
+
+    IF(dns_ok_sum > 0, 1 - dns_outcome_nok_value, 0) as dns_outcome_ok_value,
+    IF(tcp_ok_sum > 0, 1 - tcp_outcome_nok_value, 0) as tcp_outcome_ok_value,
+    IF(tls_ok_sum > 0, 1 - tls_outcome_nok_value, 0) as tls_outcome_ok_value,
+
+    all_outcomes_sorted[1].1 as final_outcome_label,
+    IF(final_outcome_label = 'ok', all_outcomes_sorted[1].2, all_outcomes_sorted[1].2) as final_outcome_value
+
+    SELECT
+
+    {",".join(extra_cols.keys())},
+    probe_analysis,
+    all_nok_outcomes as all_outcomes,
+    final_outcome_label as outcome_label,
+    final_outcome_value as outcome_value
+
+    FROM (
+        WITH
+        IF(resolver_asn = probe_asn, 1, 0) as is_isp_resolver,
+        multiIf(
+            top_dns_failure IN ('android_dns_cache_no_data', 'dns_nxdomain_error'),
+            'nxdomain',
+            coalesce(top_dns_failure, 'got_answer')
+        ) as dns_failure
+        SELECT
+        {",".join(extra_cols.values())},
+
+        anyHeavy(top_probe_analysis) as probe_analysis,
+
+        sumMap(
+            map(
+                CONCAT(IF(is_isp_resolver, 'dns_isp.blocked.', 'dns_other.blocked.'), dns_failure), dns_blocked,
+                CONCAT(IF(is_isp_resolver, 'dns_isp.down.', 'dns_other.down.'), dns_failure), dns_down
+            )
+        ) as dns_nok_outcomes,
+        sum(dns_ok) as dns_ok_sum,
+
+        sumMap(
+            map(
+                CONCAT('tcp.blocked.', coalesce(top_tcp_failure, '')), tcp_blocked,
+                CONCAT('tcp.down.', coalesce(top_tcp_failure, '')), tcp_down
+            )
+        ) as tcp_nok_outcomes,
+        sum(tcp_ok) as tcp_ok_sum,
+
+        sumMap(
+            map(
+                CONCAT('tls.blocked.', coalesce(top_tls_failure, '')), tls_blocked,
+                CONCAT('tls.down.', coalesce(top_tls_failure, '')), tls_down
+            )
+        ) as tls_nok_outcomes,
+        sum(tls_ok) as tls_ok_sum
+
+    FROM analysis_web_measurement
+    {where}
+    GROUP BY {", ".join(extra_cols.keys())}
+    ORDER BY {", ".join(extra_cols.keys())}
+    )
+    """
+
+    t = time.perf_counter()
+    log.info(f"running query {q} with {q_args}")
+    rows = db.execute(q, q_args)
+
+    fixed_cols = ["probe_analysis", "all_outcomes", "outcome_label", "outcome_value"]
+
+    results: List[AggregationEntry] = []
+    if rows and isinstance(rows, list):
+        for row in rows:
+            d = dict(zip(list(extra_cols.keys()) + fixed_cols, row))
+            outcome_value = d["outcome_value"]
+            outcome_label = d["outcome_label"]
+            anomaly_count = 0
+            confirmed_count = 0
+            failure_count = 0
+            ok_count = 0
+            if outcome_label == "ok":
+                ok_count = outcome_value
+            elif "blocked." in outcome_label:
+                if outcome_value >= anomaly_sensitivity:
+                    confirmed_count = outcome_value
+                else:
+                    anomaly_count = outcome_value
+
+            # Map "down" to failures
+            else:
+                failure_count = outcome_value
+
+            entry = AggregationEntry(
+                anomaly_count=anomaly_count,
+                confirmed_count=confirmed_count,
+                failure_count=failure_count,
+                ok_count=ok_count,
+                measurement_count=1.0,
+                measurement_start_day=d.get("measurement_start_day"),
+                outcome_label=outcome_label,
+                outcome_value=outcome_value,
+                domain=d.get("domain"),
+                probe_cc=d.get("probe_cc"),
+                probe_asn=d.get("probe_asn"),
+            )
+            results.append(entry)
+    return AggregationResponse(
+        db_stats=DBStats(
+            bytes=-1,
+            elapsed_seconds=time.perf_counter() - t,
+            row_count=len(results),
+            total_row_count=len(results),
+        ),
+        dimension_count=dimension_count,
+        results=results,
+    )
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_observations.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_observations.py
new file mode 100644
index 00000000..353dbb65
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/routers/data/aggregate_observations.py
@@ -0,0 +1,177 @@
+import logging
+from datetime import datetime
+from typing import Any, Dict, List, Literal, Optional
+from typing_extensions import Annotated
+from fastapi import APIRouter, Depends, Query
+from pydantic import BaseModel
+
+from ...dependencies import get_clickhouse_session
+from .utils import (
+    SinceUntil,
+    get_measurement_start_day_agg,
+    TimeGrains,
+    utc_30_days_ago,
+    utc_today,
+)
+
+router = APIRouter()
+
+log = logging.getLogger(__name__)
+
+
+class AggregationEntry(BaseModel):
+    observation_count: int
+    failure: Optional[str] = None
+    ip: Optional[str] = None
+
+    hostname: Optional[str] = None
+    probe_cc: Optional[str] = None
+    probe_asn: Optional[int] = None
+    test_name: Optional[str] = None
+    timestamp: Optional[datetime] = None
+
+
+class AggregationResponse(BaseModel):
+    results: List[AggregationEntry]
+
+
+AggregationKeys = Literal[
+    "timestamp",
+    "failure",
+    "hostname",
+    "ip",
+    "probe_cc",
+    "probe_asn",
+    "test_name",
+]
+
+
+@router.get(
+    "/v1/aggregation/observations",
+    response_model_exclude_none=True,
+    tags=["aggregation", "observations"],
+)
+async def get_aggregation_observations(
+    group_by: Annotated[List[AggregationKeys], Query()] = [
+        "failure",
+    ],
+    test_name: Annotated[List[str] | None, Query()] = None,
+    hostname: Annotated[List[str] | None, Query()] = None,
+    probe_asn: Annotated[List[int] | None, Query()] = None,
+    probe_cc: Annotated[List[str] | None, Query()] = None,
+    ip: Annotated[List[str] | None, Query()] = None,
+    ooni_run_link_id: Annotated[Optional[str], Query()] = None,
+    since: SinceUntil = utc_30_days_ago(),
+    until: SinceUntil = utc_today(),
+    time_grain: Annotated[TimeGrains, Query()] = "day",
+    db=Depends(get_clickhouse_session),
+) -> AggregationResponse:
+    timestamp_str = get_measurement_start_day_agg(time_grain)
+    column_keys = ["observation_count"]
+    columns = []
+    and_list = []
+    order_by = ["obs_count"]
+    params_filter: Dict[str, Any] = {"since": since, "until": until}
+    selected_columns = ""
+    group_by_str = ""
+    order_by_str = ""
+    and_str = ""
+
+    # copy to avoid mutating the shared default list across requests
+    group_by = list(group_by)
+
+    if probe_cc:
+        and_list.append("probe_cc IN %(probe_cc)s")
+        params_filter["probe_cc"] = probe_cc
+        group_by.append("probe_cc")
+        columns.append("probe_cc")
+        column_keys.append("probe_cc")
+    if probe_asn:
+        and_list.append("probe_asn IN %(probe_asn)s")
+        params_filter["probe_asn"] = probe_asn
+        group_by.append("probe_asn")
+        columns.append("probe_asn")
+        column_keys.append("probe_asn")
+    if hostname:
+        and_list.append("hostname IN %(hostname)s")
+        params_filter["hostname"] = hostname
+        group_by.append("hostname")
+        columns.append("hostname")
+        column_keys.append("hostname")
+    if test_name:
+        and_list.append("test_name IN %(test_name)s")
+        params_filter["test_name"] = test_name
+        group_by.append("test_name")
+        columns.append("test_name")
+        column_keys.append("test_name")
+    if ip:
+        and_list.append("ip IN %(ip)s")
+        params_filter["ip"] = ip
+        group_by.append("ip")
+        columns.append("ip")
+        column_keys.append("ip")
+
+    if "timestamp" in group_by:
+        columns.append(f"{timestamp_str} as timestamp")
+        column_keys.append("timestamp")
+        order_by = ["timestamp"] + order_by
+
+    if "failure" in group_by:
+        columns.append(
+            f"""multiIf(
+            dns_failure IS NOT NULL,
+            CONCAT('dns_', dns_failure),
+            tcp_failure IS NOT NULL,
+            CONCAT('tcp_', tcp_failure),
+            multiIf(
+                tls_handshake_last_operation = 'write_1',
+                CONCAT('tls_', tls_failure, '_after_CH'),
+                tls_failure IS NULL AND tls_server_name IS NOT NULL,
+                'none',
+                CONCAT('tls_', tls_failure)
+            )
+            ) as failure
+            """
+        )
+        column_keys.append("failure")
+        and_list.append(
+            # Exclude rows where we only have HTTP/HTTPS observations
+            """(dns_failure IS NOT NULL
+            OR tcp_failure IS NOT NULL
+            OR tls_failure IS NOT NULL
+            OR tls_server_name IS NOT NULL
+            )
+            """
+        )
+
+    for column in group_by:
+        if column not in column_keys:
+            columns.append(column)
+            column_keys.append(column)
+
+    selected_columns = ",".join(columns)
+    if len(and_list) > 0:
+        and_str = "AND " + " AND ".join(and_list)
+
+    # build the ORDER BY clause last so it picks up the "timestamp" key prepended above
+    if len(order_by) > 0:
+        order_by_str = "ORDER BY " + ",".join(order_by) + " DESC"
+
+    group_by_str = "GROUP BY " + ",".join(group_by)
+
+    query = f"""
+    SELECT
+    COUNT() as obs_count,
+    {selected_columns}
+    FROM obs_web
+    WHERE measurement_start_time > %(since)s
+    AND measurement_start_time < %(until)s
+    {and_str}
+    {group_by_str}
+    {order_by_str}
+    """
+    entries = []
+    for row in db.execute_iter(query, params_filter):
+        d = dict(zip(column_keys, row))
+        entries.append(AggregationEntry(**d))
+    return AggregationResponse(results=entries)
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_analysis.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_analysis.py
new file mode 100644
index 00000000..e0d56910
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_analysis.py
@@ -0,0 +1,143 @@
+import logging
+import math
+import time
+from datetime import datetime
+from typing import List, Literal, Optional, Union
+from fastapi import APIRouter, Depends, Query
+from pydantic import BaseModel
+from typing_extensions import Annotated
+
+from ...common.dependencies import get_settings
+from ...dependencies import get_clickhouse_session
+from .utils import SinceUntil, utc_30_days_ago, utc_today
+
+log = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+class ResponseMetadata(BaseModel):
+    count: int
+    current_page: int
+    limit: int
+    next_url: str
+    offset: int
+    pages: int
+    query_time: float
+
+
+class AnalysisEntry(BaseModel):
+    measurement_uid: str
+    measurement_start_time: datetime
+    network_type: str
+    probe_asn: int
+    probe_cc: str
+    probe_as_org_name: str
+    resolver_asn: int
+    resolver_as_cc: str
+    domain: str
+    input: str
+    test_name: str
+    top_probe_analysis: Optional[str]
+    top_dns_failure: Optional[str]
+    top_tcp_failure: Optional[str]
+    top_tls_failure: Optional[str]
+    dns_blocked: float
+    dns_down: float
+    dns_ok: float
+    tcp_blocked: float
+    tcp_down: float
+    tcp_ok: float
+    tls_blocked: float
+    tls_down: float
+    tls_ok: float
+
+
+class ListAnalysisResponse(BaseModel):
+    metadata: ResponseMetadata
+    results: List[AnalysisEntry]
+
+
+@router.get("/v1/analysis", tags=["analysis", "list_data"])
+async def list_measurements(
+    measurement_uid: Annotated[Optional[str], Query()] = None,
+    probe_asn: Annotated[Union[int, str, None], Query()] = None,
+    probe_cc: Annotated[Optional[str], Query(max_length=2, min_length=2)] = None,
+    test_name: Annotated[Optional[str], Query()] = None,
+    since: SinceUntil = utc_30_days_ago(),
+    until: SinceUntil = utc_today(),
+    order_by: Annotated[
+        Literal[
+            "measurement_start_time",
+            "input",
+            "probe_cc",
+            "probe_asn",
+            "test_name",
+        ],
+        Query(),
+    ] = "measurement_start_time",
+    order: Annotated[Optional[Literal["asc", "desc", "ASC", "DESC"]], Query()] = "DESC",
+    offset: Annotated[int, Query()] = 0,
+    limit: Annotated[int, Query()] = 100,
+    ooni_run_link_id: Annotated[Optional[str], Query()] = None,
+    db=Depends(get_clickhouse_session),
+    settings=Depends(get_settings),
+) -> ListAnalysisResponse:
+    q_args = {}
+    and_clauses = []
+    if measurement_uid is not None:
+        q_args["measurement_uid"] = measurement_uid
+        and_clauses.append("measurement_uid = %(measurement_uid)s")
+    if probe_asn is not None:
+        if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
+            probe_asn = int(probe_asn[2:])
+        q_args["probe_asn"] = probe_asn
+        and_clauses.append("probe_asn = %(probe_asn)d")
+    if probe_cc is not None:
+        q_args["probe_cc"] = probe_cc
+        and_clauses.append("probe_cc = %(probe_cc)s")
+    if test_name is not None:
+        q_args["test_name"] = test_name
+        and_clauses.append("test_name = %(test_name)s")
+    if ooni_run_link_id is not None:
+        q_args["ooni_run_link_id"] = ooni_run_link_id
+        and_clauses.append("ooni_run_link_id = %(ooni_run_link_id)s")
+
+    if since is not None:
+        q_args["since"] = since
+        and_clauses.append("measurement_start_time >= %(since)s")
+    if until is not None:
+        and_clauses.append("measurement_start_time <= %(until)s")
+        q_args["until"] = until
+
+    cols = list(AnalysisEntry.model_json_schema()["properties"].keys())
+    q = f"SELECT {','.join(cols)} FROM analysis_web_measurement"
+    if len(and_clauses) > 0:
+        q += " WHERE "
+        q += " AND ".join(and_clauses)
+    q += f" ORDER BY {order_by} {order} LIMIT {limit} OFFSET {offset}"
+
+    t = time.perf_counter()
+    log.info(f"running query {q} with {q_args}")
+    rows = db.execute(q, q_args)
+
+    results: List[AnalysisEntry] = []
+    if rows and isinstance(rows, list):
+        for row in rows:
+            d = dict(zip(cols, row))
+            results.append(AnalysisEntry(**d))
+
+    response = ListAnalysisResponse(
+        metadata=ResponseMetadata(
+            count=-1,
+            current_page=math.ceil(offset / limit) + 1,
+            limit=limit,
+            next_url=f"{settings.base_url}/api/v1/analysis?offset={offset+limit}&limit={limit}",
+            offset=offset,
+            pages=-1,
+            query_time=time.perf_counter() - t,
+        ),
+        results=results,
+    )
+
+    return response
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_observations.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_observations.py
new file mode 100644
index 00000000..63f17a78
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/routers/data/list_observations.py
@@ -0,0 +1,269 @@
+import math
+import time
+from datetime import date, datetime, timedelta
+from typing import List, Literal, Optional, Union
+from fastapi import APIRouter, Depends, Query
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from ...common.dependencies import get_settings
+from ...dependencies import get_clickhouse_session
+
+router = APIRouter()
+
+
+class ResponseMetadata(BaseModel):
+    count: int
+    current_page: int
+    limit: int
+    next_url: str
+    offset: int
+    pages: int
+    query_time: float
+
+
+class WebObservationEntry(BaseModel):
+    measurement_uid: str
+    input: Optional[str]
+    report_id: str
+    ooni_run_link_id: str
+
+    measurement_start_time: datetime
+
+    software_name: str
+    software_version: str
+    test_name: str
+    test_version: str
+
+    bucket_date: str
+
+    probe_asn: int
+    probe_cc: str
+
+    probe_as_org_name: str
+    probe_as_cc: str
+    probe_as_name: str
+
+    network_type: str
+    platform: str
+    origin: str
+    engine_name: str
+    engine_version: str
+    architecture: str
+
+    resolver_ip: str
+    resolver_asn: int
+    resolver_cc: str
+    resolver_as_org_name: str
+    resolver_as_cc: str
+
+    observation_idx: int = 0
+    created_at: Optional[datetime] = None
+
+    target_id: Optional[str] = None
+    hostname: Optional[str] = None
+
+    transaction_id: Optional[int] = None
+
+    ip: Optional[str] = None
+    port: Optional[int] = None
+
+    ip_asn: Optional[int] = None
+    ip_as_org_name: Optional[str] = None
+    ip_as_cc: Optional[str] = None
+    ip_cc: Optional[str] = None
+    ip_is_bogon: Optional[bool] = None
+
+    # DNS related observation
+    dns_query_type: Optional[str] = None
+    dns_failure: Optional[str] = None
+    dns_engine: Optional[str] = None
+    dns_engine_resolver_address: Optional[str] = None
+
+    dns_answer_type: Optional[str] = None
+    dns_answer: Optional[str] = None
+    # These should match those in the IP field, but are the annotations coming
+    # from the probe
+    dns_answer_asn: Optional[int] = None
+    dns_answer_as_org_name: Optional[str] = None
+    dns_t: Optional[float] = None
+
+    # TCP related observation
+    tcp_failure: Optional[str] = None
+    tcp_success: Optional[bool] = None
+    tcp_t: Optional[float] = None
+
+    # TLS related observation
+    tls_failure: Optional[str] = None
+
+    tls_server_name: Optional[str] = None
Optional[str] = None + tls_echconfig: Optional[str] = None + tls_version: Optional[str] = None + tls_cipher_suite: Optional[str] = None + tls_is_certificate_valid: Optional[bool] = None + + tls_end_entity_certificate_fingerprint: Optional[str] = None + tls_end_entity_certificate_subject: Optional[str] = None + tls_end_entity_certificate_subject_common_name: Optional[str] = None + tls_end_entity_certificate_issuer: Optional[str] = None + tls_end_entity_certificate_issuer_common_name: Optional[str] = None + tls_end_entity_certificate_san_list: List[str] = dataclasses.field( + default_factory=list + ) + tls_end_entity_certificate_not_valid_after: Optional[datetime] = None + tls_end_entity_certificate_not_valid_before: Optional[datetime] = None + tls_certificate_chain_length: Optional[int] = None + tls_certificate_chain_fingerprints: List[str] = dataclasses.field( + default_factory=list + ) + + tls_handshake_read_count: Optional[int] = None + tls_handshake_write_count: Optional[int] = None + tls_handshake_read_bytes: Optional[float] = None + tls_handshake_write_bytes: Optional[float] = None + tls_handshake_last_operation: Optional[str] = None + tls_handshake_time: Optional[float] = None + tls_t: Optional[float] = None + + # HTTP related observation + http_request_url: Optional[str] = None + + http_network: Optional[str] = None + http_alpn: Optional[str] = None + + http_failure: Optional[str] = None + + http_request_body_length: Optional[int] = None + http_request_method: Optional[str] = None + + http_runtime: Optional[float] = None + + http_response_body_length: Optional[int] = None + http_response_body_is_truncated: Optional[bool] = None + http_response_body_sha1: Optional[str] = None + + http_response_status_code: Optional[int] = None + http_response_header_location: Optional[str] = None + http_response_header_server: Optional[str] = None + http_request_redirect_from: Optional[str] = None + http_request_body_is_truncated: Optional[bool] = None + http_t: Optional[float] = None + + probe_analysis: Optional[str] = None + + +ObservationEntry = Union[WebObservationEntry, BaseModel] + + +class ListObservationsResponse(BaseModel): + metadata: ResponseMetadata + results: List[ObservationEntry] + + +@router.get("/v1/observations", tags=["observations", "list_data"]) +async def list_observations( + report_id: Annotated[Optional[str], Query()] = None, + probe_asn: Annotated[Union[int, str, None], Query()] = None, + probe_cc: Annotated[Optional[str], Query(max_length=2, min_length=2)] = None, + test_name: Annotated[Optional[str], Query()] = None, + since: Annotated[Optional[date], Query()] = None, + until: Annotated[Optional[date], Query()] = None, + order_by: Annotated[ + Literal[ + "measurement_start_time", + "input", + "probe_cc", + "probe_asn", + "test_name", + ], + Query(), + ] = "measurement_start_time", + order: Annotated[Optional[Literal["asc", "desc", "ASC", "DESC"]], Query()] = "DESC", + offset: Annotated[int, Query()] = 0, + limit: Annotated[int, Query()] = 100, + software_name: Annotated[Optional[str], Query()] = None, + software_version: Annotated[Optional[str], Query()] = None, + test_version: Annotated[Optional[str], Query()] = None, + engine_version: Annotated[Optional[str], Query()] = None, + ooni_run_link_id: Annotated[Optional[str], Query()] = None, + db=Depends(get_clickhouse_session), + settings=Depends(get_settings), +) -> ListObservationsResponse: + if since is None: + since = date.today() - timedelta(days=7) + if until is None: + until = date.today() + + q_args = {} + and_clauses 
+    and_clauses = []
+    if report_id is not None:
+        q_args["report_id"] = report_id
+        and_clauses.append("report_id = %(report_id)s")
+    if probe_asn is not None:
+        if isinstance(probe_asn, str) and probe_asn.startswith("AS"):
+            probe_asn = int(probe_asn[2:])
+        q_args["probe_asn"] = probe_asn
+        and_clauses.append("probe_asn = %(probe_asn)d")
+    if probe_cc is not None:
+        q_args["probe_cc"] = probe_cc
+        and_clauses.append("probe_cc = %(probe_cc)s")
+
+    if software_name is not None:
+        q_args["software_name"] = software_name
+        and_clauses.append("software_name = %(software_name)s")
+    if software_version is not None:
+        q_args["software_version"] = software_version
+        and_clauses.append("software_version = %(software_version)s")
+
+    if test_name is not None:
+        q_args["test_name"] = test_name
+        and_clauses.append("test_name = %(test_name)s")
+    if test_version is not None:
+        q_args["test_version"] = test_version
+        and_clauses.append("test_version = %(test_version)s")
+    if engine_version is not None:
+        q_args["engine_version"] = engine_version
+        and_clauses.append("engine_version = %(engine_version)s")
+
+    if ooni_run_link_id is not None:
+        q_args["ooni_run_link_id"] = ooni_run_link_id
+        and_clauses.append("ooni_run_link_id = %(ooni_run_link_id)s")
+
+    if since is not None:
+        q_args["since"] = since
+        and_clauses.append("measurement_start_time >= %(since)s")
+    if until is not None:
+        and_clauses.append("measurement_start_time <= %(until)s")
+        q_args["until"] = until
+
+    cols = list(WebObservationEntry.model_json_schema()["properties"].keys())
+    q = f"SELECT {','.join(cols)} FROM obs_web"
+    if len(and_clauses) > 0:
+        q += " WHERE "
+        q += " AND ".join(and_clauses)
+    q += f" ORDER BY {order_by} {order} LIMIT {limit} OFFSET {offset}"
+
+    t = time.perf_counter()
+    rows = db.execute(q, q_args)
+
+    results: List[ObservationEntry] = []
+    if rows and isinstance(rows, list):
+        for row in rows:
+            d = dict(zip(cols, row))
+            results.append(WebObservationEntry(**d))
+
+    response = ListObservationsResponse(
+        metadata=ResponseMetadata(
+            count=-1,
+            current_page=math.ceil(offset / limit) + 1,
+            limit=limit,
+            next_url=f"{settings.base_url}/api/v1/observations?offset={offset+limit}&limit={limit}",
+            offset=offset,
+            pages=-1,
+            query_time=time.perf_counter() - t,
+        ),
+        results=results,
+    )
+
+    return response
diff --git a/ooniapi/services/oonifindings/src/oonifindings/routers/data/utils.py b/ooniapi/services/oonifindings/src/oonifindings/routers/data/utils.py
new file mode 100644
index 00000000..714f1363
--- /dev/null
+++ b/ooniapi/services/oonifindings/src/oonifindings/routers/data/utils.py
@@ -0,0 +1,54 @@
+from datetime import datetime, timedelta, timezone
+from typing import Annotated, Literal, Union
+
+from fastapi import Query
+from pydantic import AfterValidator
+
+
+TimeGrains = Literal["hour", "day", "week", "month", "year", "auto"]
+
+
+def get_measurement_start_day_agg(
+    time_grain: TimeGrains, column_name: str = "measurement_start_time"
+):
+    if time_grain == "hour":
+        return f"toStartOfHour({column_name})"
+    if time_grain == "day":
+        return f"toStartOfDay({column_name})"
+    if time_grain == "week":
+        return f"toStartOfWeek({column_name})"
+    if time_grain == "month":
+        return f"toStartOfMonth({column_name})"
+    if time_grain == "year":
+        return f"toStartOfYear({column_name})"
+    return f"toStartOfDay({column_name})"
+
+
+def parse_date(d: Union[datetime, str]) -> datetime:
+    from dateutil.parser import parse
+
+    if isinstance(d, str):
+        return parse(d)
+    return d
+
+
+SinceUntil = Annotated[Union[str, datetime], AfterValidator(parse_date), Query()]
+
+
+def utc_30_days_ago():
+    return datetime.combine(
+        datetime.now(timezone.utc) - timedelta(days=30), datetime.min.time()
+    ).replace(tzinfo=None)
+
+
+def utc_today():
+    return datetime.combine(datetime.now(timezone.utc), datetime.min.time()).replace(
+        tzinfo=None
+    )
+
+
+def test_name_to_group(tn):
+    if tn in ("web_connectivity", "http_requests"):
+        return "websites"
+    # TODO(arturo): currently we only support websites
+    return ""
diff --git a/ooniapi/services/oonifindings/tests/conftest.py b/ooniapi/services/oonifindings/tests/conftest.py
index 5b217d07..5afd31b8 100644
--- a/ooniapi/services/oonifindings/tests/conftest.py
+++ b/ooniapi/services/oonifindings/tests/conftest.py
@@ -1,9 +1,12 @@
 from pathlib import Path
-import pytest
-
 import time
+
+import pytest
+import requests
 import jwt
+from clickhouse_driver import Client as ClickhouseClient
+
 from fastapi.testclient import TestClient
 
 from oonifindings.common.config import Settings
 
@@ -14,6 +17,48 @@
 THIS_DIR = Path(__file__).parent.resolve()
 
 
+def get_file_path(file_path: str):
+    return Path(__file__).parent / file_path
+
+
+@pytest.fixture(scope="session")
+def maybe_download_fixtures():
+    base_url = "https://ooni-data-eu-fra.s3.eu-central-1.amazonaws.com/samples/"
+    filenames = [
+        "analysis_web_measurement-sample.sql.gz",
+        "obs_web-sample.sql.gz",
+    ]
+    for fn in filenames:
+        dst_path = get_file_path(f"fixtures/{fn}")
+        if dst_path.exists():
+            continue
+        url = base_url + fn
+        print(f"Downloading {url} to {dst_path}")
+        r = requests.get(url)
+        r.raise_for_status()
+        dst_path.write_bytes(r.content)
+
+
+def is_clickhouse_running(url):
+    try:
+        with ClickhouseClient.from_url(url) as client:
+            client.execute("SELECT 1")
+            return True
+    except Exception:
+        return False
+
+
+@pytest.fixture(scope="session")
+def clickhouse_server(maybe_download_fixtures, docker_ip, docker_services):
+    """Ensure that the ClickHouse service is up and responsive."""
+    port = docker_services.port_for("clickhouse", 9000)
+    url = "clickhouse://{}:{}/default".format(docker_ip, port)
+    docker_services.wait_until_responsive(
+        timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url)
+    )
+    yield url
+
+
 def make_override_get_settings(**kw):
     def override_get_settings():
         return Settings(**kw)
@@ -51,12 +96,13 @@ def client_with_bad_settings():
 
 
 @pytest.fixture
-def client(alembic_migration):
+def client(alembic_migration, clickhouse_server):
     app.dependency_overrides[get_settings] = make_override_get_settings(
         postgresql_url=alembic_migration,
+        clickhouse_url=clickhouse_server,
         jwt_encryption_key="super_secure",
         prometheus_metrics_password="super_secure",
-        account_id_hashing_key="super_secure"
+        account_id_hashing_key="super_secure",
     )
 
     client = TestClient(app)
@@ -100,7 +146,7 @@ def client_with_admin_role(client):
 
 @pytest.fixture
 def client_with_hashed_email(client):
-    
+
     def _hashed_email(email: str, role: str):
         client = TestClient(app)
         account_id = hash_email_address(email, "super_secure")
diff --git a/ooniapi/services/oonifindings/tests/docker-compose.yml b/ooniapi/services/oonifindings/tests/docker-compose.yml
new file mode 100644
index 00000000..bf383642
--- /dev/null
+++ b/ooniapi/services/oonifindings/tests/docker-compose.yml
@@ -0,0 +1,9 @@
+version: '2'
+services:
+  clickhouse:
+    image: "clickhouse/clickhouse-server"
+    ports:
+      - "9000"
+    volumes:
+      - ./fixtures:/fixtures
+      - ./fixtures/initdb:/docker-entrypoint-initdb.d/
diff --git a/ooniapi/services/oonifindings/tests/fixtures/.gitignore b/ooniapi/services/oonifindings/tests/fixtures/.gitignore
new file mode 100644
index 00000000..10d00b57
--- /dev/null
+++ b/ooniapi/services/oonifindings/tests/fixtures/.gitignore
@@ -0,0 +1 @@
+*.gz
diff --git a/ooniapi/services/oonifindings/tests/fixtures/initdb/clickhouse.sql b/ooniapi/services/oonifindings/tests/fixtures/initdb/clickhouse.sql
new file mode 100644
index 00000000..45ec5503
--- /dev/null
+++ b/ooniapi/services/oonifindings/tests/fixtures/initdb/clickhouse.sql
@@ -0,0 +1,141 @@
+CREATE TABLE
+    analysis_web_measurement (
+        `domain` String,
+        `input` String,
+        `test_name` String,
+        `probe_asn` UInt32,
+        `probe_as_org_name` String,
+        `probe_cc` String,
+        `resolver_asn` UInt32,
+        `resolver_as_cc` String,
+        `network_type` String,
+        `measurement_start_time` DateTime64 (3, 'UTC'),
+        `measurement_uid` String,
+        `ooni_run_link_id` String,
+        `top_probe_analysis` Nullable (String),
+        `top_dns_failure` Nullable (String),
+        `top_tcp_failure` Nullable (String),
+        `top_tls_failure` Nullable (String),
+        `dns_blocked` Float32,
+        `dns_down` Float32,
+        `dns_ok` Float32,
+        `tcp_blocked` Float32,
+        `tcp_down` Float32,
+        `tcp_ok` Float32,
+        `tls_blocked` Float32,
+        `tls_down` Float32,
+        `tls_ok` Float32
+    ) ENGINE = ReplacingMergeTree PRIMARY KEY measurement_uid
+ORDER BY
+    (
+        measurement_uid,
+        measurement_start_time,
+        probe_cc,
+        probe_asn
+    ) SETTINGS index_granularity = 8192;
+
+CREATE TABLE
+    obs_web (
+        `measurement_uid` String,
+        `observation_idx` UInt16,
+        `input` Nullable (String),
+        `report_id` String,
+        `ooni_run_link_id` String DEFAULT '',
+        `measurement_start_time` DateTime64 (3, 'UTC'),
+        `software_name` String,
+        `software_version` String,
+        `test_name` String,
+        `test_version` String,
+        `bucket_date` String,
+        `probe_asn` UInt32,
+        `probe_cc` String,
+        `probe_as_org_name` String,
+        `probe_as_cc` String,
+        `probe_as_name` String,
+        `network_type` String,
+        `platform` String,
+        `origin` String,
+        `engine_name` String,
+        `engine_version` String,
+        `architecture` String,
+        `resolver_ip` String,
+        `resolver_asn` UInt32,
+        `resolver_cc` String,
+        `resolver_as_org_name` String,
+        `resolver_as_cc` String,
+        `resolver_is_scrubbed` UInt8,
+        `resolver_asn_probe` UInt32,
+        `resolver_as_org_name_probe` String,
+        `created_at` Nullable (DateTime ('UTC')),
+        `target_id` Nullable (String),
+        `hostname` Nullable (String),
+        `transaction_id` Nullable (UInt16),
+        `ip` Nullable (String),
+        `port` Nullable (UInt16),
+        `ip_asn` Nullable (UInt32),
+        `ip_as_org_name` Nullable (String),
+        `ip_as_cc` Nullable (String),
+        `ip_cc` Nullable (String),
+        `ip_is_bogon` Nullable (UInt8),
+        `dns_query_type` Nullable (String),
+        `dns_failure` Nullable (String),
+        `dns_engine` Nullable (String),
+        `dns_engine_resolver_address` Nullable (String),
+        `dns_answer_type` Nullable (String),
+        `dns_answer` Nullable (String),
+        `dns_answer_asn` Nullable (UInt32),
+        `dns_answer_as_org_name` Nullable (String),
+        `dns_t` Nullable (Float64),
+        `tcp_failure` Nullable (String),
+        `tcp_success` Nullable (UInt8),
+        `tcp_t` Nullable (Float64),
+        `tls_failure` Nullable (String),
+        `tls_server_name` Nullable (String),
+        `tls_outer_server_name` Nullable (String),
+        `tls_echconfig` Nullable (String),
+        `tls_version` Nullable (String),
+        `tls_cipher_suite` Nullable (String),
+        `tls_is_certificate_valid` Nullable (UInt8),
+        `tls_end_entity_certificate_fingerprint` Nullable (String),
+        `tls_end_entity_certificate_subject` Nullable (String),
+        `tls_end_entity_certificate_subject_common_name` Nullable (String),
+        `tls_end_entity_certificate_issuer` Nullable (String),
+        `tls_end_entity_certificate_issuer_common_name` Nullable (String),
+        `tls_end_entity_certificate_san_list` Array (String),
+        `tls_end_entity_certificate_not_valid_after` Nullable (DateTime64 (3, 'UTC')),
+        `tls_end_entity_certificate_not_valid_before` Nullable (DateTime64 (3, 'UTC')),
+        `tls_certificate_chain_length` Nullable (UInt16),
+        `tls_certificate_chain_fingerprints` Array (String),
+        `tls_handshake_read_count` Nullable (UInt16),
+        `tls_handshake_write_count` Nullable (UInt16),
+        `tls_handshake_read_bytes` Nullable (UInt32),
+        `tls_handshake_write_bytes` Nullable (UInt32),
+        `tls_handshake_last_operation` Nullable (String),
+        `tls_handshake_time` Nullable (Float64),
+        `tls_t` Nullable (Float64),
+        `http_request_url` Nullable (String),
+        `http_network` Nullable (String),
+        `http_alpn` Nullable (String),
+        `http_failure` Nullable (String),
+        `http_request_body_length` Nullable (UInt32),
+        `http_request_method` Nullable (String),
+        `http_runtime` Nullable (Float64),
+        `http_response_body_length` Nullable (Int32),
+        `http_response_body_is_truncated` Nullable (UInt8),
+        `http_response_body_sha1` Nullable (String),
+        `http_response_status_code` Nullable (UInt16),
+        `http_response_header_location` Nullable (String),
+        `http_response_header_server` Nullable (String),
+        `http_request_redirect_from` Nullable (String),
+        `http_request_body_is_truncated` Nullable (UInt8),
+        `http_t` Nullable (Float64),
+        `probe_analysis` Nullable (String)
+    ) ENGINE = ReplacingMergeTree PRIMARY KEY (measurement_uid, observation_idx)
+ORDER BY
+    (
+        measurement_uid,
+        observation_idx,
+        measurement_start_time,
+        probe_cc,
+        probe_asn
+    ) SETTINGS index_granularity = 8192;
\ No newline at end of file
diff --git a/ooniapi/services/oonifindings/tests/fixtures/initdb/init.sh b/ooniapi/services/oonifindings/tests/fixtures/initdb/init.sh
new file mode 100755
index 00000000..eec93b0f
--- /dev/null
+++ b/ooniapi/services/oonifindings/tests/fixtures/initdb/init.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+set -e
+
+gzip -dc /fixtures/obs_web-sample.sql.gz | clickhouse-client
+gzip -dc /fixtures/analysis_web_measurement-sample.sql.gz | clickhouse-client
\ No newline at end of file
diff --git a/ooniapi/services/oonifindings/tests/test_oonidata.py b/ooniapi/services/oonifindings/tests/test_oonidata.py
new file mode 100644
index 00000000..54246155
--- /dev/null
+++ b/ooniapi/services/oonifindings/tests/test_oonidata.py
@@ -0,0 +1,33 @@
+def test_oonidata(client):
+    r = client.get("api/v1/observations?since=2024-11-01&until=2024-11-02")
+    j = r.json()
+    assert isinstance(j["results"], list), j
+    assert len(j["results"]) > 0
+    for result in j["results"]:
+        assert "test_name" in result, result
+        assert "probe_cc" in result, result
+
+    r = client.get("api/v1/analysis?since=2024-11-01&until=2024-11-02&probe_cc=IT")
+    j = r.json()
+    assert isinstance(j["results"], list), j
+    assert len(j["results"]) > 0
+    for result in j["results"]:
+        assert result["probe_cc"] == "IT"
+
+    r = client.get(
+        "api/v1/aggregation/analysis?since=2024-11-01&until=2024-11-10&probe_cc=IR"
+    )
+    j = r.json()
+    assert isinstance(j["results"], list), j
+    assert len(j["results"]) > 0
+    for result in j["results"]:
+        assert result["probe_cc"] == "IR"
+
+    r = client.get(
+        "api/v1/aggregation/observations?since=2024-11-01&until=2024-11-10&probe_cc=IT"
+    )
+    j = r.json()
+    assert isinstance(j["results"], list), j
+    assert len(j["results"]) > 0
+    for result in j["results"]:
+        assert result["probe_cc"] == "IT"
diff --git a/ooniapi/services/oonirun/Makefile b/ooniapi/services/oonirun/Makefile
index 454bd919..614af81d 100644
--- a/ooniapi/services/oonirun/Makefile
+++ b/ooniapi/services/oonirun/Makefile
@@ -66,5 +66,7 @@ db-migration: docker-build
 		-e OONI_PG_URL=$(shell aws secretsmanager get-secret-value --secret-id oonidevops/ooni-tier0-postgres/postgresql_url | jq .SecretString) \
 		${IMAGE_NAME}:${BUILD_LABEL} \
 		alembic upgrade head
+apidocs:
+	hatch run python -m oonirun.mkapidocs apidocs.json
 
 .PHONY: init test build clean docker print-labels
diff --git a/ooniapi/services/oonirun/src/oonirun/mkapidocs.py b/ooniapi/services/oonirun/src/oonirun/mkapidocs.py
new file mode 100644
index 00000000..003df07e
--- /dev/null
+++ b/ooniapi/services/oonirun/src/oonirun/mkapidocs.py
@@ -0,0 +1,13 @@
+import sys
+import json
+
+from fastapi.openapi.utils import get_openapi
+from .main import app
+from .__about__ import VERSION
+
+if __name__ == "__main__":
+    openapi = get_openapi(title="OONI Run", version=VERSION, routes=app.routes)
+    assert len(sys.argv) == 2, "must specify output file"
+    with open(sys.argv[1], "w") as out_file:
+        out_file.write(json.dumps(openapi))
+        out_file.write("\n")
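
Usage sketch for the new `apidocs` targets (not part of the patch; assumes `hatch` is installed and the service environment has been initialized via the existing `init` target; `python -m json.tool` is only used here to sanity-check the output):

    $ make apidocs                      # writes the OpenAPI spec to apidocs.json via mkapidocs
    $ python -m json.tool apidocs.json  # pretty-print the generated spec to verify it is valid JSON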