From 3c07ebf56f102892978eee75c0dfb351ee290bc6 Mon Sep 17 00:00:00 2001
From: Phillip Cloud
Date: Thu, 10 May 2018 07:38:11 -0400
Subject: [PATCH] ENH: Refactor the pandas backend

This is a major refactor of the pandas backend to support new functionality
as well as to clean up and simplify the architecture.

1. `execute_first` is removed: This was originally useful for implementing
   the ``WindowOp`` rule, but is made obsolete by allowing
   ``ibis.client.Client``s to be data arguments in ``execute_node`` rules.
2. `data_preload` is removed: This was originally used to load data from the
   pandas backend specifically, and was very much a special snowflake in the
   execute loop. It is also made obsolete by allowing
   ``ibis.client.Client``s to be data arguments in ``execute_node`` rules.
3. ``execute_literal`` was introduced to avoid the performance overhead of
   dispatching a call to ``execute_node`` for the few cases we have of
   evaluating ``ops.Literal`` nodes.
4. I've introduced a new ``inputs`` property on the ``Node`` class, which is
   the sequence of arguments required to evaluate an expression without any
   additional context. Currently, only ``WindowOp`` requires something
   different, because it needs access to the column underlying its ``expr``:
   you cannot blindly evaluate ``expr`` and get the correct result. All
   other ``Node`` subclasses define ``inputs`` as ``return self.args``.
5. Finally, I've added support for expressions in the `preceding` and
   `following` arguments of `WindowOp`.

Author: Phillip Cloud

Closes #1441 from cpcloud/pandas-execute-refactor and squashes the following
commits:

168e016 [Phillip Cloud] ENH: Refactor the pandas backend
---
 ci/requirements-dev-2.7.yml                   |   1 +
 ci/requirements-dev-3.5.yml                   |   1 +
 ci/requirements-dev-3.6.yml                   |   1 +
 ci/requirements-docs-3.6.yml                  |   1 +
 ibis/bigquery/udf/tests/test_find.py          |   4 +-
 ibis/compat.py                                |   2 +
 ibis/expr/api.py                              |  18 +-
 ibis/expr/format.py                           |   4 +-
 ibis/expr/operations.py                       |  46 +--
 ibis/expr/types.py                            |   4 +-
 ibis/expr/window.py                           |  88 ++---
 ibis/file/csv.py                              |  15 +-
 ibis/file/hdf5.py                             |  13 +-
 ibis/file/parquet.py                          |  13 +-
 ibis/pandas/aggcontext.py                     |  51 ++-
 ibis/pandas/core.py                           | 309 ++++++++++--------
 ibis/pandas/dispatch.py                       |  70 +++-
 ibis/pandas/execution/generic.py              |  22 +-
 ibis/pandas/execution/join.py                 |   3 +-
 ibis/pandas/execution/selection.py            |   6 +-
 ibis/pandas/execution/tests/conftest.py       |  24 +-
 ibis/pandas/execution/tests/test_functions.py |   9 +-
 ibis/pandas/execution/tests/test_join.py      |   2 +-
 .../pandas/execution/tests/test_operations.py |  24 +-
 ibis/pandas/execution/tests/test_window.py    | 101 +++++-
 ibis/pandas/execution/util.py                 |   6 +-
 ibis/pandas/execution/window.py               |  54 ++-
 ibis/pandas/tests/test_client.py              |   4 +-
 ibis/pandas/tests/test_core.py                |  60 ++--
 ibis/pandas/tests/test_udf.py                 |   2 -
 ibis/util.py                                  |  34 +-
 31 files changed, 605 insertions(+), 387 deletions(-)

diff --git a/ci/requirements-dev-2.7.yml b/ci/requirements-dev-2.7.yml
index 0defe3a496e2..f56a622409bf 100644
--- a/ci/requirements-dev-2.7.yml
+++ b/ci/requirements-dev-2.7.yml
@@ -26,6 +26,7 @@ dependencies:
   - pymysql
   - pytables
   - pytest
+  - pytest-xdist
   - python=2.7
   - python-graphviz
   - python-hdfs>=2.0.16
diff --git a/ci/requirements-dev-3.5.yml b/ci/requirements-dev-3.5.yml
index ce2af1155945..d371c185eeec 100644
--- a/ci/requirements-dev-3.5.yml
+++ b/ci/requirements-dev-3.5.yml
@@ -20,6 +20,7 @@ dependencies:
   - pyarrow>=0.6.0
   - pymysql
   - pytest
+  - pytest-xdist
   - python=3.5
   - python-graphviz
   - python-hdfs>=2.0.16
diff --git a/ci/requirements-dev-3.6.yml
b/ci/requirements-dev-3.6.yml index a59e656de849..8fcfde2b2745 100644 --- a/ci/requirements-dev-3.6.yml +++ b/ci/requirements-dev-3.6.yml @@ -21,6 +21,7 @@ dependencies: - pymysql - pytables - pytest + - pytest-xdist - python=3.6 - python-graphviz - python-hdfs>=2.0.16 diff --git a/ci/requirements-docs-3.6.yml b/ci/requirements-docs-3.6.yml index 5c1a33835556..55a9febb0c78 100644 --- a/ci/requirements-docs-3.6.yml +++ b/ci/requirements-docs-3.6.yml @@ -25,6 +25,7 @@ dependencies: - pymysql - pytables - pytest + - pytest-xdist - python=3.6 - python-graphviz - python-hdfs>=2.0.16 diff --git a/ibis/bigquery/udf/tests/test_find.py b/ibis/bigquery/udf/tests/test_find.py index 3d0fef8cb97f..dc9740f01e2f 100644 --- a/ibis/bigquery/udf/tests/test_find.py +++ b/ibis/bigquery/udf/tests/test_find.py @@ -1,6 +1,6 @@ import ast from ibis.bigquery.udf.find import find_names -from ibis.util import is_sequence +from ibis.util import is_iterable def parse_expr(expr): @@ -17,7 +17,7 @@ def eq(left, right): if type(left) != type(right): return False - if is_sequence(left) and is_sequence(right): + if is_iterable(left) and is_iterable(right): return all(map(eq, left, right)) if not isinstance(left, ast.AST) and not isinstance(right, ast.AST): diff --git a/ibis/compat.py b/ibis/compat.py index 0aed2e8f8509..fd47dd129255 100644 --- a/ibis/compat.py +++ b/ibis/compat.py @@ -28,6 +28,7 @@ def viewkeys(x): from inspect import signature, Parameter, _empty import unittest.mock as mock range = range + map = map import builtins import pickle maketrans = str.maketrans @@ -45,6 +46,7 @@ def viewkeys(x): lzip = zip zip = itertools.izip zip_longest = itertools.izip_longest + map = itertools.imap def viewkeys(x): return x.viewkeys() diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 6018de8159f8..6cef012d7670 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -921,17 +921,15 @@ def _case(arg): string_col = Column[string*] 'string_col' from table ref_0 cases: - ValueList[string*] - Literal[string] - a - Literal[string] - b + Literal[string] + a + Literal[string] + b results: - ValueList[string*] - Literal[string] - an a - Literal[string] - a b + Literal[string] + an a + Literal[string] + a b default: Literal[string] null or (not a and not b) diff --git a/ibis/expr/format.py b/ibis/expr/format.py index 9008660ab3f0..cf3f3fc50c40 100644 --- a/ibis/expr/format.py +++ b/ibis/expr/format.py @@ -218,7 +218,7 @@ def visit(what, extra_indents=0): if not arg_names: for arg in op.args: - if isinstance(arg, list): + if util.is_iterable(arg): for x in arg: visit(x) else: @@ -230,7 +230,7 @@ def visit(what, extra_indents=0): name = None if name is not None: name = self._indent('{0}:'.format(name)) - if isinstance(arg, list): + if util.is_iterable(arg): if name is not None and len(arg) > 0: formatted_args.append(name) indents = 1 diff --git a/ibis/expr/operations.py b/ibis/expr/operations.py index b0389b857fca..254d2b229e87 100644 --- a/ibis/expr/operations.py +++ b/ibis/expr/operations.py @@ -4,16 +4,16 @@ import itertools import collections -from functools import partial from ibis.expr.schema import HasSchema, Schema -import ibis.util as util import ibis.common as com -import ibis.compat as compat import ibis.expr.types as ir import ibis.expr.rules as rlz import ibis.expr.schema as sch import ibis.expr.datatypes as dt + +from ibis import util, compat +from ibis.compat import functools, map from ibis.expr.signature import Annotable, Argument as Arg @@ -56,6 +56,10 @@ def _pp(x): return '%s(%s)' % (opname, ', 
'.join(pprint_args)) + @property + def inputs(self): + return tuple(self.args) + def blocks(self): # The contents of this node at referentially distinct and may not be # analyzed deeper @@ -134,13 +138,9 @@ def has_resolved_name(self): def all_equal(left, right, cache=None): - if isinstance(left, list): - if not isinstance(right, list): - return False - for a, b in zip(left, right): - if not all_equal(a, b, cache=cache): - return False - return True + if util.is_iterable(left): + return util.is_iterable(right) and all( + map(functools.partial(all_equal, cache=cache), left, right)) if hasattr(left, 'equals'): return left.equals(right, cache=cache) @@ -727,7 +727,7 @@ class Count(Reduction): where = Arg(rlz.boolean, default=None) def output_type(self): - return partial(ir.IntegerScalar, dtype=dt.int64) + return functools.partial(ir.IntegerScalar, dtype=dt.int64) class Arbitrary(Reduction): @@ -827,7 +827,7 @@ class HLLCardinality(Reduction): def output_type(self): # Impala 2.0 and higher returns a DOUBLE # return ir.DoubleScalar - return partial(ir.IntegerScalar, dtype=dt.int64) + return functools.partial(ir.IntegerScalar, dtype=dt.int64) class GroupConcat(Reduction): @@ -883,6 +883,10 @@ def over(self, window): new_window = self.window.combine(window) return WindowOp(self.expr, new_window) + @property + def inputs(self): + return self.expr.op().inputs[0], self.window + def root_tables(self): result = list(toolz.unique( toolz.concatv( @@ -901,7 +905,7 @@ def root_tables(self): class ShiftBase(AnalyticOp): arg = Arg(rlz.column(rlz.any)) - offset = Arg(rlz.integer, default=None) + offset = Arg(rlz.one_of((rlz.integer, rlz.interval)), default=None) default = Arg(rlz.any, default=None) output_type = rlz.typeof('arg') @@ -2202,7 +2206,7 @@ def output_type(self): class E(Constant): def output_type(self): - return partial(ir.FloatingScalar, dtype=dt.float64) + return functools.partial(ir.FloatingScalar, dtype=dt.float64) class TemporalUnaryOp(UnaryOp): @@ -2666,8 +2670,7 @@ class ExpressionList(Node): exprs = Arg(rlz.noop) def __init__(self, values): - values = list(map(rlz.any, values)) - super(ExpressionList, self).__init__(values) + super(ExpressionList, self).__init__(list(map(rlz.any, values))) def root_tables(self): return distinct_roots(self.exprs) @@ -2683,12 +2686,11 @@ class ValueList(ValueOp): display_argnames = False # disable showing argnames in repr def __init__(self, values): - values = list(map(rlz.any, values)) - super(ValueList, self).__init__(values) + super(ValueList, self).__init__(tuple(map(rlz.any, values))) + + def output_type(self): + dtype = rlz.highest_precedence_dtype(self.values) + return functools.partial(ir.ListExpr, dtype=dtype) def root_tables(self): return distinct_roots(*self.values) - - def _make_expr(self): - dtype = rlz.highest_precedence_dtype(self.values) - return ir.ListExpr(self, dtype=dtype) diff --git a/ibis/expr/types.py b/ibis/expr/types.py index a70d620958ff..626952f438fd 100644 --- a/ibis/expr/types.py +++ b/ibis/expr/types.py @@ -680,11 +680,11 @@ def __getitem__(self, key): return self.values[key] def __add__(self, other): - other_values = getattr(other, 'values', other) + other_values = tuple(getattr(other, 'values', other)) return type(self.op())(self.values + other_values).to_expr() def __radd__(self, other): - other_values = getattr(other, 'values', other) + other_values = tuple(getattr(other, 'values', other)) return type(self.op())(other_values + self.values).to_expr() def __bool__(self): diff --git a/ibis/expr/window.py b/ibis/expr/window.py 
index 21a2175fb715..90bda2a54666 100644 --- a/ibis/expr/window.py +++ b/ibis/expr/window.py @@ -1,17 +1,3 @@ -# Copyright 2014 Cloudera Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - import ibis.expr.types as ir import ibis.expr.operations as ops import ibis.util as util @@ -58,38 +44,50 @@ def __init__(self, group_by=None, order_by=None, self._validate_frame() def _validate_frame(self): - p_tuple = has_p = False - f_tuple = has_f = False + preceding_tuple = has_preceding = False + following_tuple = has_following = False if self.preceding is not None: - p_tuple = isinstance(self.preceding, tuple) - has_p = True + preceding_tuple = isinstance(self.preceding, tuple) + has_preceding = True if self.following is not None: - f_tuple = isinstance(self.following, tuple) - has_f = True - - if ((p_tuple and has_f) or (f_tuple and has_p)): - raise com.IbisInputError('Can only specify one window side ' - ' when you want an off-center ' - 'window') - elif p_tuple: + following_tuple = isinstance(self.following, tuple) + has_following = True + + if ((preceding_tuple and has_following) or + (following_tuple and has_preceding)): + raise com.IbisInputError( + 'Can only specify one window side when you want an ' + 'off-center window' + ) + elif preceding_tuple: start, end = self.preceding if start is None: assert end >= 0 else: assert start > end - elif f_tuple: + elif following_tuple: start, end = self.following if end is None: assert start >= 0 else: assert start < end else: - if has_p and self.preceding < 0: - raise com.IbisInputError('Window offset must be positive') - - if has_f and self.following < 0: - raise com.IbisInputError('Window offset must be positive') + if not isinstance(self.preceding, ir.Expr): + if has_preceding and self.preceding < 0: + raise com.IbisInputError( + "'preceding' must be positive, got {}".format( + self.preceding + ) + ) + + if not isinstance(self.following, ir.Expr): + if has_following and self.following < 0: + raise com.IbisInputError( + "'following' must be positive, got {}".format( + self.following + ) + ) def bind(self, table): # Internal API, ensure that any unresolved expr references (as strings, @@ -128,32 +126,36 @@ def equals(self, other, cache=None): if cache is None: cache = {} - if (self, other) in cache: - return cache[(self, other)] - - if id(self) == id(other): - cache[(self, other)] = True + if self is other: + cache[self, other] = True return True if not isinstance(other, Window): - cache[(self, other)] = False + cache[self, other] = False return False + try: + return cache[self, other] + except KeyError: + pass + if (len(self._group_by) != len(other._group_by) or not ops.all_equal(self._group_by, other._group_by, cache=cache)): - cache[(self, other)] = False + cache[self, other] = False return False if (len(self._order_by) != len(other._order_by) or not ops.all_equal(self._order_by, other._order_by, cache=cache)): - cache[(self, other)] = False + cache[self, other] = False return False - equal = (self.preceding == other.preceding and - self.following 
== other.following) - cache[(self, other)] = equal + equal = ( + ops.all_equal(self.preceding, other.preceding, cache=cache) and + ops.all_equal(self.following, other.following, cache=cache) + ) + cache[self, other] = equal return equal diff --git a/ibis/file/csv.py b/ibis/file/csv.py index 6c1a02571058..a1d97d56ce0e 100644 --- a/ibis/file/csv.py +++ b/ibis/file/csv.py @@ -7,7 +7,7 @@ from ibis.compat import parse_version from ibis.file.client import FileClient from ibis.pandas.api import PandasDialect -from ibis.pandas.core import pre_execute, execute # noqa +from ibis.pandas.core import execute_node, pre_execute, execute from ibis.pandas.execution.selection import physical_tables @@ -92,20 +92,15 @@ def version(self): return parse_version(pd.__version__) -@pre_execute.register(CSVClient.table_class, CSVClient) -def csv_pre_execute_table(op, client, scope, **kwargs): - # cache - if isinstance(scope.get(op), pd.DataFrame): - return {} - +@execute_node.register(CSVClient.table_class, CSVClient) +def csv_read_table(op, client, scope, **kwargs): path = client.dictionary[op.name] df = _read_csv(path, schema=op.schema, header=0, **op.read_csv_kwargs) - - return {op: df} + return df @pre_execute.register(ops.Selection, CSVClient) -def csv_pre_execute(op, client, scope, **kwargs): +def csv_pre_execute_selection(op, client, scope, **kwargs): tables = filter(lambda t: t not in scope, physical_tables(op.table.op())) ops = {} diff --git a/ibis/file/hdf5.py b/ibis/file/hdf5.py index 2e3bc7eaca69..fffd7dc0367f 100644 --- a/ibis/file/hdf5.py +++ b/ibis/file/hdf5.py @@ -2,7 +2,7 @@ import ibis.expr.schema as sch import ibis.expr.operations as ops from ibis.file.client import FileClient -from ibis.pandas.core import pre_execute, execute # noqa +from ibis.pandas.core import execute_node, execute def connect(path): @@ -66,14 +66,9 @@ def list_databases(self, path=None): return self._list_databases_dirs_or_files(path) -@pre_execute.register(HDFClient.table_class, HDFClient) -def hdf_pre_execute_table(op, client, scope, **kwargs): - - # cache - if isinstance(scope.get(op), pd.DataFrame): - return {} - +@execute_node.register(HDFClient.table_class, HDFClient) +def hdf_read_table(op, client, scope, **kwargs): key = op.name path = client.dictionary[key] df = pd.read_hdf(str(path), key, mode='r') - return {op: df} + return df diff --git a/ibis/file/parquet.py b/ibis/file/parquet.py index bad2ec78155c..16448f355688 100644 --- a/ibis/file/parquet.py +++ b/ibis/file/parquet.py @@ -1,4 +1,3 @@ -import pandas as pd import pyarrow as pa import pyarrow.parquet as pq @@ -9,7 +8,7 @@ from ibis.compat import parse_version from ibis.file.client import FileClient from ibis.pandas.api import PandasDialect -from ibis.pandas.core import pre_execute, execute +from ibis.pandas.core import execute_node, execute dialect = PandasDialect @@ -104,13 +103,9 @@ def version(self): return parse_version(pa.__version__) -@pre_execute.register(ParquetClient.table_class, ParquetClient) -def parquet_pre_execute_client(op, client, scope, **kwargs): - # cache - if isinstance(scope.get(op), pd.DataFrame): - return {} - +@execute_node.register(ParquetClient.table_class, ParquetClient) +def parquet_read_table(op, client, scope, **kwargs): path = client.dictionary[op.name] table = pq.read_table(str(path)) df = table.to_pandas() - return {op: df} + return df diff --git a/ibis/pandas/aggcontext.py b/ibis/pandas/aggcontext.py index b43dd16b95e9..10b2d671060b 100644 --- a/ibis/pandas/aggcontext.py +++ b/ibis/pandas/aggcontext.py @@ -128,10 +128,10 @@ 
>>> window = ibis.cumulative_window(order_by=t.time)
 >>> t.value.sum().over(window)  # doctest: +SKIP
 
-Trailing
-~~~~~~~~
+Moving
+~~~~~~
 
-Also called rolling.
+Also referred to as "rolling" in other libraries such as pandas.
 
 SQL
 
@@ -217,6 +217,14 @@
 
 import six
 
+from multipledispatch import Dispatcher
+
+import pandas as pd
+
+import ibis
+import ibis.expr.datatypes as dt
+import ibis.expr.types as ir
+
 
 @six.add_metaclass(abc.ABCMeta)
 class AggregationContext(object):
@@ -259,6 +267,25 @@ def agg(self, grouped_data, function, *args, **kwargs):
         return grouped_data.transform(function, *args, **kwargs)
 
 
+compute_window_spec = Dispatcher('compute_window_spec')
+
+
+@compute_window_spec.register(ir.Expr, dt.Interval)
+def compute_window_spec_interval(expr, dtype):
+    value = ibis.pandas.execute(expr)
+    return pd.tseries.frequencies.to_offset(value)
+
+
+@compute_window_spec.register(ir.Expr, dt.DataType)
+def compute_window_spec_expr(expr, _):
+    return ibis.pandas.execute(expr)
+
+
+@compute_window_spec.register(object, type(None))
+def compute_window_spec_default(obj, _):
+    return obj
+
+
 class Window(AggregationContext):
     __slots__ = 'construct_window',
 
@@ -282,9 +309,11 @@ def agg(self, grouped_data, function, *args, **kwargs):
         try:
             method = self.short_circuit_method(grouped_data, function)
         except AttributeError:
-            method = getattr(self.construct_window(grouped_data), function)
+            window = self.construct_window(grouped_data)
+            method = getattr(window, function)
 
-        return method(*args, **kwargs)
+        result = method(*args, **kwargs)
+        return result
 
 
 class Cumulative(Window):
@@ -298,14 +327,14 @@ def short_circuit_method(self, grouped_data, function):
         return getattr(grouped_data, 'cum{}'.format(function))
 
 
-class Trailing(Window):
+class Moving(Window):
     __slots__ = ()
 
-    def __init__(self, *args, **kwargs):
-        super(Trailing, self).__init__('rolling', *args, **kwargs)
+    def __init__(self, preceding, *args, **kwargs):
+        dtype = getattr(preceding, 'type', lambda: None)()
+        preceding = compute_window_spec(preceding, dtype)
+        super(Moving, self).__init__('rolling', preceding, *args, **kwargs)
 
     def short_circuit_method(self, grouped_data, function):
-        raise AttributeError(
-            'No short circuit method for rolling operations'
-        )
+        raise AttributeError('No short circuit method for rolling operations')
diff --git a/ibis/pandas/core.py b/ibis/pandas/core.py
index 7a3172f58a67..380abe590756 100644
--- a/ibis/pandas/core.py
+++ b/ibis/pandas/core.py
@@ -1,12 +1,10 @@
 """The pandas backend is a departure from the typical ibis backend in that it
-doesn't compile to anything, and the execution of the ibis expression coming
-dervied from a PandasTable is under the purview of ibis itself rather than
-sending something to a server and having it return the results.
+doesn't compile to anything, and the execution of the ibis expression
+is under the purview of ibis itself rather than executing SQL on a server.
 
 Design
 ------
-
-Ibis uses a technique called `multiple dispatch
+The pandas backend uses a technique called `multiple dispatch
 `_, implemented in a
 third-party open source library called `multipledispatch
 `_.
 
 Compilation
 -----------
-This is a no-op because we directly execute ibis expressions.
+This is a no-op because we execute ibis expressions directly.
 
 Execution
 ---------
-
 Execution is divided into different dispatched functions, each arising from
 a different use case.
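+
+For instance, one such dispatched rule might be sketched as follows (a
+simplified illustration, not a rule taken from this patch; the real rules
+live under ``ibis/pandas/execution/``)::
+
+    @execute_node.register(ops.Negate, pd.Series)
+    def execute_series_negate(op, data, **kwargs):
+        # multipledispatch selects this rule because the operation is a
+        # Negate and its already-computed argument is a pandas Series
+        return -data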
-A top level dispatched function named ``execute`` with two signatures exists
-to provide a single API for executing an ibis expression.
+A top level function `execute` exists to provide the API for executing an ibis
+expression against in-memory data.
 
 The general flow of execution is:
 
@@ -37,47 +34,50 @@
 
 execute the current node with its executed arguments
 
-Specifically, execute is comprised of 4 steps that happen at different times
-during the loop.
+Specifically, execute is composed of a series of steps that happen at
+different times during the loop.
 
-1. ``data_preload``
--------------------
-First, data_preload is called. data_preload provides a way for an expression to
-intercept the data and inject scope. By default it does nothing.
-
-2. ``pre_execute``
+1. ``pre_execute``
 ------------------
-
-Second, at the beginning of the main execution loop, ``pre_execute`` is called.
+First, at the beginning of the main execution loop, ``pre_execute`` is called.
 This function serves a similar purpose to the now-removed ``data_preload``, the
 key difference being that ``pre_execute`` is called *every time* there's a call
 to execute.
 
 By default this function does nothing.
 
-3. ``execute_first``
---------------------
-
-Third is ``execute_first``. This function gives an expression the opportunity
-to define the entire flow of execution of an expression starting from the top
-of the expression.
-
-This functionality was essential for implementing window functions in the
-pandas backend
-
-By default this function does nothing.
-
-4. ``execute_node``
+2. ``execute_node``
 -------------------
 
-Finally, when an expression is ready to be evaluated we call
+Second, when an expression is ready to be evaluated we call
 :func:`~ibis.pandas.core.execute` on the expression's arguments and then
 :func:`~ibis.pandas.dispatch.execute_node` on the expression with its
 now-materialized arguments.
+
+3. ``post_execute``
+-------------------
+The final step, ``post_execute``, is called immediately after the previous call
+to ``execute_node`` and takes the instance of the
+:class:`~ibis.expr.operations.Node` just computed and the result of the
+computation.
+
+The purpose of this function is to allow additional computation to happen in
+the context of the current level of the execution loop. That may sound vague,
+so let's look at an example.
+
+Let's say you want to take a three-day rolling average, and you want to include
+3 days of data prior to the first date of the input. You don't want to see that
+data in the result for a few reasons, one of which is that it would break the
+contract of window functions: given N rows of input there are N rows of output.
+
+Defining a ``post_execute`` rule for :class:`~ibis.expr.operations.WindowOp`
+allows you to encode such logic. One might want to implement this using
+:class:`~ibis.expr.operations.ScalarParameter`, in which case the ``scope``
+passed to ``post_execute`` would be the bound values passed in at the time the
+``execute`` method was called.
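+
+As an illustration only (this exact rule is hypothetical and not part of this
+patch), a ``post_execute`` rule for window operations might be sketched as::
+
+    @post_execute.register(ops.WindowOp, pd.Series)
+    def post_execute_window_op(op, data, scope=None, **kwargs):
+        # ``data`` is the just-computed window result; a real rule would
+        # trim here any rows pulled in before the window's start, so that
+        # N input rows still yield N output rows
+        return data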
""" from __future__ import absolute_import -import collections import numbers import datetime @@ -87,16 +87,20 @@ import toolz +import ibis +import ibis.common as com + import ibis.expr.types as ir -import ibis.expr.lineage as lin +import ibis.expr.operations as ops import ibis.expr.datatypes as dt +import ibis.expr.window as win from ibis.compat import functools from ibis.client import find_backends import ibis.pandas.aggcontext as agg_ctx from ibis.pandas.dispatch import ( - execute, execute_node, execute_first, data_preload, pre_execute + execute_node, pre_execute, post_execute, execute_literal ) @@ -112,43 +116,22 @@ scalar_types = fixed_width_types + temporal_types simple_types = scalar_types + six.string_types - -def find_data(expr): - """Find data sources bound to `expr`. - - Parameters - ---------- - expr : ibis.expr.types.Expr - - Returns - ------- - data : collections.OrderedDict - """ - def finder(expr): - op = expr.op() - if hasattr(op, 'source'): - data = (op, op.source.dictionary.get(op.name, None)) - else: - data = None - return lin.proceed, data - - return collections.OrderedDict(lin.traverse(finder, expr)) - - -_VALID_INPUT_TYPES = (ir.Expr, dt.DataType, type(None)) + scalar_types +_VALID_INPUT_TYPES = ( + ibis.client.Client, ir.Expr, dt.DataType, type(None), win.Window +) + scalar_types -@execute.register(ir.Expr, dict) def execute_with_scope(expr, scope, context=None, **kwargs): """Execute an expression `expr`, with data provided in `scope`. Parameters ---------- - expr : ir.Expr + expr : ibis.expr.types.Expr The expression to execute. - scope : dict - A dictionary mapping :class:`~ibis.expr.types.Node` subclass instances - to concrete data such as a pandas DataFrame. + scope : collections.Mapping + A dictionary mapping :class:`~ibis.expr.operations.Node` subclass + instances to concrete data such as a pandas DataFrame. + context : Optional[ibis.pandas.aggcontext.AggregationContext] Returns ------- @@ -158,101 +141,171 @@ def execute_with_scope(expr, scope, context=None, **kwargs): # Call pre_execute, to allow clients to intercept the expression before # computing anything *and* before associating leaf nodes with data. This - # allows clients to provide their own scope. - scope = toolz.merge( - scope, - *map( - functools.partial(pre_execute, op, scope=scope, **kwargs), - find_backends(expr) - ) + # allows clients to provide their own data for each leaf. + clients = list(find_backends(expr)) + + if context is None: + context = agg_ctx.Summarize() + + pre_executed_scope = map( + functools.partial( + pre_execute, op, scope=scope, context=context, **kwargs), + clients ) + new_scope = toolz.merge(scope, *pre_executed_scope) + result = execute_until_in_scope( + expr, new_scope, context=context, clients=clients, **kwargs) + + # XXX: we *explicitly* pass in scope and not new_scope here so that + # post_execute sees the scope of execute_with_scope, not the scope of + # execute_until_in_scope + return post_execute( + op, result, scope=scope, context=context, clients=clients, **kwargs) + + +def execute_until_in_scope(expr, scope, context=None, clients=None, **kwargs): + """Execute until our op is in `scope`. 
+
+    Parameters
+    ----------
+    expr : ibis.expr.types.Expr
+    scope : Mapping
+    context : Optional[AggregationContext]
+    clients : List[ibis.client.Client]
+    kwargs : Mapping
+    """
+    # these should never be None
+    assert context is not None, 'context is None'
+    assert clients is not None, 'clients is None'
 
     # base case: our op has been computed (or is a leaf data node), so
     # return the corresponding value
+    op = expr.op()
     if op in scope:
         return scope[op]
 
-    if context is None:
-        context = agg_ctx.Summarize()
+    new_scope = execute_bottom_up(expr, scope, context=context, **kwargs)
+    pre_executor = functools.partial(pre_execute, op, scope=scope, **kwargs)
+    new_scope = toolz.merge(new_scope, *map(pre_executor, clients))
+    return execute_until_in_scope(
+        expr, new_scope, context=context, clients=clients, **kwargs)
 
-    try:
-        computed_args = [scope[t] for t in op.root_tables()]
-    except KeyError:
-        pass
-    else:
-        try:
-            # special case: we have a definition of execute_first that matches
-            # our current operation and data leaves
-            return execute_first(
-                op, *computed_args, scope=scope, context=context, **kwargs
-            )
-        except NotImplementedError:
-            pass
 
-    args = op.args
+def is_computable_arg(op, arg):
+    """Is `arg` a valid input to an ``execute_node`` rule?
 
-    # recursively compute the op's arguments
-    computed_args = [
-        execute(arg, scope, context=context, **kwargs)
-        if hasattr(arg, 'op') else arg
-        for arg in args if isinstance(arg, _VALID_INPUT_TYPES)
-    ]
+    Parameters
+    ----------
+    arg : object
+        Any Python object
 
-    # Compute our op, with its computed arguments
-    return execute_node(
-        op, *computed_args,
-        scope=scope,
-        context=context,
-        **kwargs
+    Returns
+    -------
+    result : bool
+    """
+    return (
+        isinstance(op, (ops.ValueList, ops.WindowOp)) or
+        isinstance(arg, _VALID_INPUT_TYPES)
     )
 
 
-@execute.register(ir.Expr)
-def execute_without_scope(
-        expr, params=None, scope=None, context=None, **kwargs):
+def execute_bottom_up(expr, scope, context=None, **kwargs):
+    """Execute `expr` bottom-up.
+
+    Parameters
+    ----------
+    expr : ibis.expr.types.Expr
+    scope : Mapping[ibis.expr.operations.Node, object]
+    context : Optional[ibis.pandas.aggcontext.AggregationContext]
+    kwargs : Dict[str, object]
+
+    Returns
+    -------
+    result : Mapping[
+        ibis.expr.operations.Node,
+        Union[pandas.Series, pandas.DataFrame, scalar_types]
+    ]
+        A mapping from node to the computed result of that Node
+    """
+    op = expr.op()
+
+    # if we're in scope then return the scope, this will then be passed back
+    # into execute_bottom_up, which will then terminate
+    if op in scope:
+        return scope
+    elif isinstance(op, ops.Literal):
+        # special case literals to avoid the overhead of dispatching
+        # execute_node
+        return {
+            op: execute_literal(
+                op, op.value, expr.type(), context=context, **kwargs
+            )
+        }
+
+    # figure out what arguments we're able to compute on based on the
+    # expression's inputs. things like expressions, None, and scalar types are
+    # computable whereas ``list``s are not
+    args = op.inputs
+    is_computable_argument = functools.partial(is_computable_arg, op)
+    computable_args = list(filter(is_computable_argument, args))
+
+    # recursively compute each node's arguments until we've changed type
+    scopes = [
+        execute_bottom_up(arg, scope, context=context, **kwargs)
+        if hasattr(arg, 'op') else {arg: arg}
+        for arg in computable_args
+    ]
+
+    # if we're unable to find data then raise an exception
+    if not scopes:
+        raise com.UnboundExpressionError(
+            'Unable to find data for expression:\n{}'.format(repr(expr))
+        )
+
+    # there should be exactly one dictionary per computable argument
+    assert len(computable_args) == len(scopes)
+
+    new_scope = toolz.merge(scopes)
+
+    # pass our computed arguments to this node's execute_node implementation
+    data = [
+        new_scope[arg.op()] if hasattr(arg, 'op') else arg
+        for arg in computable_args
+    ]
+    result = execute_node(op, *data, scope=scope, context=context, **kwargs)
+    return {op: result}
+
+
+def execute(expr, params=None, scope=None, context=None, **kwargs):
     """Execute an expression against data that are bound to it. If no data
     are bound, raise an Exception.
 
     Parameters
     ----------
-    expr : ir.Expr
+    expr : ibis.expr.types.Expr
         The expression to execute
-    params : Dict[Expr, object]
+    params : Mapping[Expr, object]
+    scope : Mapping[ibis.expr.operations.Node, object]
+    context : Optional[ibis.pandas.aggcontext.AggregationContext]
 
     Returns
     -------
-    result : scalar, pd.Series, pd.DataFrame
+    result : Union[pandas.Series, pandas.DataFrame, scalar_types]
 
     Raises
     ------
     ValueError
         * If no data are bound to the input expression
     """
-
-    data_scope = find_data(expr)
-
-    factory = type(data_scope)
-
     if scope is None:
-        scope = factory()
+        scope = {}
 
     if params is None:
-        params = factory()
+        params = {}
 
+    # TODO: make expressions hashable so that we can get rid of these .op()
+    # calls everywhere
     params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()}
 
-    new_scope = toolz.merge(scope, data_scope, params, factory=factory)
-
-    # data_preload
-    new_scope.update(
-        (node, data_preload(node, data, scope=new_scope))
-        for node, data in new_scope.items()
-    )
-
-    # By default, our aggregate functions are N -> 1
-    return execute(
-        expr,
-        new_scope,
-        context=context if context is not None else agg_ctx.Summarize(),
-        **kwargs
-    )
+    new_scope = toolz.merge(scope, params)
+    return execute_with_scope(expr, new_scope, context=context, **kwargs)
diff --git a/ibis/pandas/dispatch.py b/ibis/pandas/dispatch.py
index 6b5f6d74391d..34b11f1a6dcd 100644
--- a/ibis/pandas/dispatch.py
+++ b/ibis/pandas/dispatch.py
@@ -2,12 +2,14 @@
 
 import contextlib
 
-import ibis.common as com
-import ibis.expr.operations as ops
 from multipledispatch import Dispatcher, halt_ordering, restart_ordering
 
+import pandas as pd
+
+import ibis.common as com
+import ibis.expr.operations as ops
+import ibis.expr.datatypes as dt
 
-execute = Dispatcher('execute', doc='Execute an expression')
 
 # Individual operation execution
 execute_node = Dispatcher(
@@ -24,15 +26,10 @@ def execute_node_without_scope(node, **kwargs):
     raise com.UnboundExpressionError(
         'Node of type {!r} has no data bound to it. '
         'You probably tried to execute an expression without a data source.'
+ .format(type(node).__name__) ) -execute_first = Dispatcher( - 'execute_first', doc='Compute from the top of the expression downward') - -data_preload = Dispatcher( - 'data_preload', doc='Possibly preload data from the client, given a node') - pre_execute = Dispatcher( 'pre_execute', doc="""\ @@ -46,18 +43,61 @@ def execute_node_without_scope(node, **kwargs): """) -# Default does nothing -@data_preload.register(object, object) -def data_preload_default(node, data, **kwargs): - return data - - # Default returns an empty scope @pre_execute.register(object, object) def pre_execute_default(node, client, **kwargs): return {} +execute_literal = Dispatcher( + 'execute_literal', + doc="""\ +Special case literal execution to avoid the dispatching overhead of +``execute_node``. + +Parameters +---------- +op : ibis.expr.operations.Node +value : object + The literal value of the object, e.g., int, float. +datatype : ibis.expr.datatypes.DataType + Used to specialize on expressions whose underlying value is of a different + type than its would-be type. For example, interval values are represented + by an integer. +""") + + +# By default return the literal value +@execute_literal.register(ops.Literal, object, dt.DataType) +def execute_node_literal_value_datatype(op, value, datatype, **kwargs): + return value + + +# By default return the literal value +@execute_literal.register(ops.Literal, object, dt.Interval) +def execute_node_literal_value_interval(op, value, datatype, **kwargs): + return pd.Timedelta(value, unit=datatype.unit) + + +post_execute = Dispatcher( + 'post_execute', + doc="""\ +Execute code on the result of a computation. + +Parameters +---------- +op : ibis.expr.operations.Node + The operation that was just executed +data : object + The result of the computation +""") + + +@post_execute.register(ops.Node, object) +def post_execute_default(op, data, **kwargs): + return data + + @contextlib.contextmanager def pause_ordering(): """Pause multipledispatch ordering.""" diff --git a/ibis/pandas/execution/generic.py b/ibis/pandas/execution/generic.py index e62912c5c959..335fe21ed9b9 100644 --- a/ibis/pandas/execution/generic.py +++ b/ibis/pandas/execution/generic.py @@ -17,6 +17,7 @@ from ibis import compat +import ibis import ibis.common as com import ibis.expr.datatypes as dt import ibis.expr.operations as ops @@ -24,6 +25,7 @@ from ibis.compat import functools from ibis.pandas.core import ( + execute, boolean_types, integer_types, floating_types, @@ -32,15 +34,9 @@ fixed_width_types, scalar_types ) -from ibis.pandas.dispatch import execute, execute_node -from ibis.pandas.execution import constants - -@execute_node.register(ops.Literal) -@execute_node.register(ops.Literal, object) -@execute_node.register(ops.Literal, object, dt.DataType) -def execute_node_literal(op, *args, **kwargs): - return op.value +from ibis.pandas.dispatch import execute_node +from ibis.pandas.execution import constants @execute_node.register(ops.Literal, object, dt.Interval) @@ -680,8 +676,8 @@ def execute_node_self_reference_dataframe(op, data, **kwargs): return data -@execute_node.register(ops.ValueList) -def execute_node_value_list(op, **kwargs): +@execute_node.register(ops.ValueList, collections.Sequence) +def execute_node_value_list(op, _, **kwargs): return [execute(arg, **kwargs) for arg in op.values] @@ -762,3 +758,9 @@ def execute_node_where_scalar_series_scalar(op, cond, true, false, **kwargs): @execute_node.register(ops.Where, boolean_types, scalar_types, pd.Series) def 
execute_node_where_scalar_scalar_series(op, cond, true, false, **kwargs): return pd.Series(np.repeat(true, len(false))) if cond else false + + +@execute_node.register( + ibis.pandas.client.PandasTable, ibis.pandas.client.PandasClient) +def execute_database_table_client(op, client, **kwargs): + return client.dictionary[op.name] diff --git a/ibis/pandas/execution/join.py b/ibis/pandas/execution/join.py index 3d196672f47a..06f53e1b8ace 100644 --- a/ibis/pandas/execution/join.py +++ b/ibis/pandas/execution/join.py @@ -4,7 +4,8 @@ import ibis.expr.operations as ops -from ibis.pandas.dispatch import execute, execute_node +from ibis.pandas.dispatch import execute_node +from ibis.pandas.core import execute from ibis.pandas.execution import constants diff --git a/ibis/pandas/execution/selection.py b/ibis/pandas/execution/selection.py index 00912c1a41d0..baef9da04284 100644 --- a/ibis/pandas/execution/selection.py +++ b/ibis/pandas/execution/selection.py @@ -19,7 +19,8 @@ from ibis.compat import functools -from ibis.pandas.dispatch import execute, execute_node +from ibis.pandas.dispatch import execute_node +from ibis.pandas.core import execute from ibis.pandas.execution import constants, util @@ -44,8 +45,7 @@ ----- :class:`~ibis.expr.types.ScalarExpr` instances occur when a specific column projection is a window operation. -""" -) +""") @compute_projection.register(ir.ScalarExpr, ops.Selection, pd.DataFrame) diff --git a/ibis/pandas/execution/tests/conftest.py b/ibis/pandas/execution/tests/conftest.py index 7fedcc3bc0ee..afb25d60cfa6 100644 --- a/ibis/pandas/execution/tests/conftest.py +++ b/ibis/pandas/execution/tests/conftest.py @@ -52,6 +52,11 @@ def df(): }) +@pytest.fixture(scope='module') +def time_df(df): + return df.set_index('plain_datetimes_naive') + + @pytest.fixture(scope='module') def batting_df(): path = os.path.join( @@ -136,7 +141,8 @@ def time_keyed_df2(): @pytest.fixture(scope='module') def client( - df, df1, df2, df3, time_df1, time_df2, time_keyed_df1, time_keyed_df2 + df, df1, df2, df3, time_df1, time_df2, time_keyed_df1, time_keyed_df2, + time_df ): return ibis.pandas.connect( dict( @@ -149,7 +155,8 @@ def client( time_df1=time_df1, time_df2=time_df2, time_keyed_df1=time_keyed_df1, - time_keyed_df2=time_keyed_df2 + time_keyed_df2=time_keyed_df2, + time_df=time_df ) ) @@ -177,6 +184,19 @@ def t(client): ) +@pytest.fixture(scope='module') +def time_t(client): + return client.table( + 'time_df', + schema={ + 'decimal': dt.Decimal(4, 3), + 'array_of_float64': dt.Array(dt.double), + 'array_of_int64': dt.Array(dt.int64), + 'array_of_strings': dt.Array(dt.string), + } + ) + + @pytest.fixture(scope='module') def lahman(batting_df, awards_players_df): return ibis.pandas.connect({ diff --git a/ibis/pandas/execution/tests/test_functions.py b/ibis/pandas/execution/tests/test_functions.py index 0c50485e9122..62fa8946175e 100644 --- a/ibis/pandas/execution/tests/test_functions.py +++ b/ibis/pandas/execution/tests/test_functions.py @@ -31,17 +31,16 @@ def test_binary_operations(t, df, op): expr = op(t.plain_float64, t.plain_int64) result = expr.execute() - tm.assert_series_equal(result, op(df.plain_float64, df.plain_int64)) + expected = op(df.plain_float64, df.plain_int64) + tm.assert_series_equal(result, expected) @pytest.mark.parametrize('op', [operator.and_, operator.or_, operator.xor]) def test_binary_boolean_operations(t, df, op): expr = op(t.plain_int64 == 1, t.plain_int64 == 2) result = expr.execute() - tm.assert_series_equal( - result, - op(df.plain_int64 == 1, df.plain_int64 == 2) - 
) + expected = op(df.plain_int64 == 1, df.plain_int64 == 2) + tm.assert_series_equal(result, expected) @pytest.mark.parametrize('places', [-2, 0, 1, 2, None]) diff --git a/ibis/pandas/execution/tests/test_join.py b/ibis/pandas/execution/tests/test_join.py index f670965c50bb..f773a65e5101 100644 --- a/ibis/pandas/execution/tests/test_join.py +++ b/ibis/pandas/execution/tests/test_join.py @@ -135,7 +135,7 @@ def test_join_with_post_expression_selection(how, left, right, df1, df2): @join_type -def test_join_with_post_expression_filter(how, left, df1): +def test_join_with_post_expression_filter(how, left): lhs = left[['key', 'key2']] rhs = left[['key2', 'value']] diff --git a/ibis/pandas/execution/tests/test_operations.py b/ibis/pandas/execution/tests/test_operations.py index 25c27e550c2f..d050e63a0229 100644 --- a/ibis/pandas/execution/tests/test_operations.py +++ b/ibis/pandas/execution/tests/test_operations.py @@ -637,17 +637,7 @@ def test_scalar_parameter(t, df, raw_value): tm.assert_series_equal(result, expected) -@pytest.mark.parametrize( - 'elements', - [ - [1], - (1,), - pytest.mark.xfail({1}, raises=TypeError, reason='Not yet implemented'), - pytest.mark.xfail( - frozenset({1}), raises=TypeError, reason='Not yet implemented' - ), - ] -) +@pytest.mark.parametrize('elements', [[1], (1,), {1}, frozenset({1})]) def test_isin(t, df, elements): expr = t.plain_float64.isin(elements) expected = df.plain_float64.isin(elements) @@ -655,17 +645,7 @@ def test_isin(t, df, elements): tm.assert_series_equal(result, expected) -@pytest.mark.parametrize( - 'elements', - [ - [1], - (1,), - pytest.mark.xfail({1}, raises=TypeError, reason='Not yet implemented'), - pytest.mark.xfail( - frozenset({1}), raises=TypeError, reason='Not yet implemented' - ), - ] -) +@pytest.mark.parametrize('elements', [[1], (1,), {1}, frozenset({1})]) def test_notin(t, df, elements): expr = t.plain_float64.notin(elements) expected = ~df.plain_float64.isin(elements) diff --git a/ibis/pandas/execution/tests/test_window.py b/ibis/pandas/execution/tests/test_window.py index 3e845cde1418..0fd7cf9141d9 100644 --- a/ibis/pandas/execution/tests/test_window.py +++ b/ibis/pandas/execution/tests/test_window.py @@ -9,20 +9,66 @@ import ibis +execute = ibis.pandas.execute + pytestmark = pytest.mark.pandas -def test_lead(t, df): - expr = t.dup_strings.lead() +@pytest.fixture(scope='session') +def sort_kind(): + return 'mergesort' + + +default = pytest.mark.parametrize('default', [ibis.NA, ibis.literal('a')]) +row_offset = pytest.mark.parametrize( + 'row_offset', list(map(ibis.literal, [-1, 1, 0]))) +delta_offset = pytest.mark.parametrize( + 'delta_offset', + [ibis.day(), 2 * ibis.day(), -2 * ibis.day()] +) + + +@default +@row_offset +def test_lead(t, df, row_offset, default): + expr = t.dup_strings.lead(row_offset, default=default) + result = expr.execute() + expected = df.dup_strings.shift(-execute(row_offset)) + if default is not ibis.NA: + expected = expected.fillna(execute(default)) + tm.assert_series_equal(result, expected) + + +@default +@row_offset +def test_lag(t, df, row_offset, default): + expr = t.dup_strings.lag(row_offset, default=default) + result = expr.execute() + expected = df.dup_strings.shift(execute(row_offset)) + if default is not ibis.NA: + expected = expected.fillna(execute(default)) + tm.assert_series_equal(result, expected) + + +@default +@delta_offset +def test_lead_delta(time_t, time_df, delta_offset, default): + expr = time_t.dup_strings.lead(delta_offset, default=default) result = expr.execute() - expected = 
df.dup_strings.shift(-1) + expected = time_df.dup_strings.tshift(freq=-execute(delta_offset)) + if default is not ibis.NA: + expected = expected.fillna(execute(default)) tm.assert_series_equal(result, expected) -def test_lag(t, df): - expr = t.dup_strings.lag() +@default +@delta_offset +def test_lag_delta(time_t, time_df, delta_offset, default): + expr = time_t.dup_strings.lag(delta_offset, default=default) result = expr.execute() - expected = df.dup_strings.shift(1) + expected = time_df.dup_strings.tshift(freq=execute(delta_offset)) + if default is not ibis.NA: + expected = expected.fillna(execute(default)) tm.assert_series_equal(result, expected) @@ -135,19 +181,20 @@ def test_batting_quantile(players, players_df): @pytest.mark.parametrize('op', ['sum', 'mean', 'min', 'max']) -def test_batting_specific_cumulative(batting, batting_df, op): +def test_batting_specific_cumulative(batting, batting_df, op, sort_kind): ibis_method = methodcaller('cum{}'.format(op)) expr = ibis_method(batting.sort_by([batting.yearID]).G) result = expr.execute().astype('float64') pandas_method = methodcaller(op) expected = pandas_method( - batting_df[['G', 'yearID']].sort_values('yearID').G.expanding() + batting_df[['G', 'yearID']].sort_values( + 'yearID', kind=sort_kind).G.expanding() ).reset_index(drop=True) tm.assert_series_equal(result, expected) -def test_batting_cumulative(batting, batting_df): +def test_batting_cumulative(batting, batting_df, sort_kind): expr = batting.mutate( more_values=lambda t: t.G.sum().over( ibis.cumulative_window(order_by=t.yearID) @@ -156,13 +203,14 @@ def test_batting_cumulative(batting, batting_df): result = expr.execute() columns = ['G', 'yearID'] - more_values = batting_df[columns].sort_values('yearID').G.cumsum() + more_values = batting_df[columns].sort_values( + 'yearID', kind=sort_kind).G.cumsum() expected = batting_df.assign(more_values=more_values) tm.assert_frame_equal(result[expected.columns], expected) -def test_batting_cumulative_partitioned(batting, batting_df): +def test_batting_cumulative_partitioned(batting, batting_df, sort_kind): expr = batting.mutate( more_values=lambda t: t.G.sum().over( ibis.cumulative_window(order_by=t.yearID, group_by=t.lgID) @@ -174,7 +222,7 @@ def test_batting_cumulative_partitioned(batting, batting_df): key = 'lgID' expected_result = batting_df[columns].groupby( key, sort=False, as_index=False - ).apply(lambda df: df.sort_values('yearID')).groupby( + ).apply(lambda df: df.sort_values('yearID', kind=sort_kind)).groupby( key, sort=False ).G.cumsum().sort_index(level=-1) expected = expected_result.reset_index( @@ -186,7 +234,7 @@ def test_batting_cumulative_partitioned(batting, batting_df): tm.assert_series_equal(result, expected) -def test_batting_rolling(batting, batting_df): +def test_batting_rolling(batting, batting_df, sort_kind): expr = batting.mutate( more_values=lambda t: t.G.sum().over( ibis.trailing_window(5, order_by=t.yearID) @@ -195,13 +243,14 @@ def test_batting_rolling(batting, batting_df): result = expr.execute() columns = ['G', 'yearID'] - more_values = batting_df[columns].sort_values('yearID').G.rolling(5).sum() + more_values = batting_df[columns].sort_values( + 'yearID', kind=sort_kind).G.rolling(5).sum() expected = batting_df.assign(more_values=more_values) tm.assert_frame_equal(result[expected.columns], expected) -def test_batting_rolling_partitioned(batting, batting_df): +def test_batting_rolling_partitioned(batting, batting_df, sort_kind): expr = batting.mutate( more_values=lambda t: t.G.sum().over( 
ibis.trailing_window(3, order_by=t.yearID, group_by=t.lgID) @@ -213,7 +262,7 @@ def test_batting_rolling_partitioned(batting, batting_df): key = 'lgID' expected_result = batting_df[columns].groupby( key, sort=False, as_index=False - ).apply(lambda df: df.sort_values('yearID')).groupby( + ).apply(lambda df: df.sort_values('yearID', kind=sort_kind)).groupby( key, sort=False ).G.rolling(3).sum().sort_index(level=-1) expected = expected_result.reset_index( @@ -246,7 +295,7 @@ def test_scalar_broadcasting(batting, batting_df): tm.assert_frame_equal(result, expected) -def test_mutate_with_window_after_join(): +def test_mutate_with_window_after_join(sort_kind): left_df = pd.DataFrame({ 'ints': [0, 1, 2], 'strings': ['a', 'b', 'c'], @@ -266,7 +315,7 @@ def test_mutate_with_window_after_join(): expected = pd.DataFrame({ 'dates': pd.concat( [left_df.dates] * 3 - ).sort_values().reset_index(drop=True), + ).sort_values(kind=sort_kind).reset_index(drop=True), 'ints': [0] * 3 + [1] * 3 + [2] * 3, 'strings': ['a'] * 3 + ['b'] * 3 + ['c'] * 3, 'value': [0.0, 3.0, 6.0, 1.0, 4.0, 7.0, np.nan, np.nan, 8.0], @@ -324,3 +373,19 @@ def test_project_list_scalar(): result = expr.mutate(res=expr.ints.quantile([0.5, 0.95])).execute() tm.assert_series_equal( result.res, pd.Series([[1.0, 1.9] for _ in range(0, 3)], name='res')) + + +def test_window_with_preceding_expr(): + index = pd.date_range('20180101', '20180110') + start = 2 + data = np.arange(start, start + len(index)) + df = pd.DataFrame({'value': data, 'time': index}, index=index) + client = ibis.pandas.connect({'df': df}) + t = client.table('df') + expected = df.set_index('time').value.rolling('3d').mean() + expected.index.name = None + day = ibis.day() + window = ibis.trailing_window(3 * day, order_by=t.time) + expr = t.value.mean().over(window) + result = expr.execute() + tm.assert_series_equal(result, expected) diff --git a/ibis/pandas/execution/util.py b/ibis/pandas/execution/util.py index 10da8c059316..82c787dccf7e 100644 --- a/ibis/pandas/execution/util.py +++ b/ibis/pandas/execution/util.py @@ -3,7 +3,7 @@ import ibis import ibis.common as com -from ibis.pandas.dispatch import execute +from ibis.pandas.core import execute def compute_sort_key(key, data, **kwargs): @@ -33,6 +33,8 @@ def compute_sorted_frame(sort_keys, df, **kwargs): new_columns[computed_sort_key] = temporary_column result = df.assign(**new_columns) - result = result.sort_values(computed_sort_keys, ascending=ascending) + result = result.sort_values( + computed_sort_keys, ascending=ascending, kind='mergesort' + ) result = result.drop(new_columns.keys(), axis=1) return result diff --git a/ibis/pandas/execution/window.py b/ibis/pandas/execution/window.py index 3ea84a9aa067..a2a4be056741 100644 --- a/ibis/pandas/execution/window.py +++ b/ibis/pandas/execution/window.py @@ -12,12 +12,14 @@ from pandas.core.groupby import SeriesGroupBy import ibis.common as com +import ibis.expr.window as win import ibis.expr.operations as ops import ibis.pandas.aggcontext as agg_ctx from ibis.pandas.core import integer_types -from ibis.pandas.dispatch import execute, execute_first, execute_node +from ibis.pandas.dispatch import execute_node +from ibis.pandas.core import execute from ibis.pandas.execution import util @@ -40,9 +42,14 @@ def _post_process_group_by_order_by(series, index): return reindexed_series -@execute_first.register(ops.WindowOp, pd.DataFrame) -def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): - operand, window = op.args +@execute_node.register(ops.WindowOp, 
pd.Series, win.Window) +def execute_window_op(op, data, window, scope=None, context=None, **kwargs): + operand = op.expr + root, = op.root_tables() + try: + data = scope[root] + except KeyError: + data = execute(root.to_expr(), scope=scope, context=context, **kwargs) following = window.following order_by = window._order_by @@ -104,12 +111,15 @@ def execute_frame_window_op(op, data, scope=None, context=None, **kwargs): if not grouping_keys and not order_by: context = agg_ctx.Summarize() elif isinstance(operand.op(), ops.Reduction) and order_by: + # XXX(phillipc): What a horror show preceding = window.preceding if preceding is not None: - context = agg_ctx.Trailing(preceding) + context = agg_ctx.Moving(preceding) else: + # expanding window context = agg_ctx.Cumulative() else: + # groupby transform (window with a partition by clause in SQL parlance) context = agg_ctx.Transform() result = execute(operand, new_scope, context=context, **kwargs) @@ -153,19 +163,45 @@ def execute_series_cumulative_op(op, data, **kwargs): @execute_node.register( ops.Lag, (pd.Series, SeriesGroupBy), integer_types + (type(None),), - type(None), + object, ) def execute_series_lag(op, data, offset, default, **kwargs): - return data.shift(1 if offset is None else offset) + result = data.shift(1 if offset is None else offset) + if not pd.isnull(default): + return result.fillna(default) + return result @execute_node.register( ops.Lead, (pd.Series, SeriesGroupBy), integer_types + (type(None),), - type(None), + object, ) def execute_series_lead(op, data, offset, default, **kwargs): - return data.shift(-(1 if offset is None else offset)) + result = data.shift(-(1 if offset is None else offset)) + if not pd.isnull(default): + return result.fillna(default) + return result + + +@execute_node.register( + ops.Lag, (pd.Series, SeriesGroupBy), pd.Timedelta, object, +) +def execute_series_lag_timedelta(op, data, offset, default, **kwargs): + result = data.tshift(freq=offset) + if not pd.isnull(default): + return result.fillna(default) + return result + + +@execute_node.register( + ops.Lead, (pd.Series, SeriesGroupBy), pd.Timedelta, object +) +def execute_series_lead_timedelta(op, data, offset, default, **kwargs): + result = data.tshift(freq=-offset) + if not pd.isnull(default): + return result.fillna(default) + return result @execute_node.register(ops.FirstValue, pd.Series) diff --git a/ibis/pandas/tests/test_client.py b/ibis/pandas/tests/test_client.py index 488a90a7e9d6..9f3f3ec7e2f9 100644 --- a/ibis/pandas/tests/test_client.py +++ b/ibis/pandas/tests/test_client.py @@ -37,7 +37,9 @@ def test_client_table_repr(table): def test_literal(client): - assert client.execute(ibis.literal(1)) == 1 + lit = ibis.literal(1) + result = client.execute(lit) + assert result == 1 def test_read_with_undiscoverable_type(client): diff --git a/ibis/pandas/tests/test_core.py b/ibis/pandas/tests/test_core.py index fbcda998d604..33720a09d2a2 100644 --- a/ibis/pandas/tests/test_core.py +++ b/ibis/pandas/tests/test_core.py @@ -5,16 +5,13 @@ import ibis import ibis.common as com -import ibis.expr.datatypes as dt import ibis.expr.operations as ops pytest.importorskip('multipledispatch') -from ibis.pandas.execution import ( - execute, execute_node, execute_first -) # noqa: E402 -from ibis.pandas.client import PandasTable, PandasClient # noqa: E402 -from ibis.pandas.core import data_preload, pre_execute # noqa: E402 +from ibis.pandas.dispatch import ( + execute_node, pre_execute, post_execute) # noqa: E402 +from ibis.pandas.client import PandasClient # noqa: 
E402
from multipledispatch.conflict import ambiguities  # noqa: E402

pytestmark = pytest.mark.pandas

@@ -39,7 +36,9 @@ def ibis_table(core_client):
     return core_client.table('df')
 
 
-@pytest.mark.parametrize('func', [execute, execute_node, execute_first])
+@pytest.mark.parametrize(
+    'func', [execute_node, pre_execute, post_execute]
+)
 def test_no_execute_ambiguities(func):
     assert not ambiguities(func.funcs)
 
@@ -60,35 +59,6 @@ def test_from_dataframe(dataframe, ibis_table, core_client):
     tm.assert_frame_equal(result, expected)
 
 
-def test_execute_first_accepts_scope_keyword_argument(ibis_table, dataframe):
-
-    param = ibis.param(dt.int64)
-
-    @execute_first.register(ops.Node, pd.DataFrame)
-    def foo(op, data, scope=None, **kwargs):
-        assert scope is not None
-        return data.dup_strings.str.len() + scope[param.op()]
-
-    expr = ibis_table.dup_strings.length() + param
-    assert expr.execute(params={param: 2}) is not None
-    del execute_first.funcs[ops.Node, pd.DataFrame]
-    execute_first.reorder()
-    execute_first._cache.clear()
-
-
-def test_data_preload(ibis_table, dataframe):
-    @data_preload.register(PandasTable, pd.DataFrame)
-    def data_preload_check_a_thing(_, df, **kwargs):
-        return df
-
-    result = ibis_table.execute()
-    tm.assert_frame_equal(result, dataframe)
-
-    del data_preload.funcs[PandasTable, pd.DataFrame]
-    data_preload.reorder()
-    data_preload._cache.clear()
-
-
 def test_pre_execute_basic(ibis_table, dataframe):
     """
     Test that pre_execute has intercepted execution and provided its own
@@ -119,3 +89,21 @@ def test_missing_data_sources():
     expr = t.a.length()
     with pytest.raises(com.UnboundExpressionError):
         ibis.pandas.execute(expr)
+
+
+def test_missing_data_on_custom_client():
+    class MyClient(PandasClient):
+        def table(self, name):
+            return ops.DatabaseTable(
+                name, ibis.schema([('a', 'int64')]), self).to_expr()
+
+    con = MyClient({})
+    t = con.table('t')
+    with pytest.raises(
+        NotImplementedError,
+        match=(
+            'Could not find signature for execute_node: '
+            '<DatabaseTable, MyClient>'
+        )
+    ):
+        con.execute(t)
diff --git a/ibis/pandas/tests/test_udf.py b/ibis/pandas/tests/test_udf.py
index 9a580c7e8331..be1aab8d20fc 100644
--- a/ibis/pandas/tests/test_udf.py
+++ b/ibis/pandas/tests/test_udf.py
@@ -92,8 +92,6 @@ def test_nullable():
     assert nullable(t.a.type()) == (type(None),)
 
 
-@pytest.mark.xfail(
-    raises=AssertionError, reason='Nullability is not propagated')
 def test_nullable_non_nullable_field():
     t = ibis.table([('a', dt.String(nullable=False))])
     assert nullable(t.a.type()) == ()
diff --git a/ibis/util.py b/ibis/util.py
index 5a51741a200f..d63c5fb2e16a 100644
--- a/ibis/util.py
+++ b/ibis/util.py
@@ -7,6 +7,8 @@
 
 import six
 
+import toolz
+
 import ibis.compat as compat
 from ibis.config import options
 
@@ -26,18 +28,12 @@ def indent(text, spaces):
     return ''.join(prefix + line for line in text.splitlines(True))
 
 
-def any_of(values, t):
-    for x in values:
-        if isinstance(x, t):
-            return True
-    return False
+def is_one_of(values, t):
+    return (isinstance(x, t) for x in values)
 
 
-def all_of(values, t):
-    for x in values:
-        if not isinstance(x, t):
-            return False
-    return True
+any_of = toolz.compose(any, is_one_of)
+all_of = toolz.compose(all, is_one_of)
 
 
 def promote_list(val):
@@ -182,8 +178,8 @@ def safe_index(elements, value):
     return -1
 
 
-def is_sequence(o):
-    """Return whether `o` is a non-string sequence.
+def is_iterable(o):
+    """Return whether `o` is a non-string iterable.
Parameters
     ----------
     o : object
 
     Returns
     -------
     is_iterable : bool
+
+    Examples
+    --------
+    >>> x = '1'
+    >>> is_iterable(x)
+    False
+    >>> is_iterable(iter(x))
+    True
+    >>> is_iterable(i for i in range(1))
+    True
+    >>> is_iterable(1)
+    False
+    >>> is_iterable([])
+    True
     """
     return (not isinstance(o, six.string_types) and
             isinstance(o, collections.Iterable))
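
For reference, a minimal end-to-end sketch of the expression-valued
``preceding`` support added by this patch, mirroring
``test_window_with_preceding_expr`` above (the table name ``df`` is
arbitrary):

    import ibis
    import numpy as np
    import pandas as pd

    index = pd.date_range('20180101', '20180110')
    df = pd.DataFrame({'time': index, 'value': np.arange(len(index))},
                      index=index)
    t = ibis.pandas.connect({'df': df}).table('df')

    # ``preceding`` may now be an expression, here an interval of three days
    window = ibis.trailing_window(3 * ibis.day(), order_by=t.time)
    result = t.value.mean().over(window).execute()
    # pandas equivalent: df.set_index('time').value.rolling('3d').mean()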