diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index a1ba88b17d4e5..fd9a718667262 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -449,6 +449,10 @@ async def produce_batch_export_record_batches_from_range(
     queue: RecordBatchQueue,
     query_parameters: dict[str, typing.Any],
 ):
+    """Produce into `queue` all record batches required to complete `full_range`.
+
+    This function will skip over any already completed `done_ranges`.
+    """
     for interval_start, interval_end in generate_query_ranges(full_range, done_ranges):
         if interval_start is not None:
             query_parameters["interval_start"] = interval_start.strftime("%Y-%m-%d %H:%M:%S.%f")
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index 94300a46bbd9e..500d2a3ef8657 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -514,8 +514,8 @@ async def flush_to_bigquery(
             rows_exported.add(records_since_last_flush)
             bytes_exported.add(bytes_since_last_flush)
 
-            details.insert_done_range(last_date_range, data_interval_start)
-            heartbeater.details = tuple(details.serialize_details())
+            details.track_done_range(last_date_range, data_interval_start)
+            heartbeater.set_from_heartbeat_details(details)
 
         flush_tasks = []
         while not queue.empty() or not produce_task.done():
diff --git a/posthog/temporal/batch_exports/heartbeat.py b/posthog/temporal/batch_exports/heartbeat.py
index d0c1d038f0010..d2eb504eca7f6 100644
--- a/posthog/temporal/batch_exports/heartbeat.py
+++ b/posthog/temporal/batch_exports/heartbeat.py
@@ -19,14 +19,23 @@
 
 @dataclasses.dataclass
 class BatchExportRangeHeartbeatDetails(HeartbeatDetails):
-    """Details included in every batch export heartbeat."""
+    """Details included in every batch export heartbeat.
+
+    Attributes:
+        done_ranges: Date ranges that have been successfully exported.
+        _remaining: Anything else in the activity details.
+    """
 
-    _remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple)
     done_ranges: list[DateRange] = dataclasses.field(default_factory=list)
+    _remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple)
 
     @classmethod
     def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> dict[str, typing.Any]:
-        """Attempt to initialize HeartbeatDetails from an activity's details."""
+        """Deserialize this from Temporal activity details.
+
+        We expect done ranges to be available in the first index of the remaining
+        values. Moreover, we expect datetime values to be ISO-formatted strings.
+        """
         done_ranges: list[DateRange] = []
 
         remaining = super().deserialize_details(details)
@@ -51,7 +60,11 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d
         return {"done_ranges": done_ranges, **remaining}
 
     def serialize_details(self) -> tuple[typing.Any, ...]:
-        """Attempt to initialize HeartbeatDetails from an activity's details."""
+        """Serialize this into a tuple.
+
+        Each datetime from `self.done_ranges` must be cast to a string, as values
+        must be JSON-serializable.
+        """
         serialized_done_ranges = [
             (start.isoformat() if start is not None else start, end.isoformat()) for (start, end) in self.done_ranges
         ]
@@ -62,9 +75,20 @@ def serialize_details(self) -> tuple[typing.Any, ...]:
     def empty(self) -> bool:
         return len(self.done_ranges) == 0
 
-    def insert_done_range(
+    def track_done_range(
         self, done_range: DateRange, data_interval_start_input: str | dt.datetime | None, merge: bool = True
     ):
+        """Track a range of datetime values that has been exported successfully.
+
+        If this is the first `done_range`, then we override the beginning of the
+        range to ensure it covers the range from `data_interval_start_input`.
+
+        Arguments:
+            done_range: A date range of values that have been exported.
+            data_interval_start_input: The `data_interval_start` input passed to
+                the batch export.
+            merge: Whether to merge the new range with existing ones.
+        """
         if self.empty is True:
             if data_interval_start_input is None:
                 data_interval_start = dt.datetime.fromtimestamp(0, tz=dt.UTC)
@@ -75,9 +99,16 @@ def insert_done_range(
             done_range = (data_interval_start, done_range[1])
 
-        self.insert_done_range_within_ranges(done_range, merge=merge)
+        self.insert_done_range(done_range, merge=merge)
+
+    def complete_done_ranges(self, data_interval_end_input: str | dt.datetime, merge: bool = True):
+        """Complete the ranges required to reach `data_interval_end_input`.
 
-    def complete_done_range(self, data_interval_end_input: str | dt.datetime, merge: bool = True):
+
+        This is meant to be called at the end of a batch export to ensure
+        `self.done_ranges` covers the entire batch period until
+        `data_interval_end_input`.
+        """
         if isinstance(data_interval_end_input, str):
             data_interval_end = dt.datetime.fromisoformat(data_interval_end_input)
         else:
             data_interval_end = data_interval_end_input
@@ -89,7 +120,8 @@ def complete_done_range(self, data_interval_end_input: str | dt.datetime, merge:
         if merge:
             self.merge_done_ranges()
 
-    def insert_done_range_within_ranges(self, done_range: DateRange, merge: bool = True):
+    def insert_done_range(self, done_range: DateRange, merge: bool = True):
+        """Insert a date range into `self.done_ranges` in order."""
         for index, range in enumerate(self.done_ranges, start=0):
             if done_range[0] > range[1]:
                 continue
@@ -104,6 +136,11 @@
             self.merge_done_ranges()
 
     def merge_done_ranges(self):
+        """Merge as many date ranges together as possible in `self.done_ranges`.
+
+        This method looks for ranges whose opposite ends are touching and merges
+        them together. We rely on ranges not overlapping to achieve this.
+        """
         marked_for_deletion = set()
         for index, range in enumerate(self.done_ranges, start=0):
             if index in marked_for_deletion:
diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py
index cba9da3f88876..1fbc6e3de58d3 100644
--- a/posthog/temporal/batch_exports/redshift_batch_export.py
+++ b/posthog/temporal/batch_exports/redshift_batch_export.py
@@ -374,8 +374,8 @@ async def flush_to_redshift(batch):
             await flush_to_redshift(batch)
 
             last_date_range = (batch_start_inserted_at, _inserted_at)
-            heartbeat_details.insert_done_range(last_date_range, data_interval_start)
-            heartbeater.details = tuple(heartbeat_details.serialize_details())
+            heartbeat_details.track_done_range(last_date_range, data_interval_start)
+            heartbeater.set_from_heartbeat_details(heartbeat_details)
 
             batch_start_inserted_at = None
             batch = []
@@ -385,8 +385,8 @@ async def flush_to_redshift(batch):
 
         last_date_range = (batch_start_inserted_at, _inserted_at)
 
-        heartbeat_details.insert_done_range(last_date_range, data_interval_start)
-        heartbeater.details = tuple(heartbeat_details.serialize_details())
+        heartbeat_details.track_done_range(last_date_range, data_interval_start)
+        heartbeater.set_from_heartbeat_details(heartbeat_details)
 
 
     return total_rows_exported
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 121b90efef172..e771e6ae2bb91 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -614,7 +614,7 @@ async def flush_to_s3(
             rows_exported.add(records_since_last_flush)
             bytes_exported.add(bytes_since_last_flush)
 
-            details.insert_done_range(last_date_range, data_interval_start)
+            details.track_done_range(last_date_range, data_interval_start)
             details.append_upload_state(s3_upload.to_state())
             heartbeater.set_from_heartbeat_details(details)
 
diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py
index 5af85e3de4b3f..0c29662a29404 100644
--- a/posthog/temporal/batch_exports/snowflake_batch_export.py
+++ b/posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -701,9 +701,9 @@ async def flush_to_snowflake(
             rows_exported.add(records_since_last_flush)
             bytes_exported.add(bytes_since_last_flush)
 
-            details.insert_done_range(last_date_range, data_interval_start)
+            details.track_done_range(last_date_range, data_interval_start)
             details.file_no = flush_counter
-            heartbeater.details = tuple(details.serialize_details())
+            heartbeater.set_from_heartbeat_details(details)
 
         writer = JSONLBatchExportWriter(
             max_bytes=settings.BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES,