Commit fe64d03
Rachel Chen authored and committed on Dec 18, 2024
1 parent: c847fb4
2 changed files with 363 additions and 0 deletions

This commit adds a manual job, ScrubUserFromEAPSpans, that rewrites the "user" attribute on EAP spans so that values of the form "ip:<address>" become "ip:scrubbed" for a given set of organizations and time range. It also adds tests covering parameter validation, query generation, and an end-to-end scrub.
snuba/manual_jobs/scrub_users_from_eap_spans.py (new file)
from datetime import datetime
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec

_IP_PREFIX = "ip:"
_SCRUBBED = "scrubbed"


class ScrubUserFromEAPSpans(Job):
    """Manual job that replaces IP addresses stored in the "user" attribute
    of EAP spans with the literal "scrubbed", for the given organizations
    and time range."""

    def __init__(self, job_spec: JobSpec) -> None:
        self.__validate_job_params(job_spec.params)
        super().__init__(job_spec)

    def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
        assert params
        assert isinstance(params["organization_ids"], list)
        assert all([isinstance(p, int) for p in params["organization_ids"]])
        self._organization_ids = params["organization_ids"]
        self._start_datetime = datetime.fromisoformat(params["start_datetime"])
        self._end_datetime = datetime.fromisoformat(params["end_datetime"])

    def _get_query(self, cluster_name: str | None) -> str:
        organization_ids = ",".join([str(p) for p in self._organization_ids])
        start_datetime = self._start_datetime.isoformat()
        end_datetime = self._end_datetime.isoformat()
        on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
        # For each (k, v) entry in attr_str_2: a `user` value of the form
        # "ip:<address>" becomes "ip:scrubbed" when <address> is a valid
        # IPv4/IPv6 address; everything else is left unchanged.
        return f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, '{_IP_PREFIX}'), concat(
    '{_IP_PREFIX}',
    if(
        isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4)),
        '{_SCRUBBED}',
        substring(v, 4)
    )
), v)), `attr_str_2`)
WHERE organization_id IN [{organization_ids}]
AND _sort_timestamp >= toDateTime('{start_datetime}')
AND _sort_timestamp < toDateTime('{end_datetime}')"""

    def execute(self, logger: JobLogger) -> None:
        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
        storage_node = cluster.get_local_nodes()[0]
        connection = cluster.get_node_connection(
            ClickhouseClientSettings.CLEANUP, storage_node
        )
        if not cluster.is_single_node():
            cluster_name = cluster.get_clickhouse_cluster_name()
        else:
            cluster_name = None
        query = self._get_query(cluster_name)
        logger.info(f"Executing query: {query}")
        # mutations_sync=2 makes the call block until the mutation
        # completes on all replicas
        result = connection.execute(query=query, settings={"mutations_sync": 2})

        logger.info("complete")
        logger.info(repr(result))
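
The heart of the job is the mapApply expression in _get_query: a "user" value is rewritten to "ip:scrubbed" only when the text after the "ip:" prefix parses as an IPv4 or IPv6 address, and every other key or value passes through untouched. Here is a minimal Python sketch of that per-entry rule (not part of the commit; it uses the stdlib ipaddress module as a stand-in for ClickHouse's isIPv4String/isIPv6String):

import ipaddress

_IP_PREFIX = "ip:"
_SCRUBBED = "scrubbed"

def scrub_value(key: str, value: str) -> str:
    # only the "user" key with an "ip:" prefix is a scrub candidate
    if key != "user" or not value.startswith(_IP_PREFIX):
        return value
    candidate = value[len(_IP_PREFIX):]
    try:
        ipaddress.ip_address(candidate)  # accepts IPv4 and IPv6
    except ValueError:
        return value  # not a valid address, so it is left as-is
    return _IP_PREFIX + _SCRUBBED

assert scrub_value("user", "ip:192.168.0.45") == "ip:scrubbed"
assert scrub_value("user", "ip:not-an-address") == "ip:not-an-address"
assert scrub_value("color", "red") == "red"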
Tests for ScrubUserFromEAPSpans (new file)
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
    Column,
    TraceItemColumnValues,
    TraceItemTableRequest,
    TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import (
    PageToken,
    RequestMeta,
    ResponseMeta,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_status import JobStatus
from snuba.manual_jobs.runner import get_job_status, run_job
from snuba.manual_jobs.scrub_users_from_eap_spans import ScrubUserFromEAPSpans
from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
_USER = "192.168.0.45"


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_basic() -> None:
    job_id = "abc"
    run_job(
        JobSpec(
            job_id,
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [1, 3, 5, 6],
                "start_datetime": "2024-12-01 00:00:00",
                "end_datetime": "2024-12-10 00:00:00",
            },
        )
    )

    assert get_job_status(job_id) == JobStatus.FINISHED


@pytest.mark.parametrize(
    "jobspec",
    [
        # non-integer organization id
        JobSpec(
            "abc",
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [1, "b"],
                "start_datetime": "2024-12-01 00:00:00",
                "end_datetime": "2024-12-10 00:00:00",
            },
        ),
        # malformed start_datetime (one-digit seconds)
        JobSpec(
            "abc",
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [1, 2],
                "start_datetime": "2024-12-01 00:00:0",
                "end_datetime": "2024-12-10 00:00:00",
            },
        ),
        # malformed end_datetime (one-digit seconds)
        JobSpec(
            "abc",
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [1, 2],
                "start_datetime": "2024-12-01 00:00:00",
                "end_datetime": "2024-12-10 00:00:0",
            },
        ),
    ],
)
@pytest.mark.redis_db
def test_fail_validation(jobspec: JobSpec) -> None:
    # each spec above is rejected by __validate_job_params
    # (see the sketch after this file)
    with pytest.raises(Exception):
        run_job(jobspec)

@pytest.mark.redis_db
def test_generate_query() -> None:
    job = ScrubUserFromEAPSpans(
        JobSpec(
            "bassa",
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [1, 3, 5, 6],
                "start_datetime": "2024-12-01 00:00:00",
                "end_datetime": "2024-12-10 00:00:00",
            },
        )
    )
    # the blank line after ALTER TABLE is the empty ON CLUSTER slot,
    # since cluster_name is None here
    assert (
        job._get_query(None)
        == """ALTER TABLE eap_spans_2_local

UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, 'ip:'), concat(
    'ip:',
    if(
        isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4)),
        'scrubbed',
        substring(v, 4)
    )
), v)), `attr_str_2`)
WHERE organization_id IN [1,3,5,6]
AND _sort_timestamp >= toDateTime('2024-12-01T00:00:00')
AND _sort_timestamp < toDateTime('2024-12-10T00:00:00')"""
    )

def _gen_message(
    dt: datetime,
    organization_id: int,
    measurements: dict[str, dict[str, float]] | None = None,
    tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
    measurements = measurements or {}
    tags = tags or {}
    return {
        "description": "/api/0/relays/projectconfigs/",
        "duration_ms": 152,
        "event_id": "d826225de75d42d6b2f01b957d51f18f",
        "exclusive_time_ms": 0.228,
        "is_segment": True,
        "data": {
            "sentry.environment": "development",
            "sentry.release": _RELEASE_TAG,
            "thread.name": "uWSGIWorker1Core0",
            "thread.id": "8522009600",
            "sentry.segment.name": "/api/0/relays/projectconfigs/",
            "sentry.sdk.name": "sentry.python.django",
            "sentry.sdk.version": "2.7.0",
            "my.float.field": 101.2,
            "my.int.field": 2000,
            "my.neg.field": -100,
            "my.neg.float.field": -101.2,
            "my.true.bool.field": True,
            "my.false.bool.field": False,
        },
        "measurements": {
            "num_of_spans": {"value": 50.0},
            "eap.measurement": {"value": random.choice([1, 100, 1000])},
            **measurements,
        },
        "organization_id": organization_id,
        "origin": "auto.http.django",
        "project_id": 1,
        "received": 1721319572.877828,
        "retention_days": 90,
        "segment_id": "8873a98879faf06d",
        "sentry_tags": {
            "category": "http",
            "environment": "development",
            "op": "http.server",
            "platform": "python",
            "release": _RELEASE_TAG,
            "sdk.name": "sentry.python.django",
            "sdk.version": "2.7.0",
            "status": "ok",
            "status_code": "200",
            "thread.id": "8522009600",
            "thread.name": "uWSGIWorker1Core0",
            "trace.status": "ok",
            "transaction": "/api/0/relays/projectconfigs/",
            "transaction.method": "POST",
            "transaction.op": "http.server",
            "user": "ip:192.168.0.45",
        },
        "span_id": "123456781234567D",
        "tags": {
            "http.status_code": "200",
            "relay_endpoint_version": "3",
            "relay_id": "88888888-4444-4444-8444-cccccccccccc",
            "relay_no_cache": "False",
            "relay_protocol_version": "3",
            "relay_use_post_or_schedule": "True",
            "relay_use_post_or_schedule_rejected": "version",
            "user.ip": "192.168.0.45",
            "user": "ip:192.168.0.45",
            "spans_over_limit": "False",
            "server_name": "blah",
            "color": random.choice(["red", "green", "blue"]),
            "location": random.choice(["mobile", "frontend", "backend"]),
            **tags,
        },
        "trace_id": uuid.uuid4().hex,
        "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
        "start_timestamp_precise": dt.timestamp(),
        "end_timestamp_precise": dt.timestamp() + 1,
    }


def _generate_request(
    ts: Any, hour_ago: int, organization_id: int, project_ids: list[int]
) -> TraceItemTableRequest:
    # project_ids is passed as an argument to keep this query from being cached
    return TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=project_ids,
            organization_id=organization_id,
            cogs_category="something",
            referrer="something",
            start_timestamp=Timestamp(seconds=hour_ago),
            end_timestamp=ts,
            request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
        ),
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(
                key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
            )
        ),
        columns=[Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user"))],
        order_by=[
            TraceItemTableRequest.OrderBy(
                column=Column(
                    key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user")
                )
            )
        ],
    )


def _generate_expected_response(user: str) -> TraceItemTableResponse:
    return TraceItemTableResponse(
        column_values=[
            TraceItemColumnValues(
                attribute_name="user",
                results=[AttributeValue(val_str="ip:" + user) for _ in range(20)],
            )
        ],
        page_token=PageToken(offset=20),
        meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
    )

@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_span_is_scrubbed() -> None:
    BASE_TIME = datetime.utcnow().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(minutes=180)
    organization_ids = [0, 1]
    spans_storage = get_storage(StorageKey("eap_spans"))
    messages = [
        _gen_message(BASE_TIME - timedelta(minutes=i), organization_id)
        for organization_id in organization_ids
        for i in range(20)
    ]
    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore

    # we inserted spans for organizations 0 and 1; make sure they look as expected
    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
    hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
    for organization_id in organization_ids:
        response = EndpointTraceItemTable().execute(
            _generate_request(ts, hour_ago, organization_id, [1, 2, 3])
        )
        assert response == _generate_expected_response(_USER)

    # next we scrub organization 0
    start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds)
    end_datetime = datetime.utcfromtimestamp(ts.seconds)

    run_job(
        JobSpec(
            "plswork",
            "ScrubUserFromEAPSpans",
            False,
            {
                "organization_ids": [organization_ids[0]],
                "start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
                "end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
            },
        )
    )

    # organization 0 should now be scrubbed
    response = EndpointTraceItemTable().execute(
        _generate_request(ts, hour_ago, organization_ids[0], [3, 2, 1])
    )
    assert response == _generate_expected_response("scrubbed")

    # then we make sure organization 1 is NOT scrubbed
    response = EndpointTraceItemTable().execute(
        _generate_request(ts, hour_ago, organization_ids[1], [3, 2, 1])
    )
    assert response == _generate_expected_response(_USER)
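
A closing note on test_fail_validation: the first bad spec trips the isinstance assertions in __validate_job_params, while the two malformed timestamps are rejected by datetime.fromisoformat, which the job calls directly. A stdlib-only sketch of that second failure mode (not part of the commit):

from datetime import datetime

datetime.fromisoformat("2024-12-01 00:00:00")  # parses fine

try:
    datetime.fromisoformat("2024-12-01 00:00:0")  # one-digit seconds field
except ValueError as exc:
    # the ValueError escapes __validate_job_params, so run_job raises
    print(exc)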