Skip to content

Commit

Permalink
Core: Improve assembling retention task list
Browse files Browse the repository at this point in the history
By resolving the retention task list, i.e. the list of partitions to be
retired, in a discrete step, and iterating over it at runtime instead of
using JOINs, the program can discriminate between many more error
conditions, and inform the user accordingly. This significantly improves
usability.

Currently, the error conditions are:

- Strategy not given.
- Given tag(s) do not exist.
- No matching retention policies found.
- Data table not found.
- No data to be retired.

At the same time, the refactoring simplifies the implementations of the
specific retention policy task executors (delete, reallocate, snapshot).
  • Loading branch information
amotl committed Jul 8, 2023
1 parent ce5b2e6 commit c63d2b8
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 261 deletions.
259 changes: 136 additions & 123 deletions cratedb_retention/core.py
Original file line number Diff line number Diff line change
@@ -1,176 +1,189 @@
# Copyright (c) 2021-2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import dataclasses
import logging
import typing as t
from importlib.resources import read_text

import sqlalchemy as sa

from cratedb_retention.model import JobSettings, RetentionStrategy
from cratedb_retention.model import JobSettings, RetentionPolicy, RetentionStrategy, RetentionTask
from cratedb_retention.store import RetentionPolicyStore
from cratedb_retention.strategy.delete import DeleteRetentionTask
from cratedb_retention.strategy.reallocate import ReallocateRetentionJob, ReallocateRetentionTask
from cratedb_retention.strategy.snapshot import SnapshotRetentionTask
from cratedb_retention.util.database import run_sql

logger = logging.getLogger(__name__)


class RetentionJob:
"""
The retention job implementation evaluates its configuration and runtime settings,
and dispatches to corresponding retention strategy implementations.
This is effectively the main application, implementing a retention and expiration
The main application, implementing a retention and expiration
management subsystem for CrateDB.
The retention job evaluates its configuration and runtime settings,
and dispatches to corresponding retention strategy implementations.
"""

def __init__(self, settings: JobSettings):
    """
    Set up the job from its runtime settings, and create the policy store handle.
    """
    # Runtime context settings, consulted by the other methods of the job.
    self.settings = settings

    # Retention policy store API, configured with the very same settings object.
    self.store = RetentionPolicyStore(settings=settings)

def start(self):
"""
Resolve retention policies to tasks, and invoke them.
"""
logger.info(
f"Connecting to database {self.settings.database.safe}, " f"table {self.settings.policy_table.fullname}"
)

logger.info(
f"Starting data retention using '{self.settings.strategy}' " f"and cut-off day '{self.settings.cutoff_day}'"
)

strategy = self.settings.strategy

implementation: t.Type[GenericRetention]
msg = f"Starting data retention using '{self.settings.strategy}', cut-off day '{self.settings.cutoff_day}'"
if self.settings.tags:
msg += f", and tags '{self.settings.tags}'"
logger.info(msg)

# Resolve strategy implementation.
if strategy is RetentionStrategy.DELETE:
from cratedb_retention.strategy.delete import DeleteRetention

implementation = DeleteRetention
elif strategy is RetentionStrategy.REALLOCATE:
from cratedb_retention.strategy.reallocate import ReallocateRetention

implementation = ReallocateRetention
elif strategy is RetentionStrategy.SNAPSHOT:
from cratedb_retention.strategy.snapshot import SnapshotRetention

implementation = SnapshotRetention
else:
raise NotImplementedError(f"Retention strategy {strategy} not implemented yet")

# Propagate runtime context settings, and invoke job.
# Invoke job.
# TODO: Add audit logging.
# TODO: Add job tracking.
job = implementation(settings=self.settings, store=self.store)
job.start()


@dataclasses.dataclass
class GenericRetention:
"""
Represent a complete generic data retention job.
"""

# Runtime context settings.
settings: JobSettings

# Retention policy store API.
store: RetentionPolicyStore

# Which action class to use for deserializing records.
_action_class: t.Any = dataclasses.field(init=False)

# File name of SQL statement to load retention policies.
_tasks_sql_file: t.Union[str, None] = dataclasses.field(init=False, default=None)
for task in self.get_retention_tasks():
logger.info(f"Executing data retention task: {task}")

# SQL statement to load retention policies.
_tasks_sql_text: t.Union[str, None] = dataclasses.field(init=False, default=None)

def start(self):
"""
Evaluate retention policies, and invoke actions.
"""

for policy in self.get_policy_tasks():
logger.info(f"Executing data retention policy: {policy}")
sql_bunch: t.Iterable = policy.to_sql()
sql_bunch: t.Iterable = task.to_sql()
if not isinstance(sql_bunch, t.List):
sql_bunch = [sql_bunch]

# Run a sequence of SQL statements for this task.
# Stop the sequence once anyone fails.
# Stop the sequence when a step fails.
for sql in sql_bunch:
try:
run_sql(dburi=self.settings.database.dburi, sql=sql)
except Exception:
logger.exception(f"Data retention SQL statement failed: {sql}")
# TODO: Do not `raise`, but `break`. Other policies should be executed.
raise
break

def get_where_clause(self, field_prefix: str = ""):
def get_retention_tasks(self) -> t.Generator[RetentionTask, None, None]:
    """
    Derive retention tasks from policies. There will be one task per resolved partition.

    Policies are loaded from the policy store, filtered by the configured strategy
    and tags. For each policy, the statement rendered by `TaskSqlRenderer` is run
    through the store, and one task object is yielded per returned record.

    The following conditions are reported by a warning log message, and do not
    raise an exception:

    - Given tag(s) do not exist: No tasks are produced at all.
    - No matching retention policies found: No tasks are produced at all.
    - Data table not found: The policy is skipped.
    - No data to be retired: The policy is skipped.

    Raises `ValueError` when no retention strategy is configured, and
    `NotImplementedError` when a policy refers to a strategy for which no
    task implementation is known.
    """

    # Sanity checks.
    if self.settings.strategy is None:
        raise ValueError("Unable to load retention policies without strategy")

    if self.settings.tags and not self.store.tags_exist(self.settings.tags):
        logger.warning(f"No retention policies found with tags: {sorted(self.settings.tags)}")
        return

    # Load policies, filtering by strategy and tags.
    policies = list(self.store.retrieve_policies(strategy=self.settings.strategy, tags=self.settings.tags))
    if not policies:
        logger.warning("No matching retention policies found")
        return

    for policy in policies:
        logger.info(f"Processing data retention policy: {policy}")

        # Verify data table exists.
        if not self.store.database.table_exists(policy.table_fullname):
            logger.warning(f"Data table not found: {policy.table_fullname}")
            continue

        # Resolve the partitions to be retired for this policy.
        sql_renderer = TaskSqlRenderer(settings=self.settings, store=self.store, policy=policy)
        selectable = sql_renderer.render()
        results = self.store.query(selectable)
        if not results:
            logger.warning(f"No data to be retired from data table: {policy.table_fullname}")
            continue

        # Resolve task runner implementation.
        task_class: t.Type[RetentionTask]
        if policy.strategy is RetentionStrategy.DELETE:
            task_class = DeleteRetentionTask
        elif policy.strategy is RetentionStrategy.REALLOCATE:
            task_class = ReallocateRetentionTask
        elif policy.strategy is RetentionStrategy.SNAPSHOT:
            task_class = SnapshotRetentionTask
        else:
            # Report the same attribute the dispatch above is based on.
            raise NotImplementedError(f"Retention strategy {policy.strategy} not implemented yet")

        # Iterate result records, and produce one task per partition.
        # All task classes receive the same set of arguments, including the
        # reallocation and snapshot parameters of the policy.
        for result in results:
            task = task_class.factory(
                table_schema=policy.table_schema,
                table_name=policy.table_name,
                table_fullname=policy.table_fullname,
                partition_column=policy.partition_column,
                partition_value=result["partition_value"],
                reallocation_attribute_name=policy.reallocation_attribute_name,
                reallocation_attribute_value=policy.reallocation_attribute_value,
                target_repository_name=policy.target_repository_name,
            )
            yield task


class TaskSqlRenderer:
"""
Render SQL statement to gather retention tasks.
"""

strategy = str(self.settings.strategy.value).lower()
fragments = [f"{field_prefix}strategy='{strategy}'"]
def __init__(self, settings: JobSettings, store: RetentionPolicyStore, policy: RetentionPolicy):
    """
    Remember the runtime settings, the policy store, and the policy to render for.

    TODO: Get rid of `self.settings`?
    """
    self.settings, self.store, self.policy = settings, store, policy

constraints = self.store.get_tags_constraints(self.settings.tags, table_alias="r")
if constraints is not None:
constraints_sql = constraints.compile(self.store.database.engine, compile_kwargs={"literal_binds": True})
fragments += [str(constraints_sql)]
def render(self):
    """
    Render SQL statement.

    The foundational statement selects records from `information_schema.table_partitions`
    for the data table of the policy, restricted to partitions whose partition column
    value is older than the cut-off day minus the retention period of the policy.
    Each record carries the partition value, cast to `BIGINT`, as `partition_value`.

    The statement is handed to `specialize()`, which may wrap it into another SQL
    template, and the outcome of that is returned.

    Raises `ValueError` when no cut-off day is configured.
    """

    # Sanity checks.
    if self.settings.cutoff_day is None:
        raise ValueError("Unable to operate without cutoff date")

    policy = self.policy

    # NOTE(review): Policy fields and the cut-off day are interpolated verbatim into
    # the statement. If policies can be authored by untrusted users, this needs to
    # use bound parameters instead, which in turn requires `specialize()` to stop
    # string-formatting the statement into its wrapper template.
    selectable = sa.text(
        f"""
        SELECT
            *,
            TRY_CAST(p.values['{policy.partition_column}'] AS BIGINT) AS partition_value
        FROM
            "information_schema"."table_partitions" AS p
        WHERE 1
            AND p.table_schema = '{policy.table_schema}'
            AND p.table_name = '{policy.table_name}'
            AND p.values['{policy.partition_column}'] <
            '{self.settings.cutoff_day}'::TIMESTAMP - '{policy.retention_period} days'::INTERVAL
        """
    )
    return self.specialize(selectable)

# Read baseline SQL clause, for selecting records from the retention policy
# database table, to be interpolated into the other templates.
policy_dql = read_text("cratedb_retention.strategy", "policy.sql")
def specialize(self, selectable):
    """
    Specialize SQL statement.

    For example, the `REALLOCATE` strategy needs to wrap the foundational SQL statement
    into another one, in order to correlate the results with `sys.shards` and `sys.nodes`
    tables.

    For any other strategy, the given statement is returned unchanged.
    """
    if self.settings.strategy is RetentionStrategy.REALLOCATE:
        # Acquire SQL template.
        sql_wrapper = ReallocateRetentionJob.SQL

        # Render SQL statement using classic string templating. The foundational
        # statement is inserted as `policy_dql`, all other template variables
        # are taken from the policy.
        tplvars = self.policy.to_storage_dict()
        sql = sql_wrapper.format(policy_dql=selectable, **tplvars)
        selectable = sa.text(sql)

    return selectable
Loading

0 comments on commit c63d2b8

Please sign in to comment.