Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge ooni/data API and new measurements service #908

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor(oonimeasurements): redesign measurements service (#895)
* init measurements refactor

* refactor: measurement partial

* Remove the SQLAlchemy models for fastpath tables

* Align the queries to be closer to original implementation

* Rename migration files so that they are applied in correct order

* Fix typing of measurement_uid

* Add more debug info when migrations fail

* Start fixing some of the tests

* Use random port for clickhouse

* reformat

* More progress on fixing broken tests

* Fix all the tests

* Remove duplicate test_measurements from tests

---------

Co-authored-by: Arturo Filastò <arturo@filasto.net>
  • Loading branch information
DecFox and hellais authored Jan 8, 2025
commit 14aa82f06e348f43d13ef2ab2293bf8c39806f9c
4 changes: 4 additions & 0 deletions ooniapi/common/src/common/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Shared declarative base for ClickHouse-backed SQLAlchemy models.
from clickhouse_sqlalchemy import get_declarative_base


# All ORM models that map to ClickHouse tables should inherit from this Base
# so they share one metadata registry.
Base = get_declarative_base()
1 change: 1 addition & 0 deletions ooniapi/common/src/common/routers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date, datetime
from typing import Union
from pydantic import BaseModel as PydandicBaseModel
from pydantic import ConfigDict

2 changes: 2 additions & 0 deletions ooniapi/common/src/common/utils.py
Original file line number Diff line number Diff line change
@@ -51,6 +51,8 @@ def commasplit(p: str) -> List[str]:
def convert_to_csv(r) -> str:
"""Convert aggregation result dict/list to CSV"""
csvf = StringIO()
if len(r) == 0:
return ""
if isinstance(r, dict):
# 0-dimensional data
fieldnames = sorted(r.keys())
1 change: 1 addition & 0 deletions ooniapi/services/oonimeasurements/pyproject.toml
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ dependencies = [
"fastapi ~= 0.108.0",
"psycopg2 ~= 2.9.5",
"clickhouse-driver ~= 0.2.6",
"clickhouse-sqlalchemy ~= 0.3.2",
"sqlalchemy ~= 2.0.27",
"ujson ~= 5.9.0",
"urllib3 ~= 2.1.0",
Original file line number Diff line number Diff line change
@@ -7,9 +7,10 @@
from .common.config import Settings
from .common.dependencies import get_settings


def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]):
    """FastAPI dependency that yields a ClickHouse client for one request.

    Opens a connection using ``settings.clickhouse_url``, yields it to the
    request handler, and always disconnects afterwards — even if the handler
    raises — so connections are never leaked.
    """
    db = Clickhouse.from_url(settings.clickhouse_url)
    try:
        yield db
    finally:
        # Runs on both success and handler error: release the connection.
        db.disconnect()
Original file line number Diff line number Diff line change
@@ -8,7 +8,8 @@

from prometheus_fastapi_instrumentator import Instrumentator

from .routers import aggregation, measurements
from .routers.v1 import aggregation
from .routers.v1 import measurements

from .dependencies import get_clickhouse_session
from .common.dependencies import get_settings
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
"""
Aggregation API
The routes are mounted under /api
"""

from datetime import datetime, timedelta, date
from datetime import datetime, timedelta, date, timezone
from typing import List, Any, Dict, Optional, Union
import logging

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from typing_extensions import Annotated

from clickhouse_driver import Client as ClickhouseClient
@@ -20,8 +18,8 @@

from oonimeasurements.common.clickhouse_utils import query_click, query_click_one_row
from oonimeasurements.common.utils import jerror, commasplit, convert_to_csv
from ..dependencies import get_clickhouse_session

from ...dependencies import get_clickhouse_session
from ...common.routers import BaseModel

router = APIRouter()

@@ -116,7 +114,7 @@ class AggregationResult(BaseModel):
failure_count: int
ok_count: int
measurement_count: int
measurement_start_day: Optional[date] = None
measurement_start_day: Optional[str] = None
blocking_type: Optional[str] = None
category_code: Optional[str] = None
domain: Optional[str] = None
@@ -134,8 +132,9 @@ class MeasurementAggregation(BaseModel):


@router.get(
"/v1/aggregation",
response_model_exclude_none=True
"/v1/aggregation",
response_model_exclude_none=True,
response_model=MeasurementAggregation,
)
async def get_measurements(
response: Response,
@@ -247,7 +246,9 @@ async def get_measurements(
int(i[2:]) if i.startswith("AS") else i for i in commasplit(probe_asn)
]
except ValueError:
raise HTTPException(status_code=400, detail="Invalid ASN value in parameter probe_asn")
raise HTTPException(
status_code=400, detail="Invalid ASN value in parameter probe_asn"
)

probe_cc_s = []
if probe_cc:
@@ -342,12 +343,16 @@ async def get_measurements(
group_by: List = []
try:
if axis_x == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_x:
add_axis(axis_x, cols, colnames, group_by)

if axis_y == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_y:
add_axis(axis_y, cols, colnames, group_by)

@@ -372,7 +377,17 @@ async def get_measurements(

try:
if dimension_cnt > 0:
r: Any = list(query_click(db, query, query_params, query_prio=4))
str_format = "%Y-%m-%d"
if time_grain == "hour":
str_format = "%Y-%m-%dT%H:%M:%SZ"
r: Any = []
for row in query_click(db, query, query_params, query_prio=4):
## Handle the difference in formatting between hourly and daily measurement_start_day
if "measurement_start_day" in row:
row["measurement_start_day"] = row[
"measurement_start_day"
].strftime(str_format)
r.append(row)
else:
r = query_click_one_row(db, query, query_params, query_prio=4)

@@ -410,7 +425,8 @@ async def get_measurements(
elapsed_seconds=pq.elapsed,
),
result=r,
).model_dump(exclude_none=True)
)

except Exception as e:
print(e)
raise HTTPException(status_code=400, detail=str(e))
Loading