Skip to content

Commit

Permalink
Address PR comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
icexelloss committed Aug 22, 2019
1 parent 4764a4e commit e00dc00
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 111 deletions.
4 changes: 2 additions & 2 deletions ibis/pyspark/api.py
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
from ibis.pyspark.client import PysparkClient
from ibis.pyspark.client import PySparkClient


def connect(session):
Expand All @@ -7,7 +7,7 @@ def connect(session):
which pipes them into SparkContext. See documentation for SparkContext:
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext
"""
client = PysparkClient(session)
client = PySparkClient(session)

# Spark internally stores timestamps as UTC values, and timestamp data that
# is brought in without a specified time zone is converted as local time to
Expand Down
30 changes: 17 additions & 13 deletions ibis/pyspark/client.py
Original file line number | Diff line number | Diff line change
@@ -1,26 +1,30 @@
from pyspark.sql.column import Column

import ibis.common.exceptions as com
import ibis.expr.types as types
from ibis.pyspark.compiler import translate
from ibis.pyspark.operations import PysparkTable
from ibis.pyspark.compiler import PySparkExprTranslator
from ibis.pyspark.operations import PySparkTable
from ibis.spark.client import SparkClient

from pyspark.sql.column import Column


class PysparkClient(SparkClient):
class PySparkClient(SparkClient):
"""
An ibis client that uses Pyspark SQL Dataframe
An ibis client that uses PySpark SQL Dataframe
"""

dialect = None
table_class = PysparkTable
table_class = PySparkTable

def __init__(self, session):
super().__init__(session)
self.translator = PySparkExprTranslator()

def compile(self, expr, *args, **kwargs):
"""Compile an ibis expression to a Pyspark DataFrame object
"""Compile an ibis expression to a PySpark DataFrame object
"""
return translate(expr)
return self.translator.translate(expr)

def execute(self, expr, params=None, limit='default', **kwargs):

if isinstance(expr, types.TableExpr):
return self.compile(expr).toPandas()
elif isinstance(expr, types.ColumnExpr):
Expand All @@ -32,8 +36,8 @@ def execute(self, expr, params=None, limit='default', **kwargs):
if isinstance(compiled, Column):
# attach result column to a fake DataFrame and
# select the result
compiled = self._session.range(0, 1) \
.select(compiled)
compiled = self._session.range(0, 1).select(compiled)
return compiled.toPandas().iloc[0, 0]
else:
raise ValueError("Unexpected type: ", type(expr))
raise com.IbisError(
"Cannot execute expression of type: {}".format(type(expr)))
Loading

0 comments on commit e00dc00

Please sign in to comment.