From 9ad663f40714cee434b645f483483e0788cde2e1 Mon Sep 17 00:00:00 2001
From: Hyonjee Joo <5000208+hjoo@users.noreply.github.com>
Date: Wed, 7 Aug 2019 10:52:31 -0400
Subject: [PATCH] implement pyspark numeric operations to pass
 all/test_numeric.py (#9)

* implement pyspark compiler numeric operations to pass all/test_numeric.py
---
 ibis/__init__.py                 |   2 +-
 ibis/pyspark/client.py           |  14 ++-
 ibis/pyspark/compiler.py         | 200 +++++++++++++++++++++++++++++--
 ibis/pyspark/operations.py       |   1 +
 ibis/pyspark/tests/test_basic.py |  12 +-
 ibis/tests/backends.py           |   3 +-
 6 files changed, 219 insertions(+), 13 deletions(-)

diff --git a/ibis/__init__.py b/ibis/__init__.py
index 2644f50caf9f..40ec9f46dd77 100644
--- a/ibis/__init__.py
+++ b/ibis/__init__.py
@@ -58,7 +58,7 @@
     import ibis.spark.api as spark  # noqa: F401
 
 with suppress(ImportError):
-    import ibis.pyspark.api as pyspark
+    import ibis.pyspark.api as pyspark  # noqa: F401
 
 
 def hdfs_connect(
diff --git a/ibis/pyspark/client.py b/ibis/pyspark/client.py
index 96709bdc0d09..02dd8adfe4aa 100644
--- a/ibis/pyspark/client.py
+++ b/ibis/pyspark/client.py
@@ -3,6 +3,8 @@
 from ibis.pyspark.operations import PysparkTable
 from ibis.spark.client import SparkClient
 
+from pyspark.sql.column import Column
+
 
 class PysparkClient(SparkClient):
     """
@@ -21,7 +23,17 @@ def execute(self, expr, params=None, limit='default', **kwargs):
 
         if isinstance(expr, types.TableExpr):
             return self.compile(expr).toPandas()
+        elif isinstance(expr, types.ColumnExpr):
+            # expression must be named for the projection
+            expr = expr.name('tmp')
+            return self.compile(expr.to_projection()).toPandas()['tmp']
         elif isinstance(expr, types.ScalarExpr):
-            return self.compile(expr).toPandas().iloc[0, 0]
+            compiled = self.compile(expr)
+            if isinstance(compiled, Column):
+                # attach result column to a fake DataFrame and
+                # select the result
+                compiled = self._session.range(0, 1) \
+                    .select(compiled)
+            return compiled.toPandas().iloc[0, 0]
         else:
             raise ValueError("Unexpected type: ", type(expr))
diff --git a/ibis/pyspark/compiler.py b/ibis/pyspark/compiler.py
index ce7e5c28daa9..a91a0ca46751 100644
--- a/ibis/pyspark/compiler.py
+++ b/ibis/pyspark/compiler.py
@@ -2,6 +2,7 @@
 import functools
 
 import pyspark.sql.functions as F
+from pyspark.sql.functions import pandas_udf, PandasUDFType
 from pyspark.sql.window import Window
 
 import ibis.common as com
@@ -55,17 +56,19 @@ def compile_datasource(t, expr):
 def compile_selection(t, expr):
     op = expr.op()
 
-    if isinstance(op.selections[0], types.ColumnExpr):
-        column_names = [expr.op().name for expr in op.selections]
-        src_table = t.translate(op.table)[column_names]
-    elif isinstance(op.selections[0], types.TableExpr):
-        src_table = t.translate(op.table)
-        for selection in op.selections[1:]:
+    src_table = t.translate(op.table)
+    col_names_in_selection_order = []
+    for selection in op.selections:
+        if isinstance(selection, types.TableExpr):
+            col_names_in_selection_order.extend(selection.columns)
+        elif isinstance(selection, types.ColumnExpr):
             column_name = selection.get_name()
-            column = t.translate(selection)
-            src_table = src_table.withColumn(column_name, column)
+            col_names_in_selection_order.append(column_name)
+            if column_name not in src_table.columns:
+                column = t.translate(selection)
+                src_table = src_table.withColumn(column_name, column)
 
-    return src_table
+    return src_table[col_names_in_selection_order]
 
 
 @compiles(ops.TableColumn)
@@ -280,6 +283,185 @@ def compile_greatest(t, expr):
         return F.greatest(*src_columns)
 
 
+@compiles(ops.Least)
+def compile_least(t, expr):
+    op = expr.op()
+
+    src_columns = t.translate(op.arg)
+    if len(src_columns) == 1:
+        return src_columns[0]
+    else:
+        return F.least(*src_columns)
+
+
+@compiles(ops.Abs)
+def compile_abs(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.abs(src_column)
+
+
+@compiles(ops.Round)
+def compile_round(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    scale = op.digits.op().value if op.digits is not None else 0
+    rounded = F.round(src_column, scale=scale)
+    if scale == 0:
+        rounded = rounded.astype('long')
+    return rounded
+
+
+@compiles(ops.Ceil)
+def compile_ceil(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.ceil(src_column)
+
+
+@compiles(ops.Floor)
+def compile_floor(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.floor(src_column)
+
+
+@compiles(ops.Exp)
+def compile_exp(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.exp(src_column)
+
+
+@compiles(ops.Sign)
+def compile_sign(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+
+    return F.when(src_column == 0, F.lit(0.0)) \
+        .otherwise(F.when(src_column > 0, F.lit(1.0)).otherwise(-1.0))
+
+
+@compiles(ops.Sqrt)
+def compile_sqrt(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.sqrt(src_column)
+
+
+@compiles(ops.Log)
+def compile_log(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.log(float(op.base.op().value), src_column)
+
+
+@compiles(ops.Ln)
+def compile_ln(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.log(src_column)
+
+
+@compiles(ops.Log2)
+def compile_log2(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.log2(src_column)
+
+
+@compiles(ops.Log10)
+def compile_log10(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.log10(src_column)
+
+
+@compiles(ops.Modulus)
+def compile_modulus(t, expr):
+    op = expr.op()
+
+    left = t.translate(op.left)
+    right = t.translate(op.right)
+    return left % right
+
+
+@compiles(ops.Negate)
+def compile_negate(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return -src_column
+
+
+@compiles(ops.Add)
+def compile_add(t, expr):
+    op = expr.op()
+
+    left = t.translate(op.left)
+    right = t.translate(op.right)
+    return left + right
+
+
+@compiles(ops.Divide)
+def compile_divide(t, expr):
+    op = expr.op()
+
+    left = t.translate(op.left)
+    right = t.translate(op.right)
+    return left / right
+
+
+@compiles(ops.FloorDivide)
+def compile_floor_divide(t, expr):
+    op = expr.op()
+
+    left = t.translate(op.left)
+    right = t.translate(op.right)
+    return F.floor(left / right)
+
+
+@compiles(ops.Power)
+def compile_power(t, expr):
+    op = expr.op()
+
+    left = t.translate(op.left)
+    right = t.translate(op.right)
+    return F.pow(left, right)
+
+
+@compiles(ops.IsNan)
+def compile_isnan(t, expr):
+    op = expr.op()
+
+    src_column = t.translate(op.arg)
+    return F.isnan(src_column)
+
+
+@compiles(ops.IsInf)
+def compile_isinf(t, expr):
+    import numpy as np
+    op = expr.op()
+
+    @pandas_udf('boolean', PandasUDFType.SCALAR)
+    def isinf(v):
+        return np.isinf(v)
+
+    src_column = t.translate(op.arg)
+    return isinf(src_column)
+
+
 @compiles(ops.ValueList)
 def compile_value_list(t, expr):
     op = expr.op()
diff --git a/ibis/pyspark/operations.py b/ibis/pyspark/operations.py
index ccc8025d5d6e..9ae34861dabb 100644
--- a/ibis/pyspark/operations.py
+++ b/ibis/pyspark/operations.py
@@ -1,4 +1,5 @@
 import ibis.expr.operations as ops
 
+
 class PysparkTable(ops.DatabaseTable):
     pass
diff --git a/ibis/pyspark/tests/test_basic.py b/ibis/pyspark/tests/test_basic.py
index 7ba3bf25adc1..d7298d340a5e 100644
--- a/ibis/pyspark/tests/test_basic.py
+++ b/ibis/pyspark/tests/test_basic.py
@@ -24,7 +24,7 @@ def client():
 def test_basic(client):
     table = client.table('table1')
     result = table.compile().toPandas()
-    expected = pd.DataFrame({'id': range(0, 10)})
+    expected = pd.DataFrame({'id': range(0, 10), 'str_col': 'value'})
 
     tm.assert_frame_equal(result, expected)
 
@@ -36,6 +36,7 @@ def test_projection(client):
     expected1 = pd.DataFrame(
         {
             'id': range(0, 10),
+            'str_col': 'value',
             'v': range(0, 10)
         }
     )
@@ -48,6 +49,7 @@ def test_projection(client):
     expected2 = pd.DataFrame(
         {
             'id': range(0, 10),
+            'str_col': 'value',
             'v': range(0, 10),
             'v2': range(0, 10)
         }
@@ -126,10 +128,18 @@ def test_selection(client):
     result1 = table[['id']].compile()
     result2 = table[['id', 'id2']].compile()
+    result3 = table[[table, (table.id + 1).name('plus1')]].compile()
+    result4 = table[[(table.id + 1).name('plus1'), table]].compile()
 
     df = table.compile()
 
     tm.assert_frame_equal(result1.toPandas(), df[['id']].toPandas())
     tm.assert_frame_equal(result2.toPandas(), df[['id', 'id2']].toPandas())
+    tm.assert_frame_equal(result3.toPandas(),
+                          df[[df.columns]].withColumn('plus1', df.id + 1)
+                          .toPandas())
+    tm.assert_frame_equal(result4.toPandas(),
+                          df.withColumn('plus1', df.id + 1)
+                          [['plus1', *df.columns]].toPandas())
 
 
 def test_join(client):
diff --git a/ibis/tests/backends.py b/ibis/tests/backends.py
index 52f6b1f1d187..8db83c811c95 100644
--- a/ibis/tests/backends.py
+++ b/ibis/tests/backends.py
@@ -551,7 +551,8 @@ def awards_players(self) -> ir.TableExpr:
         return self.connection.table('awards_players')
 
 
-class PySpark(Backend, RoundHalfToEven):
+class PySpark(Backend, RoundAwayFromZero):
+
     @staticmethod
     def skip_if_missing_dependencies() -> None:
         pytest.importorskip('pyspark')
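
Reviewer note, not part of the patch: the snippet below is a minimal sketch of how
the new execute() branches and numeric compile rules can be exercised end to end.
It assumes a local SparkSession, a made-up temp view 't' with a double column 'a',
and that this branch exposes a connect(session) helper on ibis.pyspark analogous to
the spark backend; none of those names come from the patch itself.

    # Hypothetical smoke test for this branch. 't' and 'a' are placeholder
    # names, and ibis.pyspark.connect(session) is an assumed entry point.
    import pandas as pd
    from pyspark.sql import SparkSession

    import ibis

    session = SparkSession.builder.master('local[1]').getOrCreate()
    session.createDataFrame(pd.DataFrame({'a': [-2.5, 0.0, 3.7]})) \
        .createOrReplaceTempView('t')

    con = ibis.pyspark.connect(session)
    t = con.table('t')

    # ColumnExpr path: execute() names the expression 'tmp', compiles the
    # projection, and returns that column of the pandas result as a Series.
    print(con.execute(t.a.abs()))          # compile_abs -> F.abs
    print(con.execute(t.a.round(1)))       # compile_round -> F.round(col, scale=1)
    print(con.execute((t.a ** 2).sqrt()))  # compile_power, then compile_sqrt
    print(con.execute(t.a.sign()))         # compile_sign -> nested F.when

Two design choices worth calling out for review: compile_round casts to 'long'
when scale == 0 because ibis types a zero-digit round as an integer while Spark's
F.round preserves the input type, and compile_isinf falls back to a pandas UDF
because pyspark.sql.functions provides F.isnan but no built-in infinity predicate.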