implement pyspark numeric operations to pass all/test_numeric.py (#9)
* implement pyspark compiler numeric operations to pass all/test_numeric.py
hjoo authored and icexelloss committed Aug 22, 2019
1 parent 675a89f commit 9ad663f
Showing 6 changed files with 219 additions and 13 deletions.
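Note: at a high level, these changes let numeric ibis expressions compile to pyspark.sql Column operations and execute through the PySpark client. A minimal sketch of the round-trip this enables — the connect() helper and expression here are illustrative assumptions, not part of this diff (the tests construct the client directly):

    import ibis

    con = ibis.pyspark.connect(session)   # assumed api-level helper, mirroring other backends
    t = con.table('table1')
    expr = (t.id + 1).sqrt().round(2)     # compiles to Column arithmetic plus F.sqrt/F.round
    result = con.execute(expr)            # pandas Series, collected via toPandas()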
2 changes: 1 addition & 1 deletion ibis/__init__.py
@@ -58,7 +58,7 @@
import ibis.spark.api as spark  # noqa: F401

with suppress(ImportError):
-    import ibis.pyspark.api as pyspark
+    import ibis.pyspark.api as pyspark  # noqa: F401


def hdfs_connect(
14 changes: 13 additions & 1 deletion ibis/pyspark/client.py
@@ -3,6 +3,8 @@
from ibis.pyspark.operations import PysparkTable
from ibis.spark.client import SparkClient

+from pyspark.sql.column import Column
+

class PysparkClient(SparkClient):
    """
@@ -21,7 +23,17 @@ def execute(self, expr, params=None, limit='default', **kwargs):

        if isinstance(expr, types.TableExpr):
            return self.compile(expr).toPandas()
+        elif isinstance(expr, types.ColumnExpr):
+            # expression must be named for the projection
+            expr = expr.name('tmp')
+            return self.compile(expr.to_projection()).toPandas()['tmp']
        elif isinstance(expr, types.ScalarExpr):
-            return self.compile(expr).toPandas().iloc[0, 0]
+            compiled = self.compile(expr)
+            if isinstance(compiled, Column):
+                # attach result column to a fake DataFrame and
+                # select the result
+                compiled = self._session.range(0, 1) \
+                    .select(compiled)
+            return compiled.toPandas().iloc[0, 0]
        else:
            raise ValueError("Unexpected type: ", type(expr))
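Note: the new ScalarExpr branch guards against compile() returning a bare pyspark Column (e.g. for a literal-only expression), which cannot be collected on its own; attaching it to a one-row DataFrame makes it selectable. The same trick in plain PySpark, as a self-contained sketch:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.getOrCreate()
    col = F.abs(F.lit(-3)) + 1                                     # a Column, not a DataFrame
    value = spark.range(0, 1).select(col).toPandas().iloc[0, 0]    # -> 4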
200 changes: 191 additions & 9 deletions ibis/pyspark/compiler.py
@@ -2,6 +2,7 @@
import functools

import pyspark.sql.functions as F
+from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

import ibis.common as com
@@ -55,17 +56,19 @@ def compile_datasource(t, expr):
def compile_selection(t, expr):
    op = expr.op()

-    if isinstance(op.selections[0], types.ColumnExpr):
-        column_names = [expr.op().name for expr in op.selections]
-        src_table = t.translate(op.table)[column_names]
-    elif isinstance(op.selections[0], types.TableExpr):
-        src_table = t.translate(op.table)
-        for selection in op.selections[1:]:
+    src_table = t.translate(op.table)
+    col_names_in_selection_order = []
+    for selection in op.selections:
+        if isinstance(selection, types.TableExpr):
+            col_names_in_selection_order.extend(selection.columns)
+        elif isinstance(selection, types.ColumnExpr):
            column_name = selection.get_name()
-            column = t.translate(selection)
-            src_table = src_table.withColumn(column_name, column)
+            col_names_in_selection_order.append(column_name)
+            if column_name not in src_table.columns:
+                column = t.translate(selection)
+                src_table = src_table.withColumn(column_name, column)

-    return src_table
+    return src_table[col_names_in_selection_order]
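Note: the rewritten compile_selection walks op.selections in order and records column names, so projections that mix a whole table with derived columns keep their requested order, and columns already present are not re-translated. For table[[(table.id + 1).name('plus1'), table]] the translation amounts to roughly the following (sketch, assuming a source table with columns id and str_col):

    src_table = df.withColumn('plus1', df['id'] + 1)
    src_table = src_table[['plus1', 'id', 'str_col']]   # selection order preserved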


@compiles(ops.TableColumn)
@@ -280,6 +283,185 @@ def compile_greatest(t, expr):
return F.greatest(*src_columns)


@compiles(ops.Least)
def compile_least(t, expr):
op = expr.op()

src_columns = t.translate(op.arg)
if len(src_columns) == 1:
return src_columns[0]
else:
return F.least(*src_columns)
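Note: Greatest and Least receive a value list, so t.translate(op.arg) yields a list of Spark columns; a single element passes through untouched because there is nothing to compare. With two or more columns this maps directly onto the Spark builtins, e.g. (a and b being hypothetical columns):

    df.select(F.least(df['a'], df['b']), F.greatest(df['a'], df['b']))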


@compiles(ops.Abs)
def compile_abs(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.abs(src_column)


@compiles(ops.Round)
def compile_round(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
scale = op.digits.op().value if op.digits is not None else 0
rounded = F.round(src_column, scale=scale)
if scale == 0:
rounded = rounded.astype('long')
return rounded
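Note: the astype('long') is deliberate — with no digits, ibis types round() as an integer, while Spark's F.round always returns a double, so the cast keeps the backend result aligned with the expression's dtype. A quick check in a local session (sketch):

    spark.range(1).select(F.round(F.lit(2.6))).dtypes                  # double without the cast
    spark.range(1).select(F.round(F.lit(2.6)).astype('long')).dtypes   # bigint with it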


@compiles(ops.Ceil)
def compile_ceil(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.ceil(src_column)


@compiles(ops.Floor)
def compile_floor(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.floor(src_column)


@compiles(ops.Exp)
def compile_exp(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.exp(src_column)


@compiles(ops.Sign)
def compile_sign(t, expr):
op = expr.op()

src_column = t.translate(op.arg)

return F.when(src_column == 0, F.lit(0.0)) \
.otherwise(F.when(src_column > 0, F.lit(1.0)).otherwise(-1.0))


@compiles(ops.Sqrt)
def compile_sqrt(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.sqrt(src_column)


@compiles(ops.Log)
def compile_log(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.log(float(op.base.op().value), src_column)


@compiles(ops.Ln)
def compile_ln(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.log(src_column)


@compiles(ops.Log2)
def compile_log2(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.log2(src_column)


@compiles(ops.Log10)
def compile_log10(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.log10(src_column)


@compiles(ops.Modulus)
def compile_modulus(t, expr):
op = expr.op()

left = t.translate(op.left)
right = t.translate(op.right)
return left % right


@compiles(ops.Negate)
def compile_negate(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return -src_column


@compiles(ops.Add)
def compile_add(t, expr):
op = expr.op()

left = t.translate(op.left)
right = t.translate(op.right)
return left + right


@compiles(ops.Divide)
def compile_divide(t, expr):
op = expr.op()

left = t.translate(op.left)
right = t.translate(op.right)
return left / right


@compiles(ops.FloorDivide)
def compile_floor_divide(t, expr):
op = expr.op()

left = t.translate(op.left)
right = t.translate(op.right)
return F.floor(left / right)


@compiles(ops.Power)
def compile_power(t, expr):
op = expr.op()

left = t.translate(op.left)
right = t.translate(op.right)
return F.pow(left, right)


@compiles(ops.IsNan)
def compile_isnan(t, expr):
op = expr.op()

src_column = t.translate(op.arg)
return F.isnan(src_column)


@compiles(ops.IsInf)
def compile_isinf(t, expr):
import numpy as np
op = expr.op()

@pandas_udf('boolean', PandasUDFType.SCALAR)
def isinf(v):
return np.isinf(v)

src_column = t.translate(op.arg)
return isinf(src_column)
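Note: pyspark.sql.functions ships isnan but no isinf builtin, hence the vectorized pandas UDF (which requires pyarrow at runtime). As a standalone sketch, the same UDF behaves like this:

    import numpy as np
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf('boolean', PandasUDFType.SCALAR)
    def isinf(v):
        # v arrives as a pandas Series; np.isinf returns a Series of bools
        return np.isinf(v)

    df = spark.createDataFrame([(1.0,), (float('inf'),)], ['x'])
    df.select(isinf(df['x'])).show()   # false, then true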


@compiles(ops.ValueList)
def compile_value_list(t, expr):
op = expr.op()
Expand Down
1 change: 1 addition & 0 deletions ibis/pyspark/operations.py
@@ -1,4 +1,5 @@
import ibis.expr.operations as ops


class PysparkTable(ops.DatabaseTable):
pass
12 changes: 11 additions & 1 deletion ibis/pyspark/tests/test_basic.py
@@ -24,7 +24,7 @@ def client():
def test_basic(client):
    table = client.table('table1')
    result = table.compile().toPandas()
-    expected = pd.DataFrame({'id': range(0, 10)})
+    expected = pd.DataFrame({'id': range(0, 10), 'str_col': 'value'})

    tm.assert_frame_equal(result, expected)

@@ -36,6 +36,7 @@ def test_projection(client):
expected1 = pd.DataFrame(
{
'id': range(0, 10),
'str_col': 'value',
'v': range(0, 10)
}
)
@@ -48,6 +49,7 @@ def test_projection(client):
expected2 = pd.DataFrame(
{
'id': range(0, 10),
'str_col': 'value',
'v': range(0, 10),
'v2': range(0, 10)
}
@@ -126,10 +128,18 @@ def test_selection(client):

result1 = table[['id']].compile()
result2 = table[['id', 'id2']].compile()
result3 = table[[table, (table.id + 1).name('plus1')]].compile()
result4 = table[[(table.id + 1).name('plus1'), table]].compile()

df = table.compile()
tm.assert_frame_equal(result1.toPandas(), df[['id']].toPandas())
tm.assert_frame_equal(result2.toPandas(), df[['id', 'id2']].toPandas())
    tm.assert_frame_equal(result3.toPandas(),
                          df[df.columns].withColumn('plus1', df.id + 1)
                          .toPandas())
    tm.assert_frame_equal(result4.toPandas(),
                          df.withColumn('plus1', df.id + 1)
                          [['plus1', *df.columns]].toPandas())


def test_join(client):
3 changes: 2 additions & 1 deletion ibis/tests/backends.py
@@ -551,7 +551,8 @@ def awards_players(self) -> ir.TableExpr:
        return self.connection.table('awards_players')


-class PySpark(Backend, RoundHalfToEven):
+class PySpark(Backend, RoundAwayFromZero):
+
@staticmethod
def skip_if_missing_dependencies() -> None:
pytest.importorskip('pyspark')
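Note: the switch from RoundHalfToEven to RoundAwayFromZero matches Spark semantics — Spark's ROUND uses HALF_UP (ties away from zero), unlike Python's built-in banker's rounding, so the shared backend tests must expect the away-from-zero results. Illustration (sketch):

    spark.range(1).select(F.round(F.lit(2.5)), F.round(F.lit(3.5))).show()
    # -> 3.0 and 4.0 (ties away from zero)
    round(2.5), round(3.5)    # plain Python: (2, 4), ties to even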
