From fe64d036d9a4a4a5ea61bd3cc81c73d3396fe0ec Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Wed, 18 Dec 2024 14:01:06 -0800 Subject: [PATCH] it works --- .../manual_jobs/scrub_users_from_eap_spans.py | 59 ++++ .../test_scrub_users_from_eap_spans.py | 304 ++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 snuba/manual_jobs/scrub_users_from_eap_spans.py create mode 100644 tests/manual_jobs/test_scrub_users_from_eap_spans.py diff --git a/snuba/manual_jobs/scrub_users_from_eap_spans.py b/snuba/manual_jobs/scrub_users_from_eap_spans.py new file mode 100644 index 00000000000..c2466d77109 --- /dev/null +++ b/snuba/manual_jobs/scrub_users_from_eap_spans.py @@ -0,0 +1,59 @@ +from datetime import datetime +from typing import Any, Mapping, Optional + +from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster +from snuba.clusters.storage_sets import StorageSetKey +from snuba.manual_jobs import Job, JobLogger, JobSpec + +_IP_PREFIX = "ip:" +_SCRUBBED = "scrubbed" + + +class ScrubUserFromEAPSpans(Job): + def __init__(self, job_spec: JobSpec) -> None: + self.__validate_job_params(job_spec.params) + super().__init__(job_spec) + + def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None: + assert params + assert isinstance(params["organization_ids"], list) + assert all([isinstance(p, int) for p in params["organization_ids"]]) + self._organization_ids = params["organization_ids"] + self._start_datetime = datetime.fromisoformat(params["start_datetime"]) + self._end_datetime = datetime.fromisoformat(params["end_datetime"]) + + def _get_query(self, cluster_name: str | None) -> str: + organization_ids = ",".join([str(p) for p in self._organization_ids]) + start_datetime = self._start_datetime.isoformat() + end_datetime = self._end_datetime.isoformat() + on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else "" + return f"""ALTER TABLE eap_spans_2_local +{on_cluster} +UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, '{_IP_PREFIX}'), concat( + '{_IP_PREFIX}', + if( + isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4)), + '{_SCRUBBED}', + substring(v, 4) + ) + ), v)), `attr_str_2`) +WHERE organization_id IN [{organization_ids}] +AND _sort_timestamp >= toDateTime('{start_datetime}') +AND _sort_timestamp < toDateTime('{end_datetime}')""" + + def execute(self, logger: JobLogger) -> None: + cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) + storage_node = cluster.get_local_nodes()[0] + connection = cluster.get_node_connection( + ClickhouseClientSettings.CLEANUP, storage_node + ) + if not cluster.is_single_node(): + cluster_name = cluster.get_clickhouse_cluster_name() + else: + cluster_name = None + query = self._get_query(cluster_name) + logger.info("Executing query: {query}") + result = connection.execute(query=query, settings={"mutations_sync": 2}) + + logger.info("complete") + logger.info(repr(result)) diff --git a/tests/manual_jobs/test_scrub_users_from_eap_spans.py b/tests/manual_jobs/test_scrub_users_from_eap_spans.py new file mode 100644 index 00000000000..4b04ee06d95 --- /dev/null +++ b/tests/manual_jobs/test_scrub_users_from_eap_spans.py @@ -0,0 +1,304 @@ +import random +import uuid +from datetime import datetime, timedelta +from typing import Any, Mapping + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + Column, + TraceItemColumnValues, + TraceItemTableRequest, + TraceItemTableResponse, +) +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + ResponseMeta, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.job_status import JobStatus +from snuba.manual_jobs.runner import get_job_status, run_job +from snuba.manual_jobs.scrub_users_from_eap_spans import ScrubUserFromEAPSpans +from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable +from tests.helpers import write_raw_unprocessed_events + +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" +_USER = "192.168.0.45" + + +@pytest.mark.redis_db +@pytest.mark.clickhouse_db +def test_basic() -> None: + job_id = "abc" + run_job( + JobSpec( + job_id, + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [1, 3, 5, 6], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ) + ) + + assert get_job_status(job_id) == JobStatus.FINISHED + + +@pytest.mark.parametrize( + ("jobspec"), + [ + JobSpec( + "abc", + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [1, "b"], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ), + JobSpec( + "abc", + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [1, 2], + "start_datetime": "2024-12-01 00:00:0", + "end_datetime": "2024-12-10 00:00:00", + }, + ), + JobSpec( + "abc", + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [1, 2], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:0", + }, + ), + ], +) +@pytest.mark.redis_db +def test_fail_validation(jobspec: JobSpec) -> None: + with pytest.raises(Exception): + run_job(jobspec) + + +@pytest.mark.redis_db +def test_generate_query() -> None: + job = ScrubUserFromEAPSpans( + JobSpec( + "bassa", + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [1, 3, 5, 6], + "start_datetime": "2024-12-01 00:00:00", + "end_datetime": "2024-12-10 00:00:00", + }, + ) + ) + assert ( + job._get_query(None) + == """ALTER TABLE eap_spans_2_local + +UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, 'ip:'), concat( + 'ip:', + if( + isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4)), + 'scrubbed', + substring(v, 4) + ) + ), v)), `attr_str_2`) +WHERE organization_id IN [1,3,5,6] +AND _sort_timestamp >= toDateTime('2024-12-01T00:00:00') +AND _sort_timestamp < toDateTime('2024-12-10T00:00:00')""" + ) + + +def _gen_message( + dt: datetime, + organization_id: int, + measurements: dict[str, dict[str, float]] | None = None, + tags: dict[str, str] | None = None, +) -> Mapping[str, Any]: + measurements = measurements or {} + tags = tags or {} + return { + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": { + "sentry.environment": "development", + "sentry.release": _RELEASE_TAG, + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": "/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0", + "my.float.field": 101.2, + "my.int.field": 2000, + "my.neg.field": -100, + "my.neg.float.field": -101.2, + "my.true.bool.field": True, + "my.false.bool.field": False, + }, + "measurements": { + "num_of_spans": {"value": 50.0}, + "eap.measurement": {"value": random.choice([1, 100, 1000])}, + **measurements, + }, + "organization_id": organization_id, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "release": _RELEASE_TAG, + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:192.168.0.45", + }, + "span_id": "123456781234567D", + "tags": { + "http.status_code": "200", + "relay_endpoint_version": "3", + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "relay_no_cache": "False", + "relay_protocol_version": "3", + "relay_use_post_or_schedule": "True", + "relay_use_post_or_schedule_rejected": "version", + "user.ip": "192.168.0.45", + "user": "ip:192.168.0.45", + "spans_over_limit": "False", + "server_name": "blah", + "color": random.choice(["red", "green", "blue"]), + "location": random.choice(["mobile", "frontend", "backend"]), + **tags, + }, + "trace_id": uuid.uuid4().hex, + "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_precise": dt.timestamp(), + "end_timestamp_precise": dt.timestamp() + 1, + } + + +def _generate_request( + ts: Any, hour_ago: int, organization_id: int, project_ids: list[int] +) -> TraceItemTableRequest: + # project_ids is added as an argument to avoid this query getting cached + return TraceItemTableRequest( + meta=RequestMeta( + project_ids=project_ids, + organization_id=organization_id, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + filter=TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color") + ) + ), + columns=[Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user"))], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user") + ) + ) + ], + ) + + +def _generate_expected_response(user: str) -> TraceItemTableResponse: + return TraceItemTableResponse( + column_values=[ + TraceItemColumnValues( + attribute_name="user", + results=[AttributeValue(val_str="ip:" + user) for _ in range(20)], + ) + ], + page_token=PageToken(offset=20), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +def test_span_is_scrubbed() -> None: + BASE_TIME = datetime.utcnow().replace( + minute=0, second=0, microsecond=0 + ) - timedelta(minutes=180) + organization_ids = [0, 1] + spans_storage = get_storage(StorageKey("eap_spans")) + messages = [ + _gen_message(BASE_TIME - timedelta(minutes=i), organization_id) + for organization_id in organization_ids + for i in range(20) + ] + write_raw_unprocessed_events(spans_storage, messages) # type: ignore + + # we inserted spans for organizations 0, 1, 2, and we make sure they look as expected + ts = Timestamp(seconds=int(BASE_TIME.timestamp())) + hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) + for organization_id in organization_ids: + + response = EndpointTraceItemTable().execute( + _generate_request(ts, hour_ago, organization_id, [1, 2, 3]) + ) + assert response == _generate_expected_response(_USER) + + # next we scrub organizations 0 + start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds) + end_datetime = datetime.utcfromtimestamp(ts.seconds) + + run_job( + JobSpec( + "plswork", + "ScrubUserFromEAPSpans", + False, + { + "organization_ids": [organization_ids[0]], + "start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"), + "end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"), + }, + ) + ) + + response = EndpointTraceItemTable().execute( + _generate_request(ts, hour_ago, organization_ids[0], [3, 2, 1]) + ) + assert response == _generate_expected_response("scrubbed") + + # then we make sure organization 1 is NOT SCRUBBED + response = EndpointTraceItemTable().execute( + _generate_request(ts, hour_ago, organization_ids[1], [3, 2, 1]) + ) + assert response == _generate_expected_response(_USER)