Drop columns and na -- Fugue SQL (#89)
* save and use

* save and use

* drop statements in fugue sql

* bump up version

* bump up version
goodwanghan committed Oct 29, 2020
1 parent 9313356 commit 0b1af42
Showing 11 changed files with 5,701 additions and 5,107 deletions.
8 changes: 8 additions & 0 deletions README.md
@@ -124,6 +124,14 @@ pre-commit install

## Update History

### 0.4.3

* Unified checkpoints and persist
* Drop columns and dropna implementations in both the programming and SQL interfaces
* Presort now takes an array as input
* Fixed Jinja template rendering issue
* Fixed path format detection bug

### 0.4.2

* Require pandas 1.0 because of parquet schema
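Read together, the 0.4.3 entries above translate to the programming interface as sketched below. This is a minimal sketch, assuming the default local execution engine; the sample data, column names, and output path are hypothetical.

from fugue import FugueWorkflow

# minimal sketch of the new 0.4.3 interfaces; data, column names and
# the output path are hypothetical
with FugueWorkflow() as dag:
    df = dag.df([[0.5, 1], [None, 2]], "a:double,b:int")
    df.drop(columns=["b"], if_exists=True).show()  # drop columns, tolerating missing ones
    df.dropna(how="any", subset=["a"]).show()      # drop rows with nulls in column a
    df.save_and_use("/tmp/a.parquet", fmt="parquet").show()  # save, then load back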
1 change: 1 addition & 0 deletions fugue/extensions/_builtins/__init__.py
@@ -15,6 +15,7 @@
RunSetOperation,
RunSQLSelect,
RunTransformer,
SaveAndUse,
SelectColumns,
Zip,
)
22 changes: 22 additions & 0 deletions fugue/extensions/_builtins/processors.py
@@ -172,6 +172,28 @@ def process(self, dfs: DataFrames) -> DataFrame:
return dfs[0][columns]


class SaveAndUse(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
# this processor takes exactly one input dataframe
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
kwargs = self.params.get("params", dict())
path = self.params.get_or_throw("path", str)
format_hint = self.params.get("fmt", "")
mode = self.params.get("mode", "overwrite")
partition_spec = self.partition_spec
force_single = self.params.get("single", False)

# save the input to persistent storage ...
self.execution_engine.save_df(
df=dfs[0],
path=path,
format_hint=format_hint,
mode=mode,
partition_spec=partition_spec,
force_single=force_single,
**kwargs
)
# ... then load it back so downstream steps read from the saved file
return self.execution_engine.load_df(path=path, format_hint=format_hint)


class _TransformerRunner(object):
def __init__(
self, df: DataFrame, transformer: Transformer, ignore_errors: List[type]
36 changes: 36 additions & 0 deletions fugue/workflow/workflow.py
@@ -25,6 +25,7 @@
RunSQLSelect,
RunTransformer,
Save,
SaveAndUse,
SelectColumns,
Show,
Zip,
@@ -750,6 +751,41 @@ def save(
params=dict(path=path, fmt=fmt, mode=mode, single=single, params=kwargs),
)

def save_and_use(
self: TDF,
path: str,
fmt: str = "",
mode: str = "overwrite",
partition: Any = None,
single: bool = False,
**kwargs: Any,
) -> TDF:
"""Save this dataframe to a persistent storage and load back to use
in the following steps
:param path: output path
:param fmt: format hint can accept ``parquet``, ``csv``, ``json``,
defaults to None, meaning to infer
:param mode: can accept ``overwrite``, ``append``, ``error``,
defaults to "overwrite"
:param partition: |PartitionLikeObject|, how to partition the
dataframe before saving, defaults to empty
:param single: force the output as a single file, defaults to False
:param kwargs: parameters to pass to the underlying framework
For more details and examples, read
:ref:`Save & Load <tutorial:/tutorials/dag.ipynb#save-&-load>`.
"""
if partition is None:
partition = self._metadata.get("pre_partition", PartitionSpec())
df = self.workflow.process(
self,
using=SaveAndUse,
pre_partition=partition,
params=dict(path=path, fmt=fmt, mode=mode, single=single, params=kwargs),
)
return self._to_self_type(df)

@property
def schema(self) -> Schema: # pragma: no cover
"""
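For contrast with save at the top of this hunk: save is a terminal output step, while save_and_use returns the dataframe loaded back from the saved file, so downstream steps read from storage instead of recomputing the lineage. A minimal sketch mirroring the new unit test later in this commit (the path and the local engine are assumptions):

from fugue import FugueWorkflow

# save_and_use returns the loaded dataframe, so the chain can continue
with FugueWorkflow() as dag:
    b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
    c = b.save_and_use("/tmp/a.parquet", fmt="parquet")
    c.show()  # same content as b, now backed by the saved file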
15 changes: 15 additions & 0 deletions fugue_sql/_antlr/fugue_sql.g4
@@ -111,6 +111,9 @@ fugueNestableTaskCollectionNoSelect
| fugueCreateTask
| fugueCreateDataTask
| fugueLoadTask
| fugueSaveAndUseTask
| fugueDropColumnsTask
| fugueDropnaTask
;

fugueTransformTask:
@@ -121,6 +124,18 @@ fugueProcessTask:
PROCESS (dfs=fugueDataFrames)? (partition=fuguePrepartition)? params=fugueSingleOutputExtensionCommon
;

fugueSaveAndUseTask:
SAVE AND USE (df=fugueDataFrame)? (partition=fuguePrepartition)? m=fugueSaveMode (single=fugueSingleFile)? (fmt=fugueFileFormat)? path=fuguePath (params=fugueParams)?
;

fugueDropColumnsTask:
DROP COLUMNS cols=fugueCols (IF EXISTS)? (FROM df=fugueDataFrame)?
;

fugueDropnaTask:
DROP ROWS IF how=(ALL|ANY) (NULL|NULLS) (ON cols=fugueCols)? (FROM df=fugueDataFrame)?
;

fugueZipTask:
ZIP dfs=fugueDataFrames (how=fugueZipType)? (BY by=fugueCols)? (PRESORT presort=fugueColsSort)?
;
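In Fugue SQL, these three rules accept statements like the following, adapted from the parse tests later in this commit (a is a dataframe defined earlier in the same SQL block):

save and use a overwrite parquet "xx"
drop columns a,b if exists from a
drop rows if any nulls on a,c from a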
10,607 changes: 5,501 additions & 5,106 deletions fugue_sql/_antlr/fugue_sqlParser.py

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions fugue_sql/_antlr/fugue_sqlVisitor.py
@@ -44,6 +44,21 @@ def visitFugueProcessTask(self, ctx:fugue_sqlParser.FugueProcessTaskContext):
return self.visitChildren(ctx)


# Visit a parse tree produced by fugue_sqlParser#fugueSaveAndUseTask.
def visitFugueSaveAndUseTask(self, ctx:fugue_sqlParser.FugueSaveAndUseTaskContext):
return self.visitChildren(ctx)


# Visit a parse tree produced by fugue_sqlParser#fugueDropColumnsTask.
def visitFugueDropColumnsTask(self, ctx:fugue_sqlParser.FugueDropColumnsTaskContext):
return self.visitChildren(ctx)


# Visit a parse tree produced by fugue_sqlParser#fugueDropnaTask.
def visitFugueDropnaTask(self, ctx:fugue_sqlParser.FugueDropnaTaskContext):
return self.visitChildren(ctx)


# Visit a parse tree produced by fugue_sqlParser#fugueZipTask.
def visitFugueZipTask(self, ctx:fugue_sqlParser.FugueZipTaskContext):
return self.visitChildren(ctx)
40 changes: 40 additions & 0 deletions fugue_sql/_visitors.py
@@ -439,6 +439,46 @@ def visitFugueSaveTask(self, ctx: fp.FugueSaveTaskContext):
**data.get("params", {}),
)

def visitFugueSaveAndUseTask(self, ctx: fp.FugueSaveAndUseTaskContext):
data = self.get_dict(
ctx, "partition", "df", "m", "single", "fmt", "path", "params"
)
if "df" in data:
df = data["df"]
else:
df = self.last
return df.save_and_use(
path=data["path"],
fmt=data.get("fmt", ""),
mode=data["m"],
partition=data.get("partition"),
single="single" in data,
**data.get("params", {}),
)

def visitFugueDropColumnsTask(self, ctx: fp.FugueDropColumnsTaskContext):
data = self.get_dict(ctx, "cols", "df")
if "df" in data:
df = data["df"]
else:
df = self.last
return df.drop(
columns=data["cols"],
if_exists=ctx.IF() is not None,
)

def visitFugueDropnaTask(self, ctx: fp.FugueDropnaTaskContext):
data = self.get_dict(ctx, "cols", "df")
if "df" in data:
df = data["df"]
else:
df = self.last
params: Dict[str, Any] = {}
params["how"] = "any" if ctx.ANY() is not None else "all"
if "cols" in data:
params["subset"] = data["cols"]
return df.dropna(**params)

def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
data = self.get_dict(ctx, "fmt", "path", "params", "columns")
return self.workflow.load(
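The drop visitors above share Fugue SQL's fallback convention: when a statement has no FROM clause, df falls back to self.last, the output of the previous statement. A hypothetical sketch of what two chained drop statements translate to (data and column names are made up):

from fugue import FugueWorkflow

# what "drop columns b" followed by "drop rows if any null" become;
# without FROM, each statement consumes the previous statement's output
dag = FugueWorkflow()
df = dag.df([[1.0, None]], "a:double,b:double")
df = df.drop(columns=["b"])   # DROP COLUMNS b
df = df.dropna(how="any")     # DROP ROWS IF ANY NULL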
12 changes: 12 additions & 0 deletions fugue_test/builtin_suite.py
@@ -649,6 +649,18 @@ def test_io(self):
a = dag.load(path2, header=True, columns="c:int,a:long")
a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))

def test_save_and_use(self):
path = os.path.join(self.tmpdir, "a")
with self.dag() as dag:
b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
c = b.save_and_use(path, fmt="parquet")
b.assert_eq(c)

with self.dag() as dag:
b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
d = dag.load(path, fmt="parquet")
b.assert_eq(d)


def mock_creator(p: int) -> DataFrame:
return ArrayDataFrame([[p]], "a:int")
2 changes: 1 addition & 1 deletion fugue_version/__init__.py
@@ -1 +1 @@
__version__ = "0.4.2"
__version__ = "0.4.3"
50 changes: 50 additions & 0 deletions tests/fugue_sql/test_workflow_parse.py
@@ -393,6 +393,31 @@ def test_save():
)


def test_save_and_use():
dag = FugueWorkflow()
a = dag.create(mock_create1, params=dict(n=1))
b = dag.create(mock_create1, params=dict(n=1))
a = a.save_and_use("xx", fmt="parquet", mode="overwrite")
b.save_and_use("xx", mode="append")
b.save_and_use("xx", mode="error")
a = a.save_and_use("xx.csv", fmt="csv", mode="error", single=True, header=True)
a = a.partition(by=["x"]).save_and_use("xx", mode="overwrite")
dag.create(mock_create1, params=dict(n=2)).save_and_use("xx", mode="overwrite")
assert_eq(
"""
a=create using mock_create1(n=1)
b=create using mock_create1(n=1)
a=save and use a overwrite parquet "xx"
save and use b append "xx"
save and use b to "xx"
save and use a to single csv "xx.csv"(header=True)
save and use prepartition by x overwrite "xx"
save and use (create using mock_create1(n=2)) overwrite "xx"
""",
dag,
)


def test_load():
dag = FugueWorkflow()
dag.load("xx")
@@ -410,6 +435,31 @@ def test_load():
)


def test_drop():
dag = FugueWorkflow()
a = dag.create(mock_create1)
b = a.drop(["a", "b"])
c = a.drop(["a", "b"], if_exists=True)

d = dag.create(mock_create1)
e = d.dropna(how="any")
f = d.dropna(how="all")
g = d.dropna(how="any", subset=["a", "c"])
assert_eq(
"""
a=create using mock_create1
drop columns a,b
drop columns a,b if exists from a
d=create using mock_create1
drop rows if any null
drop rows if all null from d
drop rows if any nulls on a,c from d
""",
dag,
)


def assert_eq(expr, expected: FugueWorkflow):
sql = FugueSQL(expr, "fugueLanguage", ignore_case=True, simple_assign=True)
wf = FugueWorkflow()
