Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Consistent SQL Query Generation #1015

Merged
merged 19 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240131-155100.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add Support for Consistent SQL Query Generation
time: 2024-01-31T15:51:00.673728-08:00
custom:
Author: plypaul
Issue: "1020"
101 changes: 0 additions & 101 deletions metricflow/dag/id_generation.py

This file was deleted.

105 changes: 101 additions & 4 deletions metricflow/dag/id_prefix.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,114 @@
from __future__ import annotations

from enum import Enum
from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass
from enum import Enum, EnumMeta

from typing_extensions import override

class IdPrefix(Enum):
"""Enumerates the prefixes used for generating IDs.

TODO: Move all ID prefixes here.
class IdPrefix(ABC):
    """Interface for an ID prefix, e.g. prefix=foo produces IDs like foo_0."""

    @property
    @abstractmethod
    def str_value(self) -> str:
        """The prefix as a plain string, used when rendering generated IDs."""
        raise NotImplementedError


class EnumMetaClassHelper(ABCMeta, EnumMeta):
    """Combined metaclass so an Enum can also subclass the IdPrefix ABC.

    Enum and ABC each carry their own metaclass (EnumMeta and ABCMeta). A class
    deriving from both would otherwise raise:

        Metaclass conflict: the metaclass of a derived class must be a
        (non-strict) subclass of the metaclasses of all its bases
    """


class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
    """Fixed, well-known ID prefixes, enumerated in one place for consistency."""

    # Dataflow-plan node prefixes.
    DATAFLOW_NODE_AGGREGATE_MEASURES_ID_PREFIX = "am"
    DATAFLOW_NODE_COMPUTE_METRICS_ID_PREFIX = "cm"
    DATAFLOW_NODE_JOIN_AGGREGATED_MEASURES_BY_GROUPBY_COLUMNS_PREFIX = "jamgc"
    DATAFLOW_NODE_JOIN_TO_STANDARD_OUTPUT_ID_PREFIX = "jso"
    DATAFLOW_NODE_JOIN_SELF_OVER_TIME_RANGE_ID_PREFIX = "jotr"
    DATAFLOW_NODE_ORDER_BY_LIMIT_ID_PREFIX = "obl"
    DATAFLOW_NODE_PASS_FILTER_ELEMENTS_ID_PREFIX = "pfe"
    DATAFLOW_NODE_READ_SQL_SOURCE_ID_PREFIX = "rss"
    DATAFLOW_NODE_WHERE_CONSTRAINT_ID_PREFIX = "wcc"
    DATAFLOW_NODE_WRITE_TO_RESULT_DATAFRAME_ID_PREFIX = "wrd"
    DATAFLOW_NODE_WRITE_TO_RESULT_TABLE_ID_PREFIX = "wrt"
    DATAFLOW_NODE_COMBINE_AGGREGATED_OUTPUTS_ID_PREFIX = "cao"
    DATAFLOW_NODE_CONSTRAIN_TIME_RANGE_ID_PREFIX = "ctr"
    DATAFLOW_NODE_SET_MEASURE_AGGREGATION_TIME = "sma"
    DATAFLOW_NODE_SEMI_ADDITIVE_JOIN_ID_PREFIX = "saj"
    DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX = "jts"
    DATAFLOW_NODE_MIN_MAX_ID_PREFIX = "mm"
    DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
    DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"

    # SQL-expression prefixes.
    SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
    SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
    SQL_EXPR_FUNCTION_ID_PREFIX = "fnc"
    SQL_EXPR_PERCENTILE_ID_PREFIX = "perc"
    SQL_EXPR_STRING_ID_PREFIX = "str"
    SQL_EXPR_NULL_PREFIX = "null"
    SQL_EXPR_LOGICAL_OPERATOR_PREFIX = "lo"
    SQL_EXPR_STRING_LITERAL_PREFIX = "sl"
    SQL_EXPR_IS_NULL_PREFIX = "isn"
    SQL_EXPR_CAST_TO_TIMESTAMP_PREFIX = "ctt"
    SQL_EXPR_DATE_TRUNC = "dt"
    SQL_EXPR_SUBTRACT_TIME_INTERVAL_PREFIX = "sti"
    SQL_EXPR_EXTRACT = "ex"
    SQL_EXPR_RATIO_COMPUTATION = "rc"
    SQL_EXPR_BETWEEN_PREFIX = "betw"
    SQL_EXPR_WINDOW_FUNCTION_ID_PREFIX = "wfnc"
    SQL_EXPR_GENERATE_UUID_PREFIX = "uuid"

    # SQL-plan prefixes.
    SQL_PLAN_SELECT_STATEMENT_ID_PREFIX = "ss"
    SQL_PLAN_TABLE_FROM_CLAUSE_ID_PREFIX = "tfc"

    # Execution-plan node prefixes.
    EXEC_NODE_READ_SQL_QUERY = "rsq"
    EXEC_NODE_NOOP = "noop"
    EXEC_NODE_WRITE_TO_TABLE = "wtt"

    # Group-by-item resolution prefixes.
    GROUP_BY_ITEM_RESOLUTION_DAG = "gbir"
    QUERY_GROUP_BY_ITEM_RESOLUTION_NODE = "qr"
    METRIC_GROUP_BY_ITEM_RESOLUTION_NODE = "mtr"
    MEASURE_GROUP_BY_ITEM_RESOLUTION_NODE = "msr"
    VALUES_GROUP_BY_ITEM_RESOLUTION_NODE = "vr"

    # Plan-level prefixes.
    DATAFLOW_PLAN_PREFIX = "dfp"
    OPTIMIZED_DATAFLOW_PLAN_PREFIX = "dfpo"
    SQL_QUERY_PLAN_PREFIX = "sqp"
    EXEC_PLAN_PREFIX = "ep"

    MF_DAG = "mfd"

    TIME_SPINE_SOURCE = "time_spine_src"
    SUB_QUERY = "subq"

    @property
    @override
    def str_value(self) -> str:
        """The enum member's value is the prefix string itself."""
        return self.value


@dataclass(frozen=True)
class DynamicIdPrefix(IdPrefix):
    """An ID prefix built from an arbitrary string supplied at runtime."""

    # The raw prefix string; returned verbatim by str_value.
    prefix: str

    @property
    @override
    def str_value(self) -> str:
        """Return the prefix string this instance was constructed with."""
        return self.prefix
13 changes: 8 additions & 5 deletions metricflow/dag/mf_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
import jinja2

from metricflow.dag.dag_to_text import MetricFlowDagTextFormatter
from metricflow.dag.id_generation import IdGeneratorRegistry
from metricflow.dag.id_prefix import IdPrefix
from metricflow.dag.prefix_id import PrefixIdGenerator
from metricflow.dag.sequential_id import SequentialIdGenerator
from metricflow.visitor import VisitorOutputT

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,7 +41,7 @@ def __repr__(self) -> str: # noqa: D

@staticmethod
def create_unique(id_prefix: IdPrefix) -> NodeId:
    """Create a NodeId whose string is the next sequential ID for the given prefix."""
    next_id = SequentialIdGenerator.create_next_id(id_prefix)
    return NodeId(str(next_id))


class DagNodeVisitor(Generic[VisitorOutputT], ABC):
Expand Down Expand Up @@ -96,7 +95,7 @@ def __repr__(self) -> str: # noqa: D

@classmethod
@abstractmethod
def id_prefix(cls) -> str:
def id_prefix(cls) -> IdPrefix:
"""The prefix to use when generating IDs for nodes.

e.g. a prefix of "my_node" will generate an ID like "my_node_0"
Expand All @@ -106,7 +105,7 @@ def id_prefix(cls) -> str:
@classmethod
def create_unique_id(cls) -> NodeId:
    """Create and return a unique identifier to use when creating nodes."""
    next_id = SequentialIdGenerator.create_next_id(cls.id_prefix())
    return NodeId(id_str=next_id.str_value)

def accept_dag_node_visitor(self, visitor: DagNodeVisitor[VisitorOutputT]) -> VisitorOutputT:
"""Visit this node."""
Expand Down Expand Up @@ -171,6 +170,10 @@ def from_str(id_str: str) -> DagId:
"""Migration helper to create DAG IDs."""
return DagId(id_str)

@staticmethod
def from_id_prefix(id_prefix: IdPrefix) -> DagId:
    """Create a DagId whose string is the next sequential ID for the given prefix."""
    return DagId(id_str=SequentialIdGenerator.create_next_id(id_prefix).str_value)


DagNodeT = TypeVar("DagNodeT", bound=DagNode)

Expand Down
26 changes: 17 additions & 9 deletions metricflow/dag/prefix_id.py → metricflow/dag/sequential_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,35 @@ class SequentialId:
id_prefix: IdPrefix
index: int

@override
def __str__(self) -> str:
return f"{self.id_prefix.value}_{self.index}"
@property
def str_value(self) -> str:
    """Return this ID as a string of the form "<prefix>_<index>", e.g. "subq_0"."""
    return f"{self.id_prefix.str_value}_{self.index}"

@override
def __repr__(self) -> str:
    """Represent the ID by its string value (same as str_value)."""
    return self.str_value

class PrefixIdGenerator:
"""Generate ID values based on an ID prefix.

TODO: Migrate ID generation use cases to this class.
"""
class SequentialIdGenerator:
    """Generates sequential ID values, numbered independently for each prefix.

    State is class-level and guarded by a lock so concurrent callers get
    distinct indexes.
    """

    # Index that a newly seen prefix starts counting from; changed via reset().
    _default_start_value = 0
    _state_lock = threading.Lock()
    _prefix_to_next_value: Dict[IdPrefix, int] = {}

    @classmethod
    def create_next_id(cls, id_prefix: IdPrefix) -> SequentialId:
        """Return the next SequentialId for the given prefix and advance its counter."""
        with cls._state_lock:
            next_index = cls._prefix_to_next_value.setdefault(id_prefix, cls._default_start_value)
            cls._prefix_to_next_value[id_prefix] = next_index + 1
            return SequentialId(id_prefix, next_index)

    @classmethod
    def reset(cls, default_start_value: int = 0) -> None:
        """Resets the numbering of the generated IDs so that it starts at the given value."""
        with cls._state_lock:
            cls._prefix_to_next_value = {}
            cls._default_start_value = default_start_value
12 changes: 5 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from dbt_semantic_interfaces.validations.unique_valid_name import MetricFlowReservedKeywords

from metricflow.dag.id_generation import DATAFLOW_PLAN_PREFIX, IdGeneratorRegistry
from metricflow.dag.id_prefix import StaticIdPrefix
from metricflow.dag.mf_dag import DagId
from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver
from metricflow.dataflow.builder.node_evaluator import (
JoinLinkableInstancesRecipe,
Expand Down Expand Up @@ -206,9 +207,8 @@ def _build_plan(
output_selection_specs=output_selection_specs,
)

plan_id = IdGeneratorRegistry.for_class(DataflowPlanBuilder).create_id(DATAFLOW_PLAN_PREFIX)

plan = DataflowPlan(plan_id=plan_id, sink_output_nodes=[sink_node])
plan_id = DagId.from_id_prefix(StaticIdPrefix.DATAFLOW_PLAN_PREFIX)
plan = DataflowPlan(sink_output_nodes=[sink_node], plan_id=plan_id)
for optimizer in optimizers:
logger.info(f"Applying {optimizer.__class__.__name__}")
try:
Expand Down Expand Up @@ -646,9 +646,7 @@ def _build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> Da
parent_node=output_node, order_by_specs=query_spec.order_by_specs, limit=query_spec.limit
)

plan_id = IdGeneratorRegistry.for_class(DataflowPlanBuilder).create_id(DATAFLOW_PLAN_PREFIX)

return DataflowPlan(plan_id=plan_id, sink_output_nodes=[sink_node])
return DataflowPlan(sink_output_nodes=[sink_node])

@staticmethod
def build_sink_node(
Expand Down
Loading
Loading