Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Predicate Pushdown for video table #273

Merged
merged 24 commits into from
Jul 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d1e2485
minor fixes
gaurav274 Jul 3, 2022
e7c8425
minor fix
gaurav274 Jul 3, 2022
8167c80
minor fix
gaurav274 Jul 3, 2022
0154650
merged upstream
gaurav274 Jul 4, 2022
24c2c25
feat(binder): add support for select *
xzdandy Jul 4, 2022
0227f7b
style: remove print and comment
xzdandy Jul 4, 2022
20716d8
Merge branch 'master' of github.com:georgia-tech-db/eva
gaurav274 Jul 5, 2022
d171598
docs(binder): add docstrings for functions in
xzdandy Jul 5, 2022
6d66454
test(binder): add intergration testcases for select * in nested queries.
xzdandy Jul 5, 2022
f954441
test(intergration) fix select star testcases for real video
xzdandy Jul 5, 2022
81201aa
fix(binder): Only support SELECT * when * is alone
xzdandy Jul 5, 2022
f3cf780
bug(binder): alais support for join and lateral join?
xzdandy Jul 5, 2022
ad6933b
style: minor fix
xzdandy Jul 5, 2022
c610d2e
feat(binder): new SELECT * implmentation works with JOIN
xzdandy Jul 6, 2022
14ba7fc
Merge branch 'select-star' of github.com:xzdandy/eva
gaurav274 Jul 6, 2022
54f5deb
fix: Binder fix for lateral joins
gaurav274 Jul 7, 2022
17c8750
feat: predicate pushdown
gaurav274 Jul 27, 2022
b6725e7
feat: predicate pushdown for video table
gaurav274 Jul 27, 2022
514c677
merge: master
gaurav274 Jul 27, 2022
fa8fd02
Merge branch 'master' of github.com:georgia-tech-db/eva
gaurav274 Jul 27, 2022
204594d
Merge branch 'master' into pushdown
gaurav274 Jul 27, 2022
c39228e
test: add new test case for expression utils
gaurav274 Jul 28, 2022
47494ca
Merge branch 'master' of github.com:georgia-tech-db/eva
gaurav274 Jul 29, 2022
973d032
style: run formatter.py
gaurav274 Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions eva/executor/create_mat_view_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ def exec(self):
"""Create materialized view executor"""
if not handle_if_not_exists(self.node.view, self.node.if_not_exists):
child = self.children[0]
project_cols = None
# only support seq scan based materialization
if child.node.opr_type != PlanOprType.SEQUENTIAL_SCAN:
err_msg = "Invalid query {}, expected {}".format(
child.node.opr_type, PlanOprType.SEQUENTIAL_SCAN
if child.node.opr_type == PlanOprType.SEQUENTIAL_SCAN:
project_cols = child.project_expr
elif child.node.opr_type == PlanOprType.PROJECT:
project_cols = child.target_list
else:
err_msg = "Invalid query {}, expected {} or {}".format(
child.node.opr_type,
PlanOprType.SEQUENTIAL_SCAN,
PlanOprType.PROJECT,
)

logger.error(err_msg)
raise RuntimeError(err_msg)

# gather child projected column objects
child_objs = []
for child_col in child.project_expr:
for child_col in project_cols:
if child_col.etype == ExpressionType.TUPLE_VALUE:
child_objs.append(child_col.col_object)
elif child_col.etype == ExpressionType.FUNCTION_EXPRESSION:
Expand Down
6 changes: 5 additions & 1 deletion eva/executor/storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def validate(self):

def exec(self) -> Iterator[Batch]:
    """Read the table referenced by this plan node.

    For video tables the node's predicate is forwarded to
    ``VideoStorageEngine.read`` through the ``predicate`` keyword; other
    tables are read through ``StorageEngine.read`` without a predicate.

    Returns:
        Iterator[Batch]: whatever the selected storage engine's ``read``
            returns for ``self.node.video`` and ``self.node.batch_mem_size``
    """
    if self.node.video.is_video:
        # The older two-argument call was left in front of this one, which
        # made the predicate-forwarding call unreachable; only the new call
        # is kept.
        # NOTE(review): presumably the engine uses the predicate to skip
        # frames at read time -- confirm against VideoStorageEngine.read.
        return VideoStorageEngine.read(
            self.node.video,
            self.node.batch_mem_size,
            predicate=self.node.predicate,
        )
    else:
        return StorageEngine.read(self.node.video, self.node.batch_mem_size)
240 changes: 239 additions & 1 deletion eva/expression/expression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eva.expression.abstract_expression import ExpressionType

from typing import List

from eva.expression.abstract_expression import AbstractExpression, ExpressionType
from eva.expression.comparison_expression import ComparisonExpression
from eva.expression.constant_value_expression import ConstantValueExpression
from eva.expression.logical_expression import LogicalExpression
from eva.expression.tuple_value_expression import TupleValueExpression


def expression_tree_to_conjunction_list(expression_tree):
Expand All @@ -28,3 +35,234 @@ def expression_tree_to_conjunction_list(expression_tree):
expression_list.append(expression_tree)

return expression_list


def conjuction_list_to_expression_tree(
    expression_list: List[AbstractExpression],
) -> AbstractExpression:
    """Fold a list of conjuncts into one left-deep AND expression tree.

    Args:
        expression_list (List[AbstractExpression]): expressions to be joined
            with LOGICAL_AND, in order

    Returns:
        AbstractExpression: None for an empty list, the sole element for a
            one-element list, otherwise nested LogicalExpression nodes of the
            form ((e0 AND e1) AND e2) ...
    """
    if len(expression_list) == 0:
        return None

    tree, *remaining = expression_list
    for conjunct in remaining:
        # Each step wraps everything built so far as the left operand.
        tree = LogicalExpression(ExpressionType.LOGICAL_AND, tree, conjunct)
    return tree


def extract_range_list_from_comparison_expr(
    expr: ComparisonExpression, lower_bound: int, upper_bound: int
) -> List:
    """Translate one column-vs-constant comparison into inclusive ranges.

    The comparison must be one of <, >, <=, >=, = or != and must have one
    TupleValueExpression child and one ConstantValueExpression child, in
    either order (``id < 10`` and ``10 > id`` produce the same result).

    Args:
        expr (ComparisonExpression): comparison expression with two children
            that are leaf expression nodes
        lower_bound (int): inclusive start used for ranges that are open on
            the left (<, <=, !=)
        upper_bound (int): inclusive end used for ranges that are open on
            the right (>, >=, !=)

    Returns:
        List[Tuple[int, int]]: inclusive (start, end) ranges. Ranges derived
            from the bounds that turn out empty (start > end) are omitted, so
            the list can be empty. An equality always yields [(val, val)];
            it is not checked against the bounds.

    Raises:
        RuntimeError: if `expr` is not a ComparisonExpression, if its
            children are not one column and one constant, or if the
            comparison type is not supported

    Example:
        extract_range_list_from_comparison_expr(id < 10, 0, inf): [(0, 9)]
        extract_range_list_from_comparison_expr(id != 0, 0, inf): [(1, inf)]

    Note:
        `inf` above is only illustrative. Callers are expected to always
        know the concrete lower and upper bound (per the review discussion,
        the eventual goal is to obtain them from the catalog).
    """
    if not isinstance(expr, ComparisonExpression):
        raise RuntimeError(f"Expected Comparision Expression, got {type(expr)}")

    left = expr.children[0]
    right = expr.children[1]
    expr_type = expr.etype

    if isinstance(left, TupleValueExpression) and isinstance(
        right, ConstantValueExpression
    ):
        val = right.value
    elif isinstance(left, ConstantValueExpression) and isinstance(
        right, TupleValueExpression
    ):
        val = left.value
        # The constant is on the left (e.g. `10 > id`): mirror the operator
        # so the rest of the function can reason as if the column came first.
        # = and != are symmetric and therefore not listed.
        mirrored = {
            ExpressionType.COMPARE_GREATER: ExpressionType.COMPARE_LESSER,
            ExpressionType.COMPARE_LESSER: ExpressionType.COMPARE_GREATER,
            ExpressionType.COMPARE_GEQ: ExpressionType.COMPARE_LEQ,
            ExpressionType.COMPARE_LEQ: ExpressionType.COMPARE_GEQ,
        }
        expr_type = mirrored.get(expr_type, expr_type)
    else:
        # Implicit string concatenation keeps source indentation out of the
        # message (a backslash continuation inside the literal does not).
        raise RuntimeError(
            "Only supports extracting range from Comparision Expression "
            "with two children TupleValueExpression and "
            f"ConstantValueExpression, got {left} and {right}"
        )

    if expr_type == ExpressionType.COMPARE_EQUAL:
        return [(val, val)]

    if expr_type == ExpressionType.COMPARE_NEQ:
        candidates = [(lower_bound, val - 1), (val + 1, upper_bound)]
    elif expr_type == ExpressionType.COMPARE_GREATER:
        candidates = [(val + 1, upper_bound)]
    elif expr_type == ExpressionType.COMPARE_GEQ:
        candidates = [(val, upper_bound)]
    elif expr_type == ExpressionType.COMPARE_LESSER:
        candidates = [(lower_bound, val - 1)]
    elif expr_type == ExpressionType.COMPARE_LEQ:
        candidates = [(lower_bound, val)]
    else:
        raise RuntimeError(f"Unsupported Expression Type {expr_type}")

    # A comparison at or beyond a bound (e.g. `id != 0` with lower_bound 0)
    # would otherwise produce an inverted range such as (0, -1); drop those.
    return [(start, end) for start, end in candidates if start <= end]


def extract_range_list_from_predicate(
    predicate: AbstractExpression, lower_bound: int, upper_bound: int
) -> List:
    """Reduce a predicate to a sorted list of inclusive (start, end) ranges.

    The predicate tree may combine comparison expressions with AND / OR.
    AND nodes intersect the ranges of their two children, OR nodes unite
    them, and a comparison leaf is delegated to
    extract_range_list_from_comparison_expr. The function does not verify
    that all conditions refer to the same column; the caller must ensure it.

    Args:
        predicate (AbstractExpression): predicate to convert
        lower_bound (int): lower bound passed to every comparison leaf
        upper_bound (int): upper bound passed to every comparison leaf

    Returns:
        List[Tuple]: merged (start, end) pairs in ascending order

    Raises:
        RuntimeError: if a node is neither AND, OR nor a comparison

    Example:
        id < 10 : [(0, 9)]
        id > 5 AND id < 10 : [(6, 9)]
        id < 10 OR id >20 : [(0, 9), (21, Inf)]
    """

    def _intersect(first, second):
        # Common part of two inclusive ranges, or None when they are disjoint.
        start = max(first[0], second[0])
        end = min(first[1], second[1])
        return (start, end) if start <= end else None

    def _merge(ranges: List):
        # Sort, then coalesce ranges that overlap or touch ((0, 4) and
        # (5, 9) become (0, 9), hence the `- 1`).
        merged = []
        for start, end in sorted(ranges):
            if merged and merged[-1][1] >= start - 1:
                prev_start, prev_end = merged[-1]
                merged[-1] = (prev_start, max(prev_end, end))
            else:
                merged.append((start, end))
        return merged

    etype = predicate.etype
    if etype == ExpressionType.LOGICAL_AND or etype == ExpressionType.LOGICAL_OR:
        lhs_ranges = extract_range_list_from_predicate(
            predicate.children[0], lower_bound, upper_bound
        )
        rhs_ranges = extract_range_list_from_predicate(
            predicate.children[1], lower_bound, upper_bound
        )
        if etype == ExpressionType.LOGICAL_OR:
            return _merge(lhs_ranges + rhs_ranges)

        # AND: keep every non-empty pairwise intersection.
        pairwise = (
            _intersect(lhs, rhs) for lhs in lhs_ranges for rhs in rhs_ranges
        )
        return _merge([common for common in pairwise if common])

    if isinstance(predicate, ComparisonExpression):
        leaf_ranges = extract_range_list_from_comparison_expr(
            predicate, lower_bound, upper_bound
        )
        return _merge(leaf_ranges)

    raise RuntimeError(f"Contains unsuporrted expression {type(predicate)}")


def contains_single_column(predicate: AbstractExpression, column: str = None) -> bool:
    """Check whether the predicate references exactly one distinct column.

    Args:
        predicate (AbstractExpression): predicate expression to inspect
        column (str): optional column alias; when given, the single
            referenced column must also equal this alias

    Returns:
        bool: True if exactly one distinct column alias appears in the
            predicate (and it matches `column` when that is provided).
            False otherwise, including when the predicate is None.
    """
    if not predicate:
        return False

    # Depth-first walk with an explicit stack. A TupleValueExpression
    # contributes its alias and is not descended into.
    aliases = set()
    pending = [predicate]
    while pending:
        node = pending.pop()
        if isinstance(node, TupleValueExpression):
            aliases.add(node.col_alias)
        else:
            pending.extend(node.children)

    if len(aliases) != 1:
        return False
    if column is None:
        return True
    (only_alias,) = aliases
    return True if only_alias == column else False


def is_simple_predicate(predicate: AbstractExpression) -> bool:
    """Check that a predicate is built only from basic nodes on one column.

    A predicate qualifies when every node in its tree is exactly a
    LogicalExpression, ComparisonExpression, TupleValueExpression or
    ConstantValueExpression (subclasses do not count) and
    contains_single_column(predicate) holds.

    Args:
        predicate (AbstractExpression): predicate expression to check

    Returns:
        bool: True if it is a simple predicate, else False
    """
    allowed_types = (
        LogicalExpression,
        ComparisonExpression,
        TupleValueExpression,
        ConstantValueExpression,
    )

    def _only_allowed_nodes(node):
        children = node.children
        # Exact type match on purpose: `type(...) in`, not isinstance.
        return type(node) in allowed_types and all(
            _only_allowed_nodes(child) for child in children
        )

    return _only_allowed_nodes(predicate) and contains_single_column(predicate)
41 changes: 40 additions & 1 deletion eva/optimizer/optimizer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

from eva.catalog.catalog_manager import CatalogManager
from eva.expression.abstract_expression import AbstractExpression, ExpressionType
from eva.expression.expression_utils import expression_tree_to_conjunction_list
from eva.expression.expression_utils import (
conjuction_list_to_expression_tree,
contains_single_column,
expression_tree_to_conjunction_list,
is_simple_predicate,
)
from eva.parser.create_statement import ColumnDefinition
from eva.utils.logging_manager import logger

Expand Down Expand Up @@ -80,3 +85,37 @@ def extract_equi_join_keys(
right_join_keys.append(left_child)

return (left_join_keys, right_join_keys)


def extract_pushdown_predicate(
    predicate: AbstractExpression, column_alias: str
) -> Tuple[AbstractExpression, AbstractExpression]:
    """Split a predicate into a part that can be pushed down and the rest.

    A (sub)predicate is pushed down when it references only `column_alias`
    and is_simple_predicate accepts it. If the whole predicate qualifies it
    is returned unchanged; otherwise it is broken into conjuncts and each
    conjunct is classified on its own.

    Args:
        predicate (AbstractExpression): predicate that needs to be decomposed
        column_alias (str): alias of the column the pushed-down part may use

    Returns:
        Tuple[AbstractExpression, AbstractExpression]: (pushdown predicate,
            remaining predicate); either entry is None when that side is
            empty, and both are None when `predicate` is None
    """
    if predicate is None:
        return None, None

    if contains_single_column(predicate, column_alias) and is_simple_predicate(
        predicate
    ):
        return predicate, None

    pushable = []
    residual = []
    for conjunct in expression_tree_to_conjunction_list(predicate):
        qualifies = contains_single_column(
            conjunct, column_alias
        ) and is_simple_predicate(conjunct)
        (pushable if qualifies else residual).append(conjunct)

    return (
        conjuction_list_to_expression_tree(pushable),
        conjuction_list_to_expression_tree(residual),
    )
Loading