Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 5646 #15

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions clients/client-python/gravitino/api/expressions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import List

from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.expression import Expression


class Distribution(Expression):
    """An interface that defines how data is distributed across partitions."""

    def strategy(self) -> Strategy:
        """Return the distribution strategy name.

        Raises:
            NotImplementedError: Always in this base class; subclasses are
                expected to override.
        """
        raise NotImplementedError

    def number(self) -> int:
        """Return the number of buckets/distribution.

        For example, if the distribution strategy is HASH and the number is
        10, then the data is distributed across 10 buckets.

        Raises:
            NotImplementedError: Always in this base class; subclasses are
                expected to override.
        """
        raise NotImplementedError

    def expressions(self) -> List[Expression]:
        """Return the expressions passed to the distribution function.

        Raises:
            NotImplementedError: Always in this base class; subclasses are
                expected to override.
        """
        raise NotImplementedError

    def children(self) -> List[Expression]:
        """Return the child expressions, which are exactly `expressions()`."""
        return self.expressions()

    def equals(self, distribution) -> bool:
        """Indicate whether some other distribution is "equal to" this one.

        Two distributions are equal when their strategy, number and
        expressions all compare equal.

        Args:
            distribution: The reference distribution object with which to
                compare.

        Returns:
            True if `distribution` is a `Distribution` with the same strategy,
            number and expressions; False otherwise (including for `None` and
            for objects that are not a `Distribution`).
        """
        # Guard on the type rather than only on None: an arbitrary object has
        # no strategy()/number()/expressions() and must compare unequal rather
        # than raise AttributeError.
        if not isinstance(distribution, Distribution):
            return False

        return (
            self.strategy() == distribution.strategy()
            and self.number() == distribution.number()
            and self.expressions() == distribution.expressions()
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import List, Tuple

from gravitino.api.expressions.distributions.distribution import Distribution
from gravitino.api.expressions.distributions.strategy import Strategy
from gravitino.api.expressions.expression import Expression
from gravitino.api.expressions.named_reference import NamedReference


class DistributionImpl(Distribution):
    """
    Concrete distribution that stores a strategy, a bucket number and the
    expressions to distribute by, e.g. distribute by (a), (a, b), or more
    complex forms such as (func(a), b) or (func(a), func(b)).
    """

    _strategy: Strategy
    _number: int
    _expressions: List[Expression]

    def __init__(self, strategy: Strategy, number: int, expressions: List[Expression]):
        """
        Store the given values as-is.

        :param strategy: The distribution strategy
        :param number: The number of buckets
        :param expressions: The expressions to distribute by (kept by reference, not copied)
        """
        self._expressions = expressions
        self._number = number
        self._strategy = strategy

    def strategy(self) -> Strategy:
        """Return the strategy given at construction."""
        return self._strategy

    def number(self) -> int:
        """Return the bucket number given at construction."""
        return self._number

    def expressions(self) -> List[Expression]:
        """Return the expression list given at construction."""
        return self._expressions

    def __str__(self):
        """Return a readable rendering of the three stored fields."""
        return f"DistributionImpl(strategy={self._strategy}, number={self._number}, expressions={self._expressions})"

    def __eq__(self, other):
        """
        Compare field by field against another `DistributionImpl`.

        Returns False for `None` and for any object that is not a
        `DistributionImpl`.
        """
        # Identity short-circuit: an object always equals itself.
        if other is self:
            return True
        # None is never an instance, so a single isinstance test covers both cases.
        if not isinstance(other, DistributionImpl):
            return False
        if not self._strategy == other.strategy():
            return False
        if not self._number == other.number():
            return False
        return self._expressions == other.expressions()

    def __hash__(self):
        """Hash over the same three fields that `__eq__` compares."""
        # Lists are unhashable, so the expressions are frozen into a tuple first.
        frozen_expressions = tuple(self._expressions)
        return hash((self._strategy, self._number, frozen_expressions))


# Helper methods to create distributions
class Distributions:
    """Shared distribution constants and static factory helpers."""

    # NONE is used to indicate that there is no distribution.
    NONE = DistributionImpl(Strategy.NONE, 0, [])
    # List bucketing strategy hash.
    # TODO: #1505 Separate the bucket number from the Distribution.
    HASH = DistributionImpl(Strategy.HASH, 0, [])
    # List bucketing strategy range.
    # TODO: #1505 Separate the bucket number from the Distribution.
    RANGE = DistributionImpl(Strategy.RANGE, 0, [])

    @staticmethod
    def even(number: int, *expressions) -> Distribution:
        """
        Build an EVEN distribution that spreads the data evenly across the buckets.

        :param number: The number of buckets
        :param expressions: The expressions to distribute by
        :return: The created even distribution
        """
        return Distributions.of(Strategy.EVEN, number, *expressions)

    @staticmethod
    def hash(number: int, *expressions) -> Distribution:
        """
        Build a HASH distribution that hashes the data across the buckets.

        :param number: The number of buckets
        :param expressions: The expressions to distribute by
        :return: The created hash distribution
        """
        return Distributions.of(Strategy.HASH, number, *expressions)

    @staticmethod
    def of(strategy: Strategy, number: int, *expressions) -> Distribution:
        """
        Build a distribution for an arbitrary strategy.

        :param strategy: The strategy to use
        :param number: The number of buckets
        :param expressions: The expressions to distribute by
        :return: The created distribution
        """
        # The varargs tuple is materialised into a fresh list for the new object.
        expression_list = list(expressions)
        return DistributionImpl(strategy, number, expression_list)

    @staticmethod
    def fields(
        strategy: Strategy, number: int, *field_names: Tuple[str]
    ) -> Distribution:
        """
        Build a distribution over columns, like distribute by (a) or (a, b).
        Every entry of `field_names` is turned into a reference through
        `NamedReference.field`. For expression-based distributions such as
        (func(a), b), pass the expressions to `of` instead.

        NOTE: a, b, c are column names.

        SQL syntax: distribute by hash(a, b) buckets 5
        fields(Strategy.HASH, 5, "a", "b")

        SQL syntax: distribute by hash(a, b, c) buckets 10
        fields(Strategy.HASH, 10, "a", "b", "c")

        SQL syntax: distribute by EVEN(a) buckets 128
        fields(Strategy.EVEN, 128, "a")

        NOTE(review): the annotation declares each field name as `Tuple[str]`
        while the examples pass plain strings; confirm which form
        `NamedReference.field` expects.

        :param strategy: The strategy to use.
        :param number: The number of buckets.
        :param field_names: The field names to distribute by.
        :return: The created distribution.
        """
        references = list(map(NamedReference.field, field_names))
        return DistributionImpl(strategy, number, references)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# Enum equivalent in Python for Strategy
class Strategy:
    """Namespace of string constants naming the distribution strategies.

    Every member is a plain ``str`` whose value equals its own name.
    """

    NONE = "NONE"
    HASH = "HASH"
    RANGE = "RANGE"
    EVEN = "EVEN"

    @staticmethod
    def get_by_name(name: str) -> str:
        """Resolve a strategy from its name, ignoring case.

        ``"RANDOM"`` is accepted as an alias for ``EVEN``.

        Args:
            name: The strategy name, in any letter case.

        Returns:
            The matching `Strategy` constant.

        Raises:
            ValueError: If `name` does not match any known strategy. The
                message echoes `name` exactly as the caller passed it.
        """
        by_name = {
            "NONE": Strategy.NONE,
            "HASH": Strategy.HASH,
            "RANGE": Strategy.RANGE,
            "EVEN": Strategy.EVEN,
            "RANDOM": Strategy.EVEN,  # alias
        }
        # Normalise into a separate variable so the error message below can
        # still report the caller's original spelling.
        resolved = by_name.get(name.upper())
        if resolved is None:
            raise ValueError(
                f"Invalid distribution strategy: {name}. "
                f"Valid values are: {', '.join([Strategy.NONE, Strategy.HASH, Strategy.RANGE, Strategy.EVEN])}"
            )
        return resolved
51 changes: 51 additions & 0 deletions clients/client-python/gravitino/api/expressions/expression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Set, TYPE_CHECKING

if TYPE_CHECKING:
from gravitino.api.expressions.named_reference import NamedReference


class Expression(ABC):
    """Base class of the public logical expression API."""

    # Shared empty list of expressions. It is not referenced by the methods
    # defined in this class. NOTE(review): presumably kept for parity with
    # another language binding - confirm before removing. Do not mutate it:
    # the same list object is shared by every subclass.
    EMPTY_EXPRESSION: List[Expression] = []

    # Shared empty list of named references. Like EMPTY_EXPRESSION it is not
    # referenced by the methods defined in this class and must not be mutated.
    EMPTY_NAMED_REFERENCE: List[NamedReference] = []

    @abstractmethod
    def children(self) -> List[Expression]:
        """Returns a list of the children of this node. Children should not change."""

    def references(self) -> List[NamedReference]:
        """Returns a list of fields or columns that are referenced by this expression.

        The default implementation collects `references()` from every child
        and removes duplicates. References are returned in first-seen order,
        so the result is stable across calls and runs.

        Returns:
            The de-duplicated references of all children; an empty list when
            there are no children.
        """
        seen: Set[NamedReference] = set()
        ordered: List[NamedReference] = []
        for child in self.children():
            for reference in child.references():
                # A set gives O(1) duplicate detection; the list keeps a
                # deterministic order, which iterating a set does not.
                if reference not in seen:
                    seen.add(reference)
                    ordered.append(reference)
        return ordered
Loading