FEAT: PySpark backend #1913
Conversation
Hello @icexelloss! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found: There are currently no PEP 8 issues detected in this Pull Request. Cheers! 🍻 Comment last updated at 2019-08-22 18:41:54 UTC
Currently this implementation passes test_aggregation and test_numeric. @hjoo and I are working on passing the rest of the tests under "all", but I'd like to have this up to see if this approach makes sense.
Basic approach looks good!
ibis/pyspark/compiler.py (outdated):

    return decorator

    def translate(self, expr, **kwargs):
Same here, I think PySparkExprTranslator can be a pass-through.
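For context, here is a minimal sketch of the dispatch pattern such a translator typically uses; the registry, the compiles decorator, and the class body are illustrative assumptions, not the PR's actual code:

```python
# Illustrative sketch: operation classes register a compile function,
# and translate() dispatches on the expression's underlying op node.
_registry = {}


def compiles(op_class):
    """Register a compile function for an ibis operation class (assumed API)."""
    def decorator(f):
        _registry[op_class] = f
        return f
    return decorator


class PySparkExprTranslator:
    def translate(self, expr, **kwargs):
        # Dispatch on the type of the underlying operation node.
        return _registry[type(expr.op())](self, expr, **kwargs)
```

With this shape, the translator itself is a pure pass-through to the registered per-operation compile functions.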
ibis/pyspark/compiler.py (outdated):

    elif isinstance(selection, types.ColumnExpr):
        column_name = selection.get_name()
        col_names_in_selection_order.append(column_name)
        if column_name not in src_table.columns:
Won't this do the wrong thing for expressions like t[['a', 'b']].mutate(a=lambda t: t.a + 1)? In that example you still need to compile the new expression for a even though src_table still has an a column.
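The pitfall can be shown with a toy stand-in: only a plain column reference may be reused from the source table, while anything derived must still be compiled, even if its name collides with an existing column. The dict-based "expressions" here are hypothetical placeholders for real ibis expression types:

```python
# Toy model of the selection logic under review; "column_ref" vs "derived"
# stands in for checking the real ibis expression/op types.
def compile_selection(selection, src_columns, compile_expr):
    name = selection["name"]
    if selection["kind"] == "column_ref" and name in src_columns:
        # A bare reference to an existing column can be reused as-is.
        return src_columns[name]
    # A derived expression (e.g. mutate(a=lambda t: t.a + 1)) must be
    # compiled even though the source table still has a column named 'a'.
    return compile_expr(selection)
```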
Yep. This is wrong. Let me fix it.
ibis/tests/backends.py (outdated):

    class PySpark(Backend, RoundAwayFromZero):
You can kill the newline here.
Removed
ibis/pyspark/compiler.py (outdated):

    op = expr.op()

    src_column = t.translate(op.arg)
    return F.log(float(op.base.op().value), src_column)
You probably want to translate the first argument here.
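The suggested fix, sketched with a toy translator: translate both operands rather than unwrapping the literal with op.base.op().value. The dict nodes and the numeric evaluator are stand-ins; the real code returns Spark columns via pyspark.sql.functions.log:

```python
import math


# Toy evaluator standing in for t.translate; the real translator
# returns a Spark column expression, not a Python number.
def translate(node):
    return node["value"]


def compile_log(op):
    # Translate both arguments instead of reaching into the ibis
    # literal directly.
    arg = translate(op["arg"])
    base = translate(op["base"]) if op["base"] is not None else math.e
    return math.log(arg, base)
```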
Fixed
ibis/pyspark/compiler.py (outdated):

    op = expr.op()

    src_column = t.translate(op.arg)
    scale = op.digits.op().value if op.digits is not None else 0
I think you want to translate this instead of pulling out the value from the ibis literal.
Fixed
ibis/pyspark/compiler.py
Outdated
return client._session.table(name) | ||
|
||
|
||
def compile_table_and_cache(t, expr, cache): |
@cpcloud I found that when compiling selections I keep recompiling the same TableExpr over and over again because it is referenced from multiple places. I ended up adding a cache for expressions that have already been compiled and passing it around.
Is this a reasonable approach? How did you deal with similar problems in the pandas backend?
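A minimal sketch of that memoization idea; keying on id(expr) is an illustrative assumption (the real code might key on the op node, or use a scope mapping as other backends do):

```python
# Sketch of expression-level caching: a shared TableExpr referenced
# from several places is compiled only once per compilation pass.
def compile_with_cache(compile_fn, expr, cache):
    key = id(expr)  # assumption: identity-based key for this sketch
    if key not in cache:
        cache[key] = compile_fn(expr)
    return cache[key]
```

The cache must be scoped to a single compilation so stale results are not reused across queries.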
Force-pushed from 14a9b60 to 4bb17f9
One more round, and this should be good to merge.
    if context:
        return col
    else:
        return t.translate(expr.op().arg.op().table, scope).select(col)
Hm, what are you trying to pull out here with this chain of op accesses?
Hmm, this is a bit convoluted. The expression is something like some_col.max(), and I need to translate that to df.select(max(some_col)) to make it a lazy Spark expression. The expr.op().arg.op().table chain is how I get to the table of the column. I will add a comment.
    left_df = t.translate(op.left)
    right_df = t.translate(op.right)
    # TODO: Handle multiple predicates
I see.
    left_df = t.translate(op.left)
    right_df = t.translate(op.right)
    # TODO: Handle multiple predicates
Can you write an xfailing test that will pass when this is implemented?
Looks like there's a merge conflict as well.
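A hedged sketch of what such an xfailing test could look like; the fixture names, join expression, and marker details are assumptions, not the PR's actual test:

```python
import pytest


# Hypothetical test: marked xfail so the suite stays green now, and
# (with strict=True) starts failing once multi-predicate joins work,
# prompting removal of the marker.
@pytest.mark.xfail(
    reason="multiple join predicates not yet implemented", strict=True
)
def test_join_multiple_predicates(batting, awards_players):
    expr = batting.join(
        awards_players,
        [
            batting.yearID == awards_players.yearID,
            batting.playerID == awards_players.playerID,
        ],
    )
    expr.compile()
```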
Force-pushed from 97a4ece to 9fad68e
LGTM. Thanks @icexelloss
Thanks @hjoo!
* implement pyspark compiler numeric operations to pass all/test_numeric.py
Force-pushed from 4e5a86c to 213e371
Codecov Report

    @@            Coverage Diff             @@
    ##           master    #1913      +/-   ##
    ==========================================
    - Coverage   87.46%   85.99%    -1.48%
    ==========================================
      Files          89       93        +4
      Lines       16405    16718      +313
      Branches     2093     2120       +27
    ==========================================
    + Hits        14349    14376       +27
    - Misses       1660     1943      +283
    - Partials      396      399        +3
Merging. Thanks @icexelloss!
This is a PySpark backend for ibis. It differs from the Spark backend, which compiles the ibis expr to a SQL string; the PySpark backend instead compiles the ibis expr to pyspark.DataFrame expressions.
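The distinction can be illustrated with toy compile targets; the function names and the fake DataFrame are illustrative only, not ibis or Spark APIs:

```python
# Spark backend style: emit a SQL string to hand to spark.sql(...).
def compile_to_sql(table_name, predicate_sql):
    return f"SELECT * FROM {table_name} WHERE {predicate_sql}"


# PySpark backend style: build a lazy DataFrame expression directly.
def compile_to_dataframe(df, predicate):
    return df.filter(predicate)


class FakeDataFrame:
    """Stand-in for pyspark.sql.DataFrame that records applied operations."""

    def __init__(self, ops=()):
        self.ops = tuple(ops)

    def filter(self, predicate):
        # Like Spark, return a new (lazy) DataFrame rather than mutating.
        return FakeDataFrame(self.ops + (("filter", predicate),))
```

Both routes are lazy in Spark; the difference is whether the intermediate representation is SQL text or a chain of DataFrame operations.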