fix(metrics): Implement accurate histogram zooming (#33583)
untitaker authored Apr 14, 2022
1 parent a838ad4 commit 9bf8c1e
Showing 6 changed files with 227 additions and 39 deletions.
78 changes: 63 additions & 15 deletions src/sentry/snuba/metrics/fields/base.py
@@ -31,7 +31,11 @@
from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.utils import resolve_weak
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.fields.histogram import ClickhouseHistogram, rebucket_histogram
from sentry.snuba.metrics.fields.histogram import (
ClickhouseHistogram,
rebucket_histogram,
zoom_histogram,
)
from sentry.snuba.metrics.fields.snql import (
abnormal_sessions,
abnormal_users,
@@ -233,12 +237,19 @@ def run_post_query_function(
) -> SnubaDataType:
raise NotImplementedError

@abstractmethod
def generate_filter_snql_conditions(
self, org_id: int, query_definition: QueryDefinition
) -> Optional[Function]:
raise NotImplementedError


@dataclass
class DerivedOpDefinition(MetricOperationDefinition):
can_orderby: bool
query_definition_args: Optional[List[str]] = None
post_query_func: Callable[..., PostQueryFuncReturnType] = lambda *args: args
filter_conditions_func: Callable[..., Optional[Function]] = lambda _: None


class RawOp(MetricOperation):
@@ -254,6 +265,11 @@ def run_post_query_function(
) -> SnubaDataType:
return data

def generate_filter_snql_conditions(
self, org_id: int, query_definition: QueryDefinition
) -> None:
return


class DerivedOp(DerivedOpDefinition, MetricOperation):
def validate_can_orderby(self) -> None:
@@ -288,6 +304,16 @@ def run_post_query_function(
data[key][idx] = subdata
return data

def generate_filter_snql_conditions(
self, org_id: int, query_definition: QueryDefinition
) -> Optional[Function]:
kwargs = {"org_id": org_id}
if self.query_definition_args is not None:
for field in self.query_definition_args:
kwargs[field] = getattr(query_definition, field)

return self.filter_conditions_func(**kwargs)


class MetricExpressionBase(ABC):
@abstractmethod
@@ -310,7 +336,9 @@ def generate_metric_ids(self, projects: Sequence[Project]) -> Set[int]:
raise NotImplementedError

@abstractmethod
def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:
def generate_select_statements(
self, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[Function]:
"""
Method that generates a list of SnQL functions required to query an instance of
MetricsFieldBase
@@ -319,9 +347,7 @@ def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:

@abstractmethod
def generate_orderby_clause(
self,
direction: Direction,
projects: Sequence[Project],
self, direction: Direction, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[OrderBy]:
"""
Method that generates a list of SnQL OrderBy clauses based on an instance of
@@ -414,19 +440,23 @@ class MetricExpression(MetricExpressionDefinition, MetricExpressionBase):
def get_entity(self, projects: Sequence[Project]) -> MetricEntity:
return OPERATIONS_TO_ENTITY[self.metric_operation.op]

def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:
def generate_select_statements(
self, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[Function]:
org_id = org_id_from_projects(projects)
return [
self.build_conditional_aggregate_for_metric(org_id, entity=self.get_entity(projects))
self.build_conditional_aggregate_for_metric(
org_id, entity=self.get_entity(projects), query_definition=query_definition
)
]

def generate_orderby_clause(
self, direction: Direction, projects: Sequence[Project]
self, direction: Direction, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[OrderBy]:
self.metric_operation.validate_can_orderby()
return [
OrderBy(
self.generate_select_statements(projects)[0],
self.generate_select_statements(projects, query_definition=query_definition)[0],
direction,
)
]
@@ -457,11 +487,21 @@ def generate_bottom_up_derived_metrics_dependencies(
) -> Iterable[Tuple[MetricOperationType, str]]:
return [(self.metric_operation.op, self.metric_object.metric_mri)]

def build_conditional_aggregate_for_metric(self, org_id: int, entity: MetricEntity) -> Function:
def build_conditional_aggregate_for_metric(
self, org_id: int, entity: MetricEntity, query_definition: QueryDefinition
) -> Function:
snuba_function = OP_TO_SNUBA_FUNCTION[entity][self.metric_operation.op]
conditions = self.metric_object.generate_filter_snql_conditions(org_id=org_id)

operation_based_filter = self.metric_operation.generate_filter_snql_conditions(
org_id=org_id, query_definition=query_definition
)
if operation_based_filter is not None:
conditions = Function("and", [conditions, operation_based_filter])

return Function(
snuba_function,
[Column("value"), self.metric_object.generate_filter_snql_conditions(org_id=org_id)],
[Column("value"), conditions],
alias=f"{self.metric_operation.op}({self.metric_object.metric_mri})",
)

@@ -589,7 +629,9 @@ def __recursively_generate_select_snql(
)
]

def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:
def generate_select_statements(
self, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[Function]:
# Before we are able to generate the relevant SnQL for a derived metric, we need to
# validate that this instance of SingularEntityDerivedMetric is built from constituent
# metrics that span a single entity
@@ -600,14 +642,16 @@ def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:
return self.__recursively_generate_select_snql(org_id, derived_metric_mri=self.metric_mri)

def generate_orderby_clause(
self, direction: Direction, projects: Sequence[Project]
self, direction: Direction, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[OrderBy]:
if not projects:
self._raise_entity_validation_exception("generate_orderby_clause")
self.get_entity(projects=projects)
return [
OrderBy(
self.generate_select_statements(projects=projects)[0],
self.generate_select_statements(
projects=projects, query_definition=query_definition
)[0],
direction,
)
]
@@ -646,13 +690,16 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
def generate_metric_ids(self, projects: Sequence[Project]) -> Set[Any]:
raise NotSupportedOverCompositeEntityException()

def generate_select_statements(self, projects: Sequence[Project]) -> List[Function]:
def generate_select_statements(
self, projects: Sequence[Project], query_definition: QueryDefinition
) -> List[Function]:
raise NotSupportedOverCompositeEntityException()

def generate_orderby_clause(
self,
direction: Direction,
projects: Sequence[Project],
query_definition: QueryDefinition,
) -> List[OrderBy]:
raise NotSupportedOverCompositeEntityException(
f"It is not possible to orderBy field "
@@ -1025,6 +1072,7 @@ def run_post_query_function(
can_orderby=False,
query_definition_args=["histogram_from", "histogram_to", "histogram_buckets"],
post_query_func=rebucket_histogram,
filter_conditions_func=zoom_histogram,
)
]
}
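For illustration, a minimal sketch of the condition-merging that build_conditional_aggregate_for_metric performs above. The metric id, the bound, and the snuba function name are assumptions (the real mapping lives in OP_TO_SNUBA_FUNCTION), not values taken from this commit:

    from snuba_sdk import Column, Function

    # Filter derived from the metric object (hypothetical metric_id).
    metric_condition = Function("equals", [Column("metric_id"), 42])

    # Operation-based filter, e.g. what zoom_histogram returns for
    # histogram_from=2.0 (see histogram.py below).
    operation_filter = Function(
        "greaterOrEquals",
        [Function("arrayReduce", ["maxMerge", [Column("max")]]), 2.0],
    )

    # When the operation contributes a filter, it is and-ed with the
    # metric-object conditions before both reach the -If aggregate.
    conditions = Function("and", [metric_condition, operation_filter])
    select = Function(
        "histogramIf(250)",  # assumed snuba function name for the histogram op
        [Column("value"), conditions],
        alias="histogram(d:transactions/measurements.lcp@millisecond)",
    )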
70 changes: 69 additions & 1 deletion src/sentry/snuba/metrics/fields/histogram.py
@@ -1,6 +1,8 @@
import math
from typing import List, Optional, Tuple

import sentry_sdk
from snuba_sdk import Column, Function

ClickhouseHistogram = List[Tuple[float, float, float]]

@@ -58,4 +60,70 @@ def rebucket_histogram(
rv_height = overlap_perc * height
rv[lower_target, upper_target] += rv_height

return [(lower, upper, height) for (lower, upper), height in rv.items()]
return [(lower, upper, math.ceil(height)) for (lower, upper), height in rv.items()]


def zoom_histogram(
org_id: int,
histogram_buckets: int,
histogram_from: Optional[float] = None,
histogram_to: Optional[float] = None,
) -> Optional[Function]:
# The histogram "zoom" function is only there to limit the number of
# histogram merge states we have to merge in order to get greater accuracy
# on lower zoom levels. Since the maximum number of histogram buckets in
# ClickHouse is a constant number (250), any row we can filter out
# before aggregation is a potential win in accuracy.
#
# We do two things:
#
# - We throw away any buckets whose maximum value is lower than
# histogram_from, as we know there are no values in those buckets that
# overlap with our zoom range.
# - We throw away any buckets whose minimum value is higher than
# histogram_to, for the same reason.
#
# Note that we may still end up merging histogram states whose min/max
# bounds are not strictly within the user-defined bounds
# histogram_from/histogram_to. It is the job of rebucket_histogram to get
# rid of those extra datapoints in query results.
#
# We use `arrayReduce("maxMerge", [max])` where one would typically write
# `maxMerge(max)` or perhaps `maxMergeArray([max])`. This is because
# ClickHouse appears to have an internal limitation that disallows nested
# aggregate functions even where they would make sense, and at the same
# time it does not always detect this pattern. The following ClickHouse
# SQL is invalid:
#
# select histogramMergeIf(histogram_buckets, maxMerge(max) >= 123)
#
# yet somehow this is fine:
#
# select histogramMergeIf(histogram_buckets, arrayReduce('maxMerge', [max]) >= 123)
#
# We can't put this sort of filter in the where-clause as the metrics API
# allows for querying histograms alongside other kinds of data, so almost
# all user-defined filters end up in a -If aggregate function.
conditions = []
if histogram_from is not None:
conditions.append(
Function(
"greaterOrEquals",
[Function("arrayReduce", ["maxMerge", [Column("max")]]), histogram_from],
)
)

if histogram_to is not None:
conditions.append(
Function(
"lessOrEquals",
[Function("arrayReduce", ["minMerge", [Column("min")]]), histogram_to],
)
)

if len(conditions) == 1:
return conditions[0]
elif conditions:
return Function("and", conditions)
else:
return None
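For reference, a minimal sketch of what zoom_histogram emits for an illustrative zoom range, assuming only the imports at the top of this file (snuba_sdk expressions are dataclasses, so structural equality holds):

    from snuba_sdk import Column, Function

    # Both bounds supplied: the two bucket-pruning conditions are and-ed.
    cond = zoom_histogram(
        org_id=1, histogram_buckets=2, histogram_from=2.0, histogram_to=9.0
    )
    assert cond == Function(
        "and",
        [
            Function(
                "greaterOrEquals",
                [Function("arrayReduce", ["maxMerge", [Column("max")]]), 2.0],
            ),
            Function(
                "lessOrEquals",
                [Function("arrayReduce", ["minMerge", [Column("min")]]), 9.0],
            ),
        ],
    )

    # No bounds supplied: there is nothing to prune, so no condition is emitted.
    assert zoom_histogram(org_id=1, histogram_buckets=2) is None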
6 changes: 4 additions & 2 deletions src/sentry/snuba/metrics/query_builder.py
@@ -358,7 +358,7 @@ def _build_orderby(self) -> Optional[List[OrderBy]]:
(op, metric_mri), direction = self._query_definition.orderby
metric_field_obj = metric_object_factory(op, metric_mri)
return metric_field_obj.generate_orderby_clause(
projects=self._projects, direction=direction
projects=self._projects, direction=direction, query_definition=self._query_definition
)

def __build_totals_and_series_queries(
@@ -467,7 +467,9 @@ def get_snuba_queries(self):
metric_ids_set = set()
for op, name in fields:
metric_field_obj = metric_mri_to_obj_dict[(op, name)]
select += metric_field_obj.generate_select_statements(projects=self._projects)
select += metric_field_obj.generate_select_statements(
projects=self._projects, query_definition=self._query_definition
)
metric_ids_set |= metric_field_obj.generate_metric_ids(self._projects)

where_for_entity = [
57 changes: 54 additions & 3 deletions tests/sentry/api/endpoints/test_organization_metric_data.py
@@ -2232,7 +2232,58 @@ def test_histogram(self):
histogramFrom="2",
)

hist = [(2.0, 4.0, 2.0), (4.0, 6.0, 2.5)]
hist = [(2.0, 4.0, 2), (4.0, 6.0, 3)]

assert response.data["groups"] == [
{
"by": {},
"totals": {f"histogram({TransactionMetricKey.MEASUREMENTS_LCP.value})": hist},
}
]

def test_histogram_zooming(self):
# Record some strings
org_id = self.organization.id
tag1 = indexer.record(org_id, "tag1")
value1 = indexer.record(org_id, "value1")
value2 = indexer.record(org_id, "value2")

self._send_buckets(
[
{
"org_id": org_id,
"project_id": self.project.id,
"metric_id": self.transaction_lcp_metric,
"timestamp": int(time.time()),
"type": "d",
"value": numbers,
"tags": {tag: value},
"retention_days": 90,
}
for tag, value, numbers in (
(tag1, value1, [1, 2, 3]),
(tag1, value2, [10, 100, 1000]),
)
],
entity="metrics_distributions",
)

# Note: everything is a string here on purpose to ensure we parse ints properly
response = self.get_success_response(
self.organization.slug,
field=f"histogram({TransactionMetricKey.MEASUREMENTS_LCP.value})",
statsPeriod="1h",
interval="1h",
includeSeries="0",
histogramBuckets="2",
histogramTo="9",
)

# if zoom_histogram were not called, the variable-width
# HdrHistogram buckets returned from clickhouse would be so
# inaccurate that we would accidentally return something
# else: (5.0, 9.0, 1)
hist = [(1.0, 5.0, 3), (5.0, 9.0, 0)]

assert response.data["groups"] == [
{
@@ -2291,7 +2342,7 @@ def test_histogram_session_duration(self):
histogramBuckets="2",
histogramFrom="2",
)
hist = [(2.0, 5.5, 3.5), (5.5, 9.0, 4.0)]
hist = [(2.0, 5.5, 4), (5.5, 9.0, 4)]
assert response.data["groups"] == [
{
"by": {},
@@ -2309,7 +2360,7 @@ def test_histogram_session_duration(self):
histogramBuckets="2",
histogramFrom="2",
)
hist = [(2.0, 4.0, 2.0), (4.0, 6.0, 2.5)]
hist = [(2.0, 4.0, 2), (4.0, 6.0, 3)]
assert response.data["groups"] == [
{
"by": {},