chore: Docstring additions, some light refactoring
tomasfarias committed Nov 22, 2024
1 parent 2b16310 commit a866795
Showing 6 changed files with 58 additions and 17 deletions.
4 changes: 4 additions & 0 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -449,6 +449,10 @@ async def produce_batch_export_record_batches_from_range(
queue: RecordBatchQueue,
query_parameters: dict[str, typing.Any],
):
"""Produce all record batches into `queue` required to complete `full_range`.

This function will skip over any already completed `done_ranges`.
"""
for interval_start, interval_end in generate_query_ranges(full_range, done_ranges):
if interval_start is not None:
query_parameters["interval_start"] = interval_start.strftime("%Y-%m-%d %H:%M:%S.%f")
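The new docstring above says already-completed `done_ranges` are skipped; the skipping itself happens in `generate_query_ranges`, which is outside this diff. A rough sketch of that idea follows, with the helper name and exact behavior assumed rather than taken from the PostHog source:

```python
import collections.abc
import datetime as dt

DateRange = tuple[dt.datetime, dt.datetime]


def yield_remaining_ranges(
    full_range: DateRange,
    done_ranges: collections.abc.Sequence[DateRange],
) -> collections.abc.Iterator[DateRange]:
    """Yield the sub-ranges of `full_range` not covered by `done_ranges`.

    Assumes `done_ranges` is sorted and non-overlapping. Illustrative sketch
    only, not the actual `generate_query_ranges` implementation.
    """
    cursor, end = full_range
    for done_start, done_end in done_ranges:
        if done_start > cursor:
            # A gap before the next completed range still needs querying.
            yield (cursor, done_start)
        cursor = max(cursor, done_end)
    if cursor < end:
        # Whatever remains after the last completed range also needs querying.
        yield (cursor, end)
```

Each yielded range would then be formatted into the `interval_start`/`interval_end` query parameters as in the hunk above.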
4 changes: 2 additions & 2 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -514,8 +514,8 @@ async def flush_to_bigquery(
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

details.insert_done_range(last_date_range, data_interval_start)
heartbeater.details = tuple(details.serialize_details())
details.track_done_range(last_date_range, data_interval_start)
heartbeater.set_from_heartbeat_details(details)

flush_tasks = []
while not queue.empty() or not produce_task.done():
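This hunk (and the matching ones in the Redshift, S3, and Snowflake exporters below) replaces the manual `heartbeater.details = tuple(details.serialize_details())` assignment with `heartbeater.set_from_heartbeat_details(details)`. A minimal sketch of what such a setter presumably does, with the `Heartbeater` internals assumed since they are not part of this diff:

```python
import typing


class Heartbeater:
    """Sketch of the relevant part of the heartbeater; internals assumed."""

    def __init__(self) -> None:
        self.details: tuple[typing.Any, ...] = ()

    def set_from_heartbeat_details(self, details: typing.Any) -> None:
        # Presumably equivalent to the line this commit replaces:
        #   heartbeater.details = tuple(details.serialize_details())
        self.details = tuple(details.serialize_details())
```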
53 changes: 45 additions & 8 deletions posthog/temporal/batch_exports/heartbeat.py
@@ -19,14 +19,23 @@

@dataclasses.dataclass
class BatchExportRangeHeartbeatDetails(HeartbeatDetails):
"""Details included in every batch export heartbeat."""
"""Details included in every batch export heartbeat.

Attributes:
done_ranges: Date ranges that have been successfully exported.
_remaining: Anything else in the activity details.
"""

_remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple)
done_ranges: list[DateRange] = dataclasses.field(default_factory=list)
_remaining: collections.abc.Sequence[typing.Any] = dataclasses.field(default_factory=tuple)

@classmethod
def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> dict[str, typing.Any]:
"""Attempt to initialize HeartbeatDetails from an activity's details."""
"""Deserialize this from Temporal activity details.

We expect done ranges to be available in the first index of remaining
values. Moreover, we expect datetime values to be ISO-formatted strings.
"""
done_ranges: list[DateRange] = []
remaining = super().deserialize_details(details)

@@ -51,7 +60,11 @@ def deserialize_details(cls, details: collections.abc.Sequence[typing.Any]) -> d
return {"done_ranges": done_ranges, **remaining}

def serialize_details(self) -> tuple[typing.Any, ...]:
"""Attempt to initialize HeartbeatDetails from an activity's details."""
"""Serialize this into a tuple.

Each datetime from `self.done_ranges` must be cast to string as values must
be JSON-serializable.
"""
serialized_done_ranges = [
(start.isoformat() if start is not None else start, end.isoformat()) for (start, end) in self.done_ranges
]
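Taken together, the two new docstrings above describe a round trip in which `done_ranges` datetimes go out as ISO-8601 strings and come back through `fromisoformat`. A hedged usage sketch, assuming the class is importable from the module shown in this diff and that `deserialize_details` accepts the tuple produced by `serialize_details`:

```python
import datetime as dt

from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails

details = BatchExportRangeHeartbeatDetails(
    done_ranges=[
        (
            dt.datetime(2024, 11, 22, 10, 0, tzinfo=dt.UTC),
            dt.datetime(2024, 11, 22, 10, 5, tzinfo=dt.UTC),
        )
    ]
)

serialized = details.serialize_details()
# The done ranges are expected to travel as ISO-8601 strings, e.g.
# ("2024-11-22T10:00:00+00:00", "2024-11-22T10:05:00+00:00").

kwargs = BatchExportRangeHeartbeatDetails.deserialize_details(serialized)
# `kwargs["done_ranges"]` should hold datetime pairs again, parsed back
# with `dt.datetime.fromisoformat(...)` (assumed round-trip behavior).
```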
@@ -62,9 +75,20 @@ def serialize_details(self) -> tuple[typing.Any, ...]:
def empty(self) -> bool:
return len(self.done_ranges) == 0

def insert_done_range(
def track_done_range(
self, done_range: DateRange, data_interval_start_input: str | dt.datetime | None, merge: bool = True
):
"""Track a range of datetime values that has been exported successfully.

If this is the first `done_range` then we override the beginning of the
range to ensure it covers the range from `data_interval_start_input`.

Arguments:
done_range: A date range of values that have been exported.
data_interval_start_input: The `data_interval_start` input passed to
the batch export.
merge: Whether to merge the new range with existing ones.
"""
if self.empty is True:
if data_interval_start_input is None:
data_interval_start = dt.datetime.fromtimestamp(0, tz=dt.UTC)
@@ -75,9 +99,16 @@ def insert_done_range(

done_range = (data_interval_start, done_range[1])

self.insert_done_range_within_ranges(done_range, merge=merge)
self.insert_done_range(done_range, merge=merge)
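A short usage sketch of the first-range override described in the `track_done_range` docstring; the asserted result is assumed from that docstring, not from the parts of the method body collapsed in this diff:

```python
import datetime as dt

from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails

details = BatchExportRangeHeartbeatDetails()

# The first flushed range starts partway into the batch interval...
details.track_done_range(
    (
        dt.datetime(2024, 11, 22, 10, 2, tzinfo=dt.UTC),
        dt.datetime(2024, 11, 22, 10, 5, tzinfo=dt.UTC),
    ),
    data_interval_start_input=dt.datetime(2024, 11, 22, 10, 0, tzinfo=dt.UTC),
)

# ...but, per the docstring, being the first range its start is overridden so
# it covers everything from `data_interval_start_input` (assumed result):
assert details.done_ranges[0] == (
    dt.datetime(2024, 11, 22, 10, 0, tzinfo=dt.UTC),
    dt.datetime(2024, 11, 22, 10, 5, tzinfo=dt.UTC),
)
```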

def complete_done_range(self, data_interval_end_input: str | dt.datetime, merge: bool = True):
def complete_done_ranges(self, data_interval_end_input: str | dt.datetime, merge: bool = True):
"""Complete the ranges required to reach `data_interval_end_input`.

This is meant to be called at the end of a batch export to ensure
`self.done_ranges` covers the entire batch period until
`data_interval_end_input`.
"""
if isinstance(data_interval_end_input, str):
data_interval_end = dt.datetime.fromisoformat(data_interval_end_input)
else:
@@ -89,7 +120,8 @@ def complete_done_range(self, data_interval_end_input: str | dt.datetime, merge:
if merge:
self.merge_done_ranges()
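A hedged usage sketch of `complete_done_ranges`, with the final shape of `done_ranges` assumed from the docstring rather than from the method body, most of which is collapsed in this diff:

```python
import datetime as dt

from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails

details = BatchExportRangeHeartbeatDetails(
    done_ranges=[
        (
            dt.datetime(2024, 11, 22, 10, 0, tzinfo=dt.UTC),
            dt.datetime(2024, 11, 22, 10, 30, tzinfo=dt.UTC),
        )
    ]
)

# Fill whatever is missing so the tracked ranges reach the end of the batch
# period; afterwards `done_ranges` is expected to cover through 11:00
# (assumed outcome, per the docstring).
details.complete_done_ranges(dt.datetime(2024, 11, 22, 11, 0, tzinfo=dt.UTC))
```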

def insert_done_range_within_ranges(self, done_range: DateRange, merge: bool = True):
def insert_done_range(self, done_range: DateRange, merge: bool = True):
"""Insert a date range into `self.done_ranges` in order."""
for index, range in enumerate(self.done_ranges, start=0):
if done_range[0] > range[1]:
continue
@@ -104,6 +136,11 @@ def insert_done_range_within_ranges(self, done_range: DateRange, merge: bool = T
self.merge_done_ranges()

def merge_done_ranges(self):
"""Merge as many date ranges together as possible in `self.done_ranges`.

This method looks for ranges whose opposite ends are touching and merges
them together. We rely on ranges not overlapping to achieve this.
"""
marked_for_deletion = set()
for index, range in enumerate(self.done_ranges, start=0):
if index in marked_for_deletion:
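The `merge_done_ranges` docstring relies on ranges touching end-to-start and never overlapping. A standalone, hypothetical sketch of that merging idea (the actual method operates on `self.done_ranges` and is only partially visible in this diff):

```python
import datetime as dt

DateRange = tuple[dt.datetime, dt.datetime]


def merge_touching_ranges(done_ranges: list[DateRange]) -> list[DateRange]:
    """Merge sorted, non-overlapping ranges whose ends touch exactly.

    Illustrative sketch only, not the body of `merge_done_ranges`.
    """
    merged: list[DateRange] = []
    for start, end in done_ranges:
        if merged and merged[-1][1] == start:
            # The previous range ends exactly where this one starts: extend it.
            merged[-1] = (merged[-1][0], end)
        else:
            merged.append((start, end))
    return merged
```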
8 changes: 4 additions & 4 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -374,8 +374,8 @@ async def flush_to_redshift(batch):
await flush_to_redshift(batch)

last_date_range = (batch_start_inserted_at, _inserted_at)
heartbeat_details.insert_done_range(last_date_range, data_interval_start)
heartbeater.details = tuple(heartbeat_details.serialize_details())
heartbeat_details.track_done_range(last_date_range, data_interval_start)
heartbeater.set_from_heartbeat_details(heartbeat_details)

batch_start_inserted_at = None
batch = []
@@ -385,8 +385,8 @@

last_date_range = (batch_start_inserted_at, _inserted_at)

heartbeat_details.insert_done_range(last_date_range, data_interval_start)
heartbeater.details = tuple(heartbeat_details.serialize_details())
heartbeat_details.track_done_range(last_date_range, data_interval_start)
heartbeater.set_from_heartbeat_details(heartbeat_details)

return total_rows_exported

2 changes: 1 addition & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
@@ -614,7 +614,7 @@ async def flush_to_s3(
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

details.insert_done_range(last_date_range, data_interval_start)
details.track_done_range(last_date_range, data_interval_start)
details.append_upload_state(s3_upload.to_state())
heartbeater.set_from_heartbeat_details(details)

4 changes: 2 additions & 2 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -701,9 +701,9 @@ async def flush_to_snowflake(
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

details.insert_done_range(last_date_range, data_interval_start)
details.track_done_range(last_date_range, data_interval_start)
details.file_no = flush_counter
heartbeater.details = tuple(details.serialize_details())
heartbeater.set_from_heartbeat_details(details)

writer = JSONLBatchExportWriter(
max_bytes=settings.BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES,
