@add_slots
@dataclass
class OpenVPNHandshake(BaseModel):
    """
    Outcome of one OpenVPN handshake attempt against a single endpoint.

    Instances are the elements of the `openvpn_handshake` list in the openvpn
    test keys.
    """

    handshake_time: float
    endpoint: str
    ip: str  # we might want to make this optional, and scrub in favor of ASN/prefix
    port: int
    transport: str
    provider: str
    # `t` is combined with the measurement start time to build the observation
    # timestamp; presumably both t0 and t are offsets from the measurement
    # start — TODO confirm units.
    t0: float
    t: float
    openvpn_options: Optional[Dict[str, str]] = None
    tags: Optional[List[str]] = None
    # NOTE(review): typed as str here but as int in OpenVPNNetworkEvent —
    # confirm which one matches the probe's data format.
    transaction_id: Optional[str] = None
    # None means the handshake did not fail.
    failure: Failure = None


@add_slots
@dataclass
class OpenVPNPacket(BaseModel):
    """Packet metadata optionally attached to an OpenVPNNetworkEvent."""

    operation: str
    # Opcode name, e.g. "P_CONTROL_HARD_RESET_CLIENT_V2" or
    # "P_CONTROL_HARD_RESET_SERVER_V2".
    opcode: str
    id: int
    payload_size: int
    acks: Optional[List[int]] = None
    send_attempts: Optional[int] = None


@add_slots
@dataclass
class OpenVPNNetworkEvent(BaseModel):
    """A single event recorded while an OpenVPN handshake is in progress."""

    # "state" for state transitions; packet events use values starting with
    # "packet_".
    operation: str
    # Handshake stage name, e.g. "SENT_KEY", "GOT_KEY" or "GENERATED_KEYS".
    stage: str
    t: float
    # May contain markers such as "client_hello" / "server_hello".
    tags: Optional[List[str]] = None
    packet: Optional[OpenVPNPacket] = None
    transaction_id: Optional[int] = None
# --- oonidata/models/nettests/openvpn.py ---


@add_slots
@dataclass
class OpenVPNTestKeys(BaseTestKeys):
    """Test keys of the `openvpn` nettest."""

    # NOTE(review): annotated Optional but defaults to False rather than None —
    # confirm this is intended.
    success: Optional[bool] = False
    failure: Failure = None

    network_events: Optional[List[OpenVPNNetworkEvent]] = None
    tcp_connect: Optional[List[TCPConnect]] = None
    openvpn_handshake: Optional[List[OpenVPNHandshake]] = None

    bootstrap_time: Optional[float] = None
    # NOTE(review): annotated `str` but defaults to None; presumably this
    # should be Optional[str] — confirm before changing, the annotation may
    # drive deserialization.
    tunnel: str = None


@add_slots
@dataclass
class OpenVPN(BaseMeasurement):
    """Measurement whose test name is "openvpn"."""

    __test_name__ = "openvpn"

    test_keys: OpenVPNTestKeys


# --- oonidata/models/observations.py ---


@table_model(
    table_name="obs_openvpn",
    table_index=(
        "measurement_start_time",
        "measurement_uid",
        "observation_idx",
    ),
)
@dataclass
class OpenVPNObservation:
    """
    One row of the `obs_openvpn` table: a single OpenVPN handshake attempt,
    together with the related TCP connect result and handshake timing info.
    """

    measurement_meta: MeasurementMeta

    probe_meta: ProbeMeta

    # Part of the table index; must be unique per observation inside a
    # measurement.
    observation_idx: int = 0

    created_at: Optional[datetime] = None

    # NOTE(review): annotated `datetime` but defaults to None — confirm whether
    # the column is meant to be nullable.
    timestamp: datetime = None

    # Fields added by the processor

    ip: str = ""
    port: int = 0
    transport: str = ""

    success: bool = False
    failure: Failure = None

    protocol: str = ""
    variant: Optional[str] = None

    # TCP related observation
    tcp_failure: Optional[Failure] = None
    tcp_success: Optional[bool] = None
    tcp_t: Optional[float] = None

    # OpenVPN handshake observation
    openvpn_handshake_failure: Optional[Failure] = None
    openvpn_handshake_t: Optional[float] = None
    openvpn_handshake_t0: Optional[float] = None
    openvpn_bootstrap_time: Optional[float] = None

    # timing info about the handshake packets
    openvpn_handshake_hr_client_t: Optional[float] = None
    openvpn_handshake_hr_server_t: Optional[float] = None
    openvpn_handshake_clt_hello_t: Optional[float] = None
    openvpn_handshake_srv_hello_t: Optional[float] = None
    openvpn_handshake_key_exchg_n: Optional[int] = None
    openvpn_handshake_got_keys__t: Optional[float] = None
    openvpn_handshake_gen_keys__t: Optional[float] = None


# NOTE(review): the table index names `observation_idx`, which is not declared
# as a field on this class — confirm how table_model resolves index columns.
@table_model(
    table_name="obs_tunnel",
    table_index=("measurement_uid", "observation_idx", "measurement_start_time"),
)
@dataclass
class TunnelEndpointObservation:
    """One row of the `obs_tunnel` table describing a tunnel endpoint."""

    measurement_meta: MeasurementMeta
    probe_meta: ProbeMeta

    measurement_start_time: datetime

    ip: str
    port: int
    transport: str

    # definition of success will need to change when/if we're able to gather metrics
    # through the tunnel.
    success: bool
    failure: Failure

    protocol: str
    family: str

    # indicates obfuscation or modifications from the main protocol family.
    variant: Optional[str] = None

    # any metadata about the providers behind the endpoints.
    provider: Optional[str] = None
+Currently it's only possible to query measurements at a granularity which is as +fine as a measurement. This means that it's only possible to answer questions which the original -designer of the experiment had already throught of. +designer of the experiment had already thought of. On the other hand the new pipeline breaks down measurements into distinct observations (think 1 DNS query and answer or 1 TLS handshake towards a @@ -145,16 +145,17 @@ port combination. You can run the observation generation with a clickhouse backend like so: +TODO(art): check this is correct. + ``` -poetry run python -m oonidata mkobs --clickhouse clickhouse://localhost/ --data-dir tests/data/datadir/ --start-day 2022-08-01 --end-day 2022-10-01 --create-tables --parallelism 20 +hatch run oonipipeline backfill --probe-cc US --test-name signal --workflow-name observations --start-at 2022-08-01 --end-at 2022-10-01 ``` Here is the list of supported observations so far: - [x] WebObservation, which has information about DNS, TCP, TLS and HTTP(s) - [x] WebControlObservation, has the control measurements run by web connectivity (is used to generate ground truths) -- [ ] CircumventionToolObservation, still needs to be designed and implemented - (ideally we would use the same for OpenVPN, Psiphon, VanillaTor) +- [x] OpenVPNObservation, with measurements run by the openvpn experiment. ### Response body archiving diff --git a/oonipipeline/Readme.md b/oonipipeline/Readme.md index b1a20b01..9bc42df3 100644 --- a/oonipipeline/Readme.md +++ b/oonipipeline/Readme.md @@ -8,7 +8,7 @@ For historical context, these are the major revisions: - `v1` - OONI Pipeline based on custom CLI scripts using mongodb as a backend. Used until ~2015. - `v2` - OONI Pipeline based on [luigi](https://luigi.readthedocs.io/en/stable/). Used until ~2017. - `v3` - OONI Pipeline based on [airflow](https://airflow.apache.org/). Used until ~2020. -- `v4` - OONI Pipeline basedon custom script and systemd units (aka fastpath).
Currently in use in production. +- `v4` - OONI Pipeline based on custom script and systemd units (aka fastpath). Currently in use in production. - `v5` - Next generation OONI Pipeline. What this readme is relevant to. Expected to become in production by Q4 2024. ## Setup @@ -41,13 +41,19 @@ clickhouse server Workflows are started by first scheduling them and then triggering a backfill operation on them. When they are scheduled they will also run on a daily basis. + ``` -hatch run oonipipeline schedule --probe-cc US --test-name signal --create-tables +hatch run oonipipeline schedule --probe-cc US --test-name signal ``` You can then trigger the backfill operation like so: ``` -hatch run oonipipeline backfill --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 +hatch run oonipipeline backfill --create-tables --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 +``` + +If you need to re-create the database tables (because the schema has changed), you want to add the `--drop-tables` flag to the invocation: +``` +hatch run oonipipeline backfill --create-tables --drop-tables --probe-cc US --test-name signal --workflow-name observations --start-at 2024-01-01 --end-at 2024-02-01 ``` You will then need some workers to actually perform the task you backfilled, these can be started like so: diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py index 1e063762..060e8d25 100644 --- a/oonipipeline/src/oonipipeline/cli/commands.py +++ b/oonipipeline/src/oonipipeline/cli/commands.py @@ -189,7 +189,7 @@ async def main(): @click.option( "--analysis/--no-analysis", default=True, - help="should we drop tables before creating them", + help="schedule analysis too", ) def schedule(probe_cc: List[str], test_name: List[str], analysis: bool): """ diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py 
def count_key_exchange_packets(network_events: List[OpenVPNNetworkEvent]) -> int:
    """
    Return the number of packet events recorded during the SENT_KEY stage.

    An event is counted when its `stage` is "SENT_KEY" and its `operation`
    starts with "packet_"; state events in that stage are not counted.
    An empty (or None) list yields 0.
    """
    if not network_events:
        return 0
    return sum(
        1
        for evt in network_events
        if evt.stage == "SENT_KEY" and evt.operation.startswith("packet_")
    )


def measurement_to_openvpn_observation(
    msmt_meta: MeasurementMeta,
    probe_meta: ProbeMeta,
    netinfodb: NetinfoDB,
    openvpn_h: OpenVPNHandshake,
    tcp_connect: Optional[List[TCPConnect]],
    network_events: Optional[List[OpenVPNNetworkEvent]],
    bootstrap_time: Optional[float],
) -> OpenVPNObservation:
    """
    Build the OpenVPNObservation for a single handshake.

    Args:
        msmt_meta: metadata of the measurement the handshake belongs to.
        probe_meta: metadata of the probe that ran the measurement.
        netinfodb: currently unused; kept so the signature stays stable.
        openvpn_h: the handshake entry to convert.
        tcp_connect: TCP connect results of the measurement; only the first
            entry is used. May be None or empty (e.g. UDP transport).
        network_events: handshake network events; may be None or empty.
        bootstrap_time: bootstrap time reported in the test keys, if any.

    Returns:
        The populated observation. Columns for which no data is available
        keep their None default.
    """
    oo = OpenVPNObservation(
        measurement_meta=msmt_meta,
        probe_meta=probe_meta,
        failure=normalize_failure(openvpn_h.failure),
        timestamp=make_timestamp(msmt_meta.measurement_start_time, openvpn_h.t),
        success=openvpn_h.failure is None,
        protocol="openvpn",
        transport=openvpn_h.transport,
        ip=openvpn_h.ip,
        port=openvpn_h.port,
        openvpn_bootstrap_time=bootstrap_time,
    )

    # tcp_connect is Optional: guard against None as well as an empty list.
    if tcp_connect:
        tcp = tcp_connect[0]
        oo.tcp_success = tcp.success
        oo.tcp_failure = tcp.failure
        oo.tcp_t = tcp.t

    # These must target the declared `openvpn_handshake_*` columns; assigning
    # to un-prefixed names only creates stray attributes that never get stored.
    oo.openvpn_handshake_failure = openvpn_h.failure
    oo.openvpn_handshake_t = openvpn_h.t
    oo.openvpn_handshake_t0 = openvpn_h.t0

    # TODO(ain): condition to test version >= xyz
    if network_events:
        for evt in network_events:
            if evt.packet is not None:
                # tags is Optional; treat a missing list as "no tags".
                tags = evt.tags or ()
                opcode = evt.packet.opcode
                if opcode == "P_CONTROL_HARD_RESET_CLIENT_V2":
                    oo.openvpn_handshake_hr_client_t = evt.t
                elif opcode == "P_CONTROL_HARD_RESET_SERVER_V2":
                    oo.openvpn_handshake_hr_server_t = evt.t
                elif "client_hello" in tags:
                    oo.openvpn_handshake_clt_hello_t = evt.t
                elif "server_hello" in tags:
                    oo.openvpn_handshake_srv_hello_t = evt.t

            if evt.operation == "state":
                if evt.stage == "GOT_KEY":
                    oo.openvpn_handshake_got_keys__t = evt.t
                elif evt.stage == "GENERATED_KEYS":
                    oo.openvpn_handshake_gen_keys__t = evt.t

        oo.openvpn_handshake_key_exchg_n = count_key_exchange_packets(network_events)

    return oo
""" web_obs_list: List[WebObservation] = [] @@ -977,5 +1044,39 @@ def consume_web_observations( return web_obs_list + def make_openvpn_observations(self, + tcp_observations: Optional[List[TCPConnect]], + openvpn_handshakes: Optional[List[OpenVPNHandshake]], + network_events: Optional[List[OpenVPNNetworkEvent]], + bootstrap_time: float, + ) -> List[OpenVPNObservation]: + """ + Returns a list of OpenVPNObservations by mapping all related + TCPObservations, OpenVPNNetworkevents and OpenVPNHandshakes. + """ + openvpn_obs_list: List[OpenVPNObservation] = [] + + for openvpn_handshake in openvpn_handshakes: + openvpn_obs_list.append( + measurement_to_openvpn_observation( + msmt_meta=self.measurement_meta, + probe_meta=self.probe_meta, + netinfodb=self.netinfodb, + tcp_connect=tcp_observations, + openvpn_h=openvpn_handshake, + network_events=network_events, + bootstrap_time=bootstrap_time, + ) + ) + + # TODO: can factor out function with web_observation + for idx, obs in enumerate(openvpn_obs_list): + obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}" + obs.created_at = datetime.now(timezone.utc).replace( + microsecond=0, tzinfo=None + ) + + return openvpn_obs_list + def make_observations(self, measurement): assert RuntimeError("make_observations is not implemented") diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py new file mode 100644 index 00000000..9904c295 --- /dev/null +++ b/oonipipeline/src/oonipipeline/transforms/nettests/openvpn.py @@ -0,0 +1,20 @@ +from typing import List, Tuple +from oonidata.models.nettests import OpenVPN +from oonidata.models.observations import OpenVPNObservation + +from ..measurement_transformer import MeasurementTransformer + + +class OpenVPNTransformer(MeasurementTransformer): + def make_observations(self, msmt: OpenVPN) -> Tuple[List[OpenVPNObservation]]: + if not msmt.test_keys: + return ([],) + + openvpn_obs = 
class OpenVPNTransformer(MeasurementTransformer):
    """Turns an `openvpn` measurement into OpenVPNObservations."""

    def make_observations(self, msmt: OpenVPN) -> Tuple[List[OpenVPNObservation]]:
        """
        Return a one-element tuple holding the list of OpenVPNObservations
        for `msmt`; the list is empty when the measurement has no test keys.
        """
        test_keys = msmt.test_keys
        if not test_keys:
            return ([],)

        tcp_obs = self.make_tcp_observations(test_keys.tcp_connect)
        observations = self.make_openvpn_observations(
            tcp_observations=tcp_obs,
            openvpn_handshakes=test_keys.openvpn_handshake,
            network_events=test_keys.network_events,
            bootstrap_time=test_keys.bootstrap_time,
        )
        return (observations,)
def test_openvpn_obs(netinfodb, measurements):
    """Check the observations generated for one UDP and one TCP openvpn measurement."""
    bucket_date = "2024-09-23"

    def first_observation(msmt_uid):
        # Load the fixture, run the transform and return the first observation
        # of the single observation list it produces.
        msmt = load_measurement(msmt_path=measurements[msmt_uid])
        assert isinstance(msmt, OpenVPN)
        obs_tup = measurement_to_observations(
            msmt=msmt, netinfodb=netinfodb, bucket_date=bucket_date
        )
        assert len(obs_tup) == 1
        return obs_tup[0][0]

    # UDP endpoint: there is no TCP connect information.
    udp_obs = first_observation("20240923234302.024724_FI_openvpn_515e6b6d9c0d832e")
    assert udp_obs.success is True
    assert udp_obs.transport == "udp"
    assert udp_obs.port == 1194
    assert udp_obs.ip == "37.218.243.98"
    assert udp_obs.tcp_success is None
    assert udp_obs.openvpn_handshake_srv_hello_t == 0.175448177
    assert udp_obs.openvpn_handshake_got_keys__t == 0.305975312
    assert udp_obs.openvpn_handshake_gen_keys__t == 0.376011823
    assert udp_obs.openvpn_bootstrap_time == 0.376279583

    # TCP endpoint: the TCP connect result is carried over.
    tcp_obs = first_observation("20240923234302.648951_FI_openvpn_714dd28ff412c1a5")
    assert tcp_obs.success is True
    assert tcp_obs.transport == "tcp"
    assert tcp_obs.port == 1194
    assert tcp_obs.ip == "37.218.243.98"
    assert tcp_obs.tcp_success is True
    assert tcp_obs.tcp_t == 0.053010729
    assert tcp_obs.openvpn_handshake_hr_client_t == 0.05684776
    assert tcp_obs.openvpn_handshake_srv_hello_t == 0.204483958
    assert tcp_obs.openvpn_handshake_gen_keys__t == 0.571443906
    assert tcp_obs.openvpn_bootstrap_time == 0.571501093