ref(replays): remove unused chunking code (#47877)
After merging getsentry/relay#2032, we've
observed that we are no longer processing chunked messages (as
intended). We can remove the logic for this as it is no longer used.
JoshFerge authored May 2, 2023
1 parent 52fde50 commit fa6fdea
Showing 3 changed files with 13 additions and 216 deletions.
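
For context, the only recording message the consumer handles after this change is the non-chunked variant. A sketch of its shape follows; the field names are taken from the diff and test fixtures in this commit, while the values (and the exact Relay-side schema) are illustrative assumptions.

# Illustrative shape of a non-chunked recording message. Only the field
# names come from this commit's diff and tests; every value is made up.
example_message = {
    "type": "replay_recording_not_chunked",
    "replay_id": "515539018c9b4260a6f999572f1661ee",  # hypothetical UUID
    "project_id": 1,
    "org_id": 1,
    "key_id": 123,
    "received": 1683000000.0,
    "retention_days": 30,
    # A JSON header line, a newline, then the (optionally zlib-compressed)
    # segment body, mirroring the test fixtures further down.
    "payload": b'{"segment_id":0}\n[{"hello":"world"}]',
}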
40 changes: 6 additions & 34 deletions src/sentry/replays/consumers/recording.py
@@ -9,19 +9,11 @@
from arroyo.processing.strategies import RunTaskInThreads, TransformStep
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.filter import FilterStep
from arroyo.types import Commit, Message, Partition
from django.conf import settings
from sentry_sdk.tracing import Span

from sentry.replays.usecases.ingest import (
RecordingMessage,
RecordingSegmentChunkMessage,
RecordingSegmentMessage,
ingest_chunk,
ingest_recording_chunked,
ingest_recording_not_chunked,
)
from sentry.replays.usecases.ingest import RecordingMessage, ingest_recording

logger = logging.getLogger(__name__)

@@ -56,54 +48,34 @@ def create_with_partitions(
next_step=CommitOffsets(commit),
)

step2: FilterStep[MessageContext] = FilterStep(
function=is_capstone_message,
next_step=step,
)

return TransformStep(
function=move_chunks_to_cache_or_skip,
next_step=step2,
function=initialize_message_context,
next_step=step,
)


def move_chunks_to_cache_or_skip(message: Message[KafkaPayload]) -> MessageContext:
"""Move chunk messages to cache or skip."""
def initialize_message_context(message: Message[KafkaPayload]) -> MessageContext:
"""Initialize a Sentry transaction and unpack the message."""
transaction = sentry_sdk.start_transaction(
name="replays.consumer.process_recording",
op="replays.consumer",
sampled=random.random()
< getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0),
)
current_hub = sentry_sdk.Hub(sentry_sdk.Hub.current)

message_dict = msgpack.unpackb(message.payload.value)

if message_dict["type"] == "replay_recording_chunk":
ingest_chunk(cast(RecordingSegmentChunkMessage, message_dict), transaction, current_hub)

return MessageContext(message_dict, transaction, current_hub)


def is_capstone_message(message: Message[MessageContext]) -> Any:
"""Return "True" if the message is a capstone and can be processed in parallel."""
message_type = message.payload.message["type"]
return message_type == "replay_recording_not_chunked" or message_type == "replay_recording"


def move_replay_to_permanent_storage(message: Message[MessageContext]) -> Any:
"""Move the replay payload to permanent storage."""
context: MessageContext = message.payload
message_dict = context.message
message_type = message_dict["type"]

if message_type == "replay_recording_not_chunked":
ingest_recording_not_chunked(
ingest_recording(
cast(RecordingMessage, message_dict), context.transaction, context.current_hub
)
elif message_type == "replay_recording":
ingest_recording_chunked(
cast(RecordingSegmentMessage, message_dict), context.transaction, context.current_hub
)
else:
raise ValueError(f"Invalid replays recording message type specified: {message_type}")
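
With the filter and cache steps gone, the transform step that remains only unpacks the msgpack payload and opens a transaction; the heavy lifting moves to the threaded step. A minimal, standalone sketch of that unpacking, with the payload constructed inline purely for illustration:

import msgpack

# Stand-in for message.payload.value as it arrives from Kafka.
raw = msgpack.packb({"type": "replay_recording_not_chunked", "replay_id": "abc123"})

# This mirrors the msgpack.unpackb call in initialize_message_context above.
message_dict = msgpack.unpackb(raw)
assert message_dict["type"] == "replay_recording_not_chunked"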
104 changes: 7 additions & 97 deletions src/sentry/replays/usecases/ingest/__init__.py
@@ -4,7 +4,7 @@
import logging
import zlib
from datetime import datetime, timezone
from typing import Optional, TypedDict, Union
from typing import Optional, TypedDict

from django.conf import settings
from sentry_sdk import Hub
@@ -13,7 +13,6 @@
from sentry import options
from sentry.constants import DataCategory
from sentry.models.project import Project
from sentry.replays.cache import RecordingSegmentCache, RecordingSegmentParts
from sentry.replays.feature import has_feature_access
from sentry.replays.lib.storage import RecordingSegmentStorageMeta, make_storage_driver
from sentry.replays.usecases.ingest.dom_index import parse_and_emit_replay_actions
@@ -36,14 +35,6 @@ class RecordingSegmentHeaders(TypedDict):
segment_id: int


class RecordingSegmentChunkMessage(TypedDict):
id: str # a uuid that individualy identifies a recording segment
replay_id: str # the uuid of the encompassing replay event
project_id: int
chunk_index: int # each segment is split into chunks to fit into kafka
payload: Union[bytes, str]


class RecordingSegmentMessage(TypedDict):
retention_days: int
org_id: int
@@ -79,66 +70,13 @@ class RecordingIngestMessage:
payload_with_headers: bytes


@metrics.wraps("replays.usecases.ingest.ingest_recording_chunked")
def ingest_recording_chunked(
message_dict: RecordingSegmentMessage, transaction: Span, current_hub: Hub
) -> None:
"""Ingest chunked recording messages."""
with current_hub:
with transaction.start_child(
op="replays.usecases.ingest.ingest_recording_chunked",
description="ingest_recording_chunked",
):
cache_prefix = replay_recording_segment_cache_id(
project_id=message_dict["project_id"],
replay_id=message_dict["replay_id"],
segment_id=message_dict["replay_recording"]["id"],
)
parts = RecordingSegmentParts(
prefix=cache_prefix, num_parts=message_dict["replay_recording"]["chunks"]
)

try:
recording_segment_with_headers = collate_segment_chunks(parts)
except ValueError:
logger.exception("Missing recording-segment.")
return None

logger.info(
"ingest_recording_chunked.info",
extra={
"organization_id": message_dict["org_id"],
"project_id": message_dict["project_id"],
"replay_id": message_dict["replay_id"],
"num_parts": message_dict["replay_recording"]["chunks"],
"size_compressed": len(recording_segment_with_headers),
},
)
message = RecordingIngestMessage(
replay_id=message_dict["replay_id"],
key_id=message_dict.get("key_id"),
org_id=message_dict["org_id"],
project_id=message_dict["project_id"],
received=message_dict["received"],
retention_days=message_dict["retention_days"],
payload_with_headers=recording_segment_with_headers,
)
ingest_recording(message, transaction)

# Segment chunks are always deleted if ingest behavior runs without error.
with metrics.timer("replays.process_recording.store_recording.drop_segments"):
parts.drop()


@metrics.wraps("replays.usecases.ingest.ingest_recording_not_chunked")
def ingest_recording_not_chunked(
message_dict: RecordingMessage, transaction: Span, current_hub: Hub
) -> None:
@metrics.wraps("replays.usecases.ingest.ingest_recording")
def ingest_recording(message_dict: RecordingMessage, transaction: Span, current_hub: Hub) -> None:
"""Ingest non-chunked recording messages."""
with current_hub:
with transaction.start_child(
op="replays.usecases.ingest.ingest_recording_not_chunked",
description="ingest_recording_not_chunked",
op="replays.usecases.ingest.ingest_recording",
description="ingest_recording",
):
message = RecordingIngestMessage(
replay_id=message_dict["replay_id"],
@@ -149,10 +87,10 @@ def ingest_recording_not_chunked(
retention_days=message_dict["retention_days"],
payload_with_headers=message_dict["payload"],
)
ingest_recording(message, transaction)
_ingest_recording(message, transaction)


def ingest_recording(message: RecordingIngestMessage, transaction: Span) -> None:
def _ingest_recording(message: RecordingIngestMessage, transaction: Span) -> None:
"""Ingest recording messages."""
try:
headers, recording_segment = process_headers(message.payload_with_headers)
@@ -208,34 +146,6 @@ def ingest_recording(message: RecordingIngestMessage, transaction: Span) -> None
transaction.finish()


@metrics.wraps("replays.usecases.ingest.ingest_chunk")
def ingest_chunk(
message_dict: RecordingSegmentChunkMessage, transaction: Span, current_hub: Hub
) -> None:
"""Ingest chunked message part."""
with current_hub:
with transaction.start_child(op="replays.process_recording.store_chunk"):
cache_prefix = replay_recording_segment_cache_id(
project_id=message_dict["project_id"],
replay_id=message_dict["replay_id"],
segment_id=message_dict["id"],
)

payload = message_dict["payload"]
payload = payload.encode("utf-8") if isinstance(payload, str) else payload

part = RecordingSegmentCache(cache_prefix)
part[message_dict["chunk_index"]] = payload

transaction.finish()


@metrics.wraps("replays.usecases.ingest.collate_segment_chunks")
def collate_segment_chunks(chunks: RecordingSegmentParts) -> bytes:
"""Collect and merge recording segment chunks."""
return b"".join(list(chunks))


@metrics.wraps("replays.usecases.ingest.process_headers")
def process_headers(bytes_with_headers: bytes) -> tuple[RecordingSegmentHeaders, bytes]:
try:
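
process_headers is truncated in this view. Judging from the payload layout used by the test fixtures (a JSON header line, a newline, then the segment body), a minimal sketch of that parsing could look like the hypothetical helper below; this is an assumption about the layout, not the function's actual body.

import json

def parse_recording_headers(bytes_with_headers: bytes) -> tuple[dict, bytes]:
    # Split off the first line, which carries the JSON headers
    # (e.g. b'{"segment_id":0}'), and return the rest as the
    # (possibly zlib-compressed) recording segment.
    header_bytes, _, segment_bytes = bytes_with_headers.partition(b"\n")
    return json.loads(header_bytes), segment_bytes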
85 changes: 0 additions & 85 deletions tests/sentry/replays/consumers/test_recording.py
@@ -47,41 +47,6 @@ def submit(self, messages):
strategy.join(1)
strategy.terminate()

def chunked_messages(
self,
message: bytes = b'[{"hello":"world"}]',
segment_id: int = 0,
compressed: bool = False,
) -> List[Dict[str, Any]]:
message = zlib.compress(message) if compressed else message
return [
{
"payload": f'{{"segment_id":{segment_id}}}\n'.encode() + message,
"replay_id": self.replay_id,
"project_id": self.project.id,
"id": self.replay_recording_id,
"chunk_index": 0,
"type": "replay_recording_chunk",
"org_id": self.organization.id,
"received": time.time(),
"retention_days": 30,
"key_id": 123,
"retention_days": 30,
},
{
"type": "replay_recording",
"replay_id": self.replay_id,
"replay_recording": {
"chunks": 1,
"id": self.replay_recording_id,
},
"project_id": self.project.id,
"org_id": self.organization.id,
"received": time.time(),
"retention_days": 30,
},
]

def nonchunked_messages(
self,
message: bytes = b'[{"hello":"world"}]',
@@ -102,56 +67,6 @@ def nonchunked_messages(
}
]
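
The remaining non-chunked fixture presumably builds its payload the same way the removed chunked fixture did: a JSON header line plus an optionally zlib-compressed body. A standalone sketch of that construction, with illustrative values:

import zlib

segment_id = 0
body = b'[{"hello":"world"}]'
# Header line followed by the compressed segment, matching the
# compressed=True branch of the fixtures in this file.
payload = f'{{"segment_id":{segment_id}}}\n'.encode() + zlib.compress(body)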

@patch("sentry.models.OrganizationOnboardingTask.objects.record")
@patch("sentry.analytics.record")
def test_chunked_compressed_segment_ingestion(self, mock_record, mock_onboarding_task):
segment_id = 0
self.submit(self.chunked_messages(segment_id=segment_id, compressed=True))
self.assert_replay_recording_segment(segment_id, compressed=True)

self.project.refresh_from_db()
assert self.project.flags.has_replays

mock_onboarding_task.assert_called_with(
organization_id=self.project.organization_id,
task=OnboardingTask.SESSION_REPLAY,
status=OnboardingTaskStatus.COMPLETE,
date_completed=ANY,
)

mock_record.assert_called_with(
"first_replay.sent",
organization_id=self.organization.id,
project_id=self.project.id,
platform=self.project.platform,
user_id=self.organization.default_owner_id,
)

@patch("sentry.models.OrganizationOnboardingTask.objects.record")
@patch("sentry.analytics.record")
def test_chunked_uncompressed_segment_ingestion(self, mock_record, mock_onboarding_task):
segment_id = 0
self.submit(self.chunked_messages(segment_id=segment_id, compressed=False))
self.assert_replay_recording_segment(segment_id, compressed=False)

self.project.refresh_from_db()
assert self.project.flags.has_replays

mock_onboarding_task.assert_called_with(
organization_id=self.project.organization_id,
task=OnboardingTask.SESSION_REPLAY,
status=OnboardingTaskStatus.COMPLETE,
date_completed=ANY,
)

mock_record.assert_called_with(
"first_replay.sent",
organization_id=self.organization.id,
project_id=self.project.id,
platform=self.project.platform,
user_id=self.organization.default_owner_id,
)

@patch("sentry.models.OrganizationOnboardingTask.objects.record")
@patch("sentry.analytics.record")
def test_compressed_segment_ingestion(self, mock_record, mock_onboarding_task):
