implement pyspark numeric operations to pass all/test_numeric.py #9
Conversation
```python
elif isinstance(expr, types.ColumnExpr):
    # expression must be named for the projection
    expr = expr.name('tmp')
    return self.compile(expr.to_projection()).toPandas()['tmp']
```
What does `to_projection()` do?
Turns the column expression into a table projection, so that `self.compile()` returns a DataFrame.
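The pattern being discussed can be sketched with pandas standing in for the compiled Spark DataFrame. This is an illustration only: the names `table` and `column_expr` and the use of `Series.to_frame` are stand-ins, not the actual ibis API.

```python
import pandas as pd

# Hypothetical sketch of executing a bare column expression:
# name it, project it into a one-column table, materialize the
# table, then pull the named column back out.
table = pd.DataFrame({'a': [1, 2, 3]})
column_expr = table['a'] * 2              # stands in for an ibis ColumnExpr
projection = column_expr.to_frame('tmp')  # analogous to expr.name('tmp').to_projection()
result = projection['tmp']                # analogous to .toPandas()['tmp']
print(list(result))  # [2, 4, 6]
```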
ibis/pyspark/client.py (outdated)
```python
# attach result column to a fake DataFrame and
# select the result
compiled = self._session.range(0, 1) \
    .withColumn("result", compiled) \
```
I think you can just do `self._session.range(0, 1).select(compiled)`.
Yep, that works.
```python
scale = op.digits.op().value if op.digits is not None else 0
rounded = F.round(src_column, scale=scale)
if scale == 0:
    rounded = rounded.astype('long')
```
Why do we need to cast the type here?
The expected result is np.int64 when decimals/scale is 0: https://github.com/ibis-project/ibis/blob/master/ibis/tests/backends.py#L34
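The reason the cast is needed can be checked directly with NumPy. This is just an illustration of the dtype behavior, not the backend code: rounding to 0 decimal places keeps a float dtype, so an explicit cast is required to produce np.int64.

```python
import numpy as np

values = np.array([1.4, 2.6, -3.5])

# Rounding to 0 decimals does not change the dtype: the result
# is still float64, so an explicit cast is needed for int64.
rounded = np.round(values, decimals=0)
cast = rounded.astype('int64')

print(rounded.dtype)  # float64
print(cast.dtype)     # int64
```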
ibis/pyspark/compiler.py (outdated)
```python
import numpy as np

op = expr.op()

@pandas_udf('double', PandasUDFType.SCALAR)
```
Can we rewrite this using SQL operations? e.g.

```python
F.when(src_column == 0, lit(0)).otherwise(F.when(src_column > 0, lit(1)).otherwise(-1))
```
Done
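The nested when/otherwise suggested above implements the standard sign function. A plain-Python equivalent of the same branching (for illustration only; the backend uses the Spark column expression) is:

```python
def sign(x):
    # Mirrors F.when(src_column == 0, lit(0))
    #          .otherwise(F.when(src_column > 0, lit(1)).otherwise(-1))
    if x == 0:
        return 0
    return 1 if x > 0 else -1

print([sign(v) for v in (-2.5, 0, 7)])  # [-1, 0, 1]
```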
* implement pyspark compiler numeric operations to pass all/test_numeric.py
This is a PySpark backend for ibis. It differs from the Spark backend, where the ibis expr is compiled to a SQL string; instead, the PySpark backend compiles the ibis expr to pyspark.DataFrame exprs.

Author: Li Jin <ice.xelloss@gmail.com>
Author: Hyonjee Joo <5000208+hjoo@users.noreply.github.com>

Closes ibis-project#1913 from icexelloss/pyspark-backend-prototype and squashes the following commits:

* 213e371 [Li Jin] Add pyspark/__init__.py
* 8f1c35e [Li Jin] Address comments
* f173425 [Li Jin] Fix tests
* 0969b0a [Li Jin] Skip unimplemented tests
* 1f9409b [Li Jin] Change pyspark imports to optional
* 26b041c [Li Jin] Add importskip
* 108ccd8 [Li Jin] Add scope
* e00dc00 [Li Jin] Address PR comments
* 4764a4e [Li Jin] Add pyspark marker to setup.cfg
* 7cc2a9e [Li Jin] Remove dead code
* 72b45f8 [Li Jin] Fix rebase errors
* 9ad663f [Hyonjee Joo] implement pyspark numeric operations to pass all/test_numeric.py (#9)
* 675a89f [Li Jin] Implement compiler rules to pass all/test_aggregation.py
* 215c0d9 [Li Jin] Link existing tests with PySpark backend (#7)
* 88705fe [Li Jin] Implement basic join
* c4a2b79 [Hyonjee Joo] add pyspark compile rule for greatest, fix bug with selection (#4)
* fa4ad23 [Li Jin] Implement basic aggregation, group_by and window (#3)
* 54c2f2d [Li Jin] Initial commit of pyspark DataFrame backend (#1)
This PR passes all numeric tests in `ibis/tests/all/test_numeric.py` except for `test_divide_by_zero`, which is skipped for the PySpark backend (PySpark does not support col / 0 = inf).

Changes made:
* Implemented the following operations: least, abs, round, ceil, floor, exp, sign, sqrt, log, ln, log2, log10, modulus, negate, add, divide, floor divide, isnan, isinf.
* Implemented column expression execution for PySpark (needed to pass some of the column tests in `test_numeric.py`).
* Fixed the selection operation in the PySpark compiler.
* Fixed a few lint errors.