Add pandas udf support, add SQL persist broadcast (#27)
* setup fugue sql

* SQL: add basic extensions and tests

* update

* update

* clean up sql files

* fix syntax, add save load

* add test for load

* FugueSQLWorkflow

* update version

* Add pandas udf support, add SQL persist broadcast

* update

* update

* update

* update

* update
Han Wang authored Jun 12, 2020
1 parent a55dc33 commit 05ab46d
Showing 23 changed files with 7,687 additions and 7,458 deletions.
5 changes: 5 additions & 0 deletions Makefile
@@ -1,5 +1,8 @@
.PHONY: help clean dev docs package test

.EXPORT_ALL_VARIABLES:
ARROW_PRE_0_15_IPC_FORMAT = 1

help:
@echo "The following make targets are available:"
@echo " devenv create venv and install all deps for dev env (assumes python3 cmd exists)"
@@ -15,9 +18,11 @@ devenv:
	python3 -m venv venv
	. venv/bin/activate
	pip3 install -r requirements.txt
	pip3 install .[all]

dev:
	pip3 install -r requirements.txt
	pip3 install .[all]

docs:
	rm -rf docs/api
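
Note on the new .EXPORT_ALL_VARIABLES / ARROW_PRE_0_15_IPC_FORMAT lines: with pyarrow >= 0.15 installed, Spark 2.x pandas UDFs only work when the legacy Arrow IPC format is enabled, so the Makefile exports the flag to every make target, including the test run. For illustration only (not part of this commit), a minimal sketch of setting the same flag for a local PySpark session; propagating it to executors via spark.executorEnv is an assumption about a cluster setup:

import os

# Assumption (per Spark 2.x docs): the flag must be visible to both the driver
# and the executor Python workers before Arrow data is exchanged.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
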
21 changes: 20 additions & 1 deletion fugue/__init__.py
@@ -1 +1,20 @@
__version__ = "0.2.5"
# flake8: noqa

__version__ = "0.2.6"

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.dataframe import DataFrame, LocalBoundedDataFrame, LocalDataFrame
from fugue.dataframe.dataframes import DataFrames
from fugue.dataframe.iterable_dataframe import IterableDataFrame
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.dataframe.utils import to_local_bounded_df, to_local_df
from fugue.execution.execution_engine import ExecutionEngine, SQLEngine
from fugue.execution.native_execution_engine import NativeExecutionEngine, SqliteEngine
from fugue.extensions.creator import Creator, creator
from fugue.extensions.outputter import Outputter, outputter
from fugue.extensions.processor import Processor, processor
from fugue.extensions.transformer import CoTransformer, Transformer, transformer
from fugue.workflow.workflow import FugueWorkflow, WorkflowDataFrame
from fugue.workflow.workflow_context import FugueWorkflowContext
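
The widened top-level __init__ re-exports the workflow, dataframe and extension classes so user code can import everything from the fugue package directly. For illustration only (not part of this commit), a minimal sketch assuming the 0.2.x API behaves like later Fugue releases, where an engine-less FugueWorkflow defaults to the local NativeExecutionEngine and executes when the with-block exits:

from fugue import FugueWorkflow

# Sketch only: dag.df builds a dataframe from rows plus a schema string, and
# show() prints it when the workflow runs at the end of the with-block.
with FugueWorkflow() as dag:
    dag.df([[0, "a"], [1, "b"]], "x:int,y:str").show()
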
2 changes: 2 additions & 0 deletions fugue/collections/__init__.py
@@ -0,0 +1,2 @@
# flake8: noqa
from fugue.collections.partition import PartitionSpec, PartitionCursor
5 changes: 5 additions & 0 deletions fugue/extensions/__init__.py
@@ -0,0 +1,5 @@
# flake8: noqa
from fugue.extensions.transformer import Transformer, CoTransformer, transformer
from fugue.extensions.creator import Creator, creator
from fugue.extensions.processor import Processor, processor
from fugue.extensions.outputter import Outputter, outputter
1 change: 1 addition & 0 deletions fugue/workflow/__init__.py
@@ -1,3 +1,4 @@
# flake8: noqa

from fugue.workflow.workflow import FugueWorkflow, WorkflowDataFrame
from fugue.workflow.workflow_context import FugueWorkflowContext
1 change: 1 addition & 0 deletions fugue_dask/__init__.py
@@ -1,2 +1,3 @@
# flake8: noqa
from fugue_dask.dataframe import DaskDataFrame
from fugue_dask.execution_engine import DaskExecutionEngine
3 changes: 3 additions & 0 deletions fugue_spark/__init__.py
@@ -0,0 +1,3 @@
# flake8: noqa
from fugue_spark.dataframe import SparkDataFrame
from fugue_spark.execution_engine import SparkExecutionEngine
3 changes: 3 additions & 0 deletions fugue_spark/constants.py
@@ -0,0 +1,3 @@
from typing import Dict, Any

FUGUE_SPARK_DEFAULT_CONF: Dict[str, Any] = {"fugue.spark.use_pandas_udf": False}
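
FUGUE_SPARK_DEFAULT_CONF seeds the engine configuration with the pandas UDF switch turned off; as the execution_engine.py change below shows, it is merged first with the SparkContext conf and then with any user-supplied conf, so the user value wins. A minimal sketch of opting in when constructing the engine (assuming a plain dict is acceptable for the conf argument, which ParamDict accepts):

from pyspark.sql import SparkSession
from fugue_spark import SparkExecutionEngine

spark = SparkSession.builder.getOrCreate()

# The user conf overrides FUGUE_SPARK_DEFAULT_CONF, enabling the pandas UDF
# code path in SparkExecutionEngine.map for partitioned transforms.
engine = SparkExecutionEngine(spark, conf={"fugue.spark.use_pandas_udf": True})
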
62 changes: 60 additions & 2 deletions fugue_spark/execution_engine.py
@@ -1,6 +1,7 @@
import logging
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import pandas as pd
import pyarrow as pa
import pyspark.sql as ps
from fugue.collections.partition import (
@@ -11,12 +12,14 @@
from fugue.constants import KEYWORD_ROWCOUNT
from fugue.dataframe import DataFrame, DataFrames, IterableDataFrame, LocalDataFrame
from fugue.dataframe.arrow_dataframe import ArrowDataFrame
from fugue.dataframe.pandas_dataframe import PandasDataFrame
from fugue.dataframe.utils import get_join_schemas
from fugue.execution.execution_engine import (
    _DEFAULT_JOIN_KEYS,
    ExecutionEngine,
    SQLEngine,
)
from fugue_spark.constants import FUGUE_SPARK_DEFAULT_CONF
from fugue_spark.dataframe import SparkDataFrame
from fugue_spark.utils.convert import to_schema, to_spark_schema, to_type_safe_input
from fugue_spark.utils.io import SparkIO
@@ -28,10 +31,11 @@
from pyspark import StorageLevel
from pyspark.rdd import RDD
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col
from pyspark.sql.functions import PandasUDFType, broadcast, col, pandas_udf
from triad.collections import ParamDict, Schema
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.hash import to_uuid
from triad.utils.iter import EmptyAwareIterable
from triad.utils.threading import RunOnce

@@ -69,7 +73,8 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non
        if spark_session is None:
            spark_session = SparkSession.builder.getOrCreate()
        self._spark_session = spark_session
        cf = {x[0]: x[1] for x in spark_session.sparkContext.getConf().getAll()}
        cf = dict(FUGUE_SPARK_DEFAULT_CONF)
        cf.update({x[0]: x[1] for x in spark_session.sparkContext.getConf().getAll()})
        cf.update(ParamDict(conf))
        super().__init__(cf)
        self._fs = FileSystem()
@@ -183,6 +188,19 @@ def map(
        metadata: Any = None,
        on_init: Optional[Callable[[int, DataFrame], Any]] = None,
    ) -> DataFrame:
        if (
            self.conf.get_or_throw("fugue.spark.use_pandas_udf", bool)
            and len(partition_spec.partition_by) > 0
            and not any(pa.types.is_nested(t) for t in Schema(output_schema).types)
        ):
            return self._map_by_pandas_udf(
                df,
                map_func=map_func,
                output_schema=output_schema,
                partition_spec=partition_spec,
                metadata=metadata,
                on_init=on_init,
            )
        df = self.to_df(self.repartition(df, partition_spec))
        mapper = _Mapper(df, map_func, output_schema, partition_spec, on_init)
        sdf = df.native.rdd.mapPartitionsWithIndex(mapper.run, True)
@@ -272,6 +290,46 @@ def _register(self, df: SparkDataFrame, name: str) -> SparkDataFrame:
        df.native.createOrReplaceTempView(name)
        return df

    def _map_by_pandas_udf(
        self,
        df: DataFrame,
        map_func: Callable[[PartitionCursor, LocalDataFrame], LocalDataFrame],
        output_schema: Any,
        partition_spec: PartitionSpec,
        metadata: Any = None,
        on_init: Optional[Callable[[int, DataFrame], Any]] = None,
    ) -> DataFrame:
        presort = partition_spec.presort
        presort_keys = list(presort.keys())
        presort_asc = list(presort.values())
        output_schema = Schema(output_schema)
        input_schema = df.schema
        on_init_once: Any = None if on_init is None else RunOnce(
            on_init, lambda *args, **kwargs: to_uuid(id(on_init), id(args[0]))
        )

        def _udf(pdf: Any) -> pd.DataFrame:  # pragma: no cover
            if pdf.shape[0] == 0:
                return PandasDataFrame([], output_schema).as_pandas()
            if len(presort_keys) > 0:
                pdf = pdf.sort_values(presort_keys, ascending=presort_asc)
            input_df = PandasDataFrame(
                pdf.reset_index(drop=True), input_schema, pandas_df_wrapper=True
            )
            if on_init_once is not None:
                on_init_once(0, input_df)
            cursor = partition_spec.get_cursor(input_schema, 0)
            cursor.set(input_df.peek_array(), 0, 0)
            output_df = map_func(cursor, input_df)
            return output_df.as_pandas()

        df = self.to_df(df)
        udf = pandas_udf(
            _udf, to_spark_schema(output_schema), PandasUDFType.GROUPED_MAP
        )
        sdf = df.native.groupBy(*partition_spec.partition_by).apply(udf)
        return SparkDataFrame(sdf, metadata=metadata)


class _Mapper(object):  # pragma: no cover
    # pytest can't identify the coverage, but this part is fully tested
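
_map_by_pandas_udf wraps map_func in a GROUPED_MAP pandas UDF: each partition-by group arrives as a single pandas DataFrame, is sorted by the presort keys if any, wrapped as a PandasDataFrame, passed through map_func, and converted back under the declared output schema. For reference, a stripped-down sketch of the underlying Spark 2.4-era pattern (illustrative data, column and function names, not from this commit):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(0, 1.0), (0, 2.0), (1, 3.0)], ["k", "v"])

# Each call receives all rows of one "k" group as a pandas DataFrame and must
# return a pandas DataFrame matching the declared output schema.
@pandas_udf("k long, v_sum double", PandasUDFType.GROUPED_MAP)
def summarize(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"k": [pdf["k"].iloc[0]], "v_sum": [pdf["v"].sum()]})

sdf.groupBy("k").apply(summarize).show()
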
2 changes: 2 additions & 0 deletions fugue_sql/__init__.py
@@ -0,0 +1,2 @@
# flake8: noqa
from fugue_sql.workflow import FugueSQLWorkflow
44 changes: 27 additions & 17 deletions fugue_sql/antlr/fugue_sql.g4
@@ -105,8 +105,12 @@ fugueNestableTask
| fugueSelectTask
;

fugueSelectTask:
(assign=fugueAssignment)? (partition=fuguePrepartition)? q=query (persist=fuguePersist)? (broadcast=fugueBroadcast)?
;

fugueNestableTaskNoSelect:
(assign=fugueAssignment)? df=fugueNestableTaskCollectionNoSelect
(assign=fugueAssignment)? df=fugueNestableTaskCollectionNoSelect (persist=fuguePersist)? (broadcast=fugueBroadcast)?
;

fugueNestableTaskCollectionNoSelect
@@ -118,32 +122,28 @@ fugueNestableTaskCollectionNoSelect
| fugueLoadTask
;

fugueSelectTask:
(assign=fugueAssignment)? (partition=fuguePrepartition)? q=query (persist=fuguePersist)? (broadcast=fugueBroadcast)?
;

fugueTransformTask:
TRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommonWild (persist=fuguePersist)? (broadcast=fugueBroadcast)?
TRANSFORM (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommonWild
;

fugueProcessTask:
PROCESS (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommon (persist=fuguePersist)? (broadcast=fugueBroadcast)?
PROCESS (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommon
;

fugueZipTask:
ZIP dfs=fugueDataFrames (how=fugueZipType)? (BY by=fugueCols)? (PRESORT presort=fugueColsSort)? (persist=fuguePersist)? (broadcast=fugueBroadcast)?
ZIP dfs=fugueDataFrames (how=fugueZipType)? (BY by=fugueCols)? (PRESORT presort=fugueColsSort)?
;

fugueCreateTask:
CREATE params=fugueSingleOutputExtensionCommon (persist=fuguePersist)? (broadcast=fugueBroadcast)?
CREATE params=fugueSingleOutputExtensionCommon
;

fugueCreateDataTask:
CREATE DATA? data=fugueJsonArray SCHEMA schema=fugueSchema (persist=fuguePersist)? (broadcast=fugueBroadcast)?
CREATE DATA? data=fugueJsonArray SCHEMA schema=fugueSchema
;

fugueLoadTask:
LOAD (fmt=fugueFileFormat)? path=fuguePath (params=fugueParams)? (COLUMNS columns=fugueLoadColumns)? (persist=fuguePersist)? (broadcast=fugueBroadcast)?
LOAD (fmt=fugueFileFormat)? path=fuguePath (params=fugueParams)? (COLUMNS columns=fugueLoadColumns)?
;

fugueOutputTask:
@@ -183,8 +183,13 @@ fuguePath
: STRING
;

fuguePersist:
PERSIST (value=identifier)?
fuguePersist
: PERSIST (value=fuguePersistValue)?
| checkpoint=CHECKPOINT
;

fuguePersistValue
: multipartIdentifier
;

fugueBroadcast:
Expand All @@ -211,7 +216,7 @@ fugueAssignment:

fugueAssignmentSign
: COLONEQUAL
// | CHECKPOINT // TODO: add checkpoint
// | CHECKPOINTSIGN // TODO: add checkpoint
| {self.simpleAssign}? EQUAL
;

@@ -800,17 +805,21 @@ fromStatementBody

querySpecification
: transformClause
fromClause?
optionalFromClause
whereClause? #transformQuerySpecification
| selectClause
fromClause?
optionalFromClause
lateralView*
whereClause?
aggregationClause?
havingClause?
windowClause? #regularQuerySpecification
;

optionalFromClause // add this to easily capture no FROM cases
: fromClause?
;

transformClause
: (SELECT kind=TRANSFORM '(' namedExpressionSeq ')'
| kind=MAP namedExpressionSeq
@@ -1796,7 +1805,8 @@ JSON: 'JSON';
SINGLE: 'SINGLE';

COLONEQUAL: ':=';
CHECKPOINT: '??';
CHECKPOINT: 'CHECKPOINT';
CHECKPOINTSIGN: '??';

//================================
// End of the Fugue keywords list
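
With these rules, PERSIST and BROADCAST move off the individual task rules and onto the two nestable task wrappers, so any assignable statement, including a plain SELECT, can now be persisted or broadcast; fuguePersist additionally accepts a bare CHECKPOINT keyword, and the old '??' token is renamed CHECKPOINTSIGN. A hedged sketch of the kind of statement the updated fugueSelectTask rule admits (illustrative table and column names, assuming := assignment as in the COLONEQUAL sign above):

result := SELECT a, SUM(b) AS total FROM events GROUP BY a PERSIST BROADCAST
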