Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
kylemumma committed Jul 17, 2024
1 parent e7e624f commit 88c464c
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions snuba/migrations/operations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -77,12 +78,36 @@ def execute(self) -> None:
logger.info(f"Executing on {self.target.value} node: {node}")
try:
connection.execute(self.format_sql(), settings=self._settings)
self._block_on_mutations(connection)
except Exception:
logger.exception(
f"Failed to execute operation on {self.storage_set}, target: {self.target}\n{self.format_sql()}\n{self._settings}"
)
raise

def _block_on_mutations(
self, conn: ClickhousePool, poll_seconds: int = 5, timeout_seconds: int = 300
) -> None:
"""
This function blocks until all entries of system.mutations
have is_done=1. Polls system.mutations every poll_seconds.
Raises error if not unblocked after timeout_seconds.
"""
slept_so_far = 0
while True:
is_mutating = conn.execute(
"select count(*) from system.mutations where is_done=0"
).results != [(0,)]
if not is_mutating:
return
elif slept_so_far >= timeout_seconds:
raise RuntimeError(
f"{conn.host}:{conn.port} not finished mutating after {timeout_seconds} seconds"
)
else:
time.sleep(poll_seconds)
slept_so_far += poll_seconds

@abstractmethod
def format_sql(self) -> str:
raise NotImplementedError
Expand Down

0 comments on commit 88c464c

Please sign in to comment.