[Review] Refactor ConfigContainer to use dask config (dask-contrib#392)
* Add dask config module to dask-sql

* Update context to use dask-sql config instead of ConfigContainer

* Remove distributed utils_test fixtures and add client fixture

* Reduce connection timeout for non-reachable test

* Rerun tests

* Mount tempfile directory in independent worker container

* Skip test_fsql on external cluster

* Relax external cluster's conda packages

* Add fixme note to failing fugue test due to missing triad module

* Update case sensitivity test

* Update setup to include the config yaml file for dask-sql

* Add sql schema yaml and update setup to include the schema

* Remove explicit config dict from dask_sql.config

* Update set_config docstring and prevent setting non sql configs

* Remove ConfigContainer in favor of dask-sql config module

* Add config unit tests

* Add dask sql configuration docs and include dask-sphinx-theme ext for rendering config yaml files

* Fix dask-sphinx-theme version constraint for config extension

* Update set_config docstring

* Rerun tests

* Remove context.set_config api in favor of directly using dask

Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
ayushdg and charlesbluca authored Mar 7, 2022
1 parent cd38818 commit 0372ebc
Showing 16 changed files with 212 additions and 196 deletions.
2 changes: 2 additions & 0 deletions MANIFEST.in
@@ -1,2 +1,4 @@
recursive-include dask_sql *.yaml

include versioneer.py
include dask_sql/_version.py
1 change: 1 addition & 0 deletions dask_sql/__init__.py
@@ -1,3 +1,4 @@
from . import config
from ._version import get_versions
from .cmd import cmd_loop
from .context import Context
12 changes: 12 additions & 0 deletions dask_sql/config.py
@@ -0,0 +1,12 @@
import os

import dask
import yaml

fn = os.path.join(os.path.dirname(__file__), "sql.yaml")

with open(fn) as f:
defaults = yaml.safe_load(f)

dask.config.update_defaults(defaults)
dask.config.ensure_file(source=fn, comment=True)
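Taken together, ``update_defaults`` registers the values from ``sql.yaml`` under Dask's global configuration, and ``ensure_file`` drops a commented copy of the file into the user's config directory. A minimal sketch of what importing the package enables (assuming dask-sql is installed):

    import dask
    import dask_sql  # noqa: F401  # importing runs the module above and registers the defaults

    # Defaults from sql.yaml are now visible through Dask's config API
    assert dask.config.get("sql.groupby.split_out") == 1
    assert dask.config.get("sql.identifier.case_sensitive") is True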
118 changes: 27 additions & 91 deletions dask_sql/context.py
Expand Up @@ -6,6 +6,7 @@

import dask.dataframe as dd
import pandas as pd
from dask import config as dask_config
from dask.base import optimize
from dask.distributed import Client

@@ -421,6 +422,7 @@ def sql(
return_futures: bool = True,
dataframes: Dict[str, Union[dd.DataFrame, pd.DataFrame]] = None,
gpu: bool = False,
config_options: Dict[str, Any] = None,
) -> Union[dd.DataFrame, pd.DataFrame]:
"""
Query the registered tables with the given SQL.
@@ -448,36 +450,39 @@
to register before executing this query
gpu (:obj:`bool`): Whether or not to load the additional Dask or pandas dataframes (if any) on GPU;
requires cuDF / dask-cuDF if enabled. Defaults to False.
config_options (:obj:`Dict[str,Any]`): Specific configuration options to pass during
query execution
Returns:
:obj:`dask.dataframe.DataFrame`: the created data frame of this query.
"""
-        if dataframes is not None:
-            for df_name, df in dataframes.items():
-                self.create_table(df_name, df, gpu=gpu)
+        with dask_config.set(config_options):
+            if dataframes is not None:
+                for df_name, df in dataframes.items():
+                    self.create_table(df_name, df, gpu=gpu)

-        rel, select_names, _ = self._get_ral(sql)
+            rel, select_names, _ = self._get_ral(sql)

-        dc = RelConverter.convert(rel, context=self)
+            dc = RelConverter.convert(rel, context=self)

-        if dc is None:
-            return
+            if dc is None:
+                return

-        if select_names:
-            # Rename any columns named EXPR$* to a more human readable name
-            cc = dc.column_container
-            cc = cc.rename(
-                {
-                    df_col: select_name
-                    for df_col, select_name in zip(cc.columns, select_names)
-                }
-            )
-            dc = DataContainer(dc.df, cc)
+            if select_names:
+                # Rename any columns named EXPR$* to a more human readable name
+                cc = dc.column_container
+                cc = cc.rename(
+                    {
+                        df_col: select_name
+                        for df_col, select_name in zip(cc.columns, select_names)
+                    }
+                )
+                dc = DataContainer(dc.df, cc)

-        df = dc.assign()
-        if not return_futures:
-            df = df.compute()
+            df = dc.assign()
+            if not return_futures:
+                df = df.compute()

        return df

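The new ``config_options`` argument simply wraps the whole query in ``dask_config.set``, so options apply only for the duration of the call. A minimal usage sketch (the table and query here are illustrative, not from this PR):

    import pandas as pd
    from dask_sql import Context

    c = Context()
    c.create_table("df", pd.DataFrame({"x": [1, 1, 2], "y": [10, 20, 30]}))

    # split_out is raised only while this query runs
    result = c.sql(
        "SELECT x, SUM(y) AS total FROM df GROUP BY x",
        config_options={"sql.groupby.split_out": 2},
    )
    print(result.compute())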
@@ -588,71 +593,6 @@ def register_model(
schema_name = schema_name or self.schema_name
self.schema[schema_name].models[model_name.lower()] = (model, training_columns)

def set_config(
self,
config_options: Union[Tuple[str, Any], Dict[str, Any]],
schema_name: str = None,
):
"""
Add configuration options to a schema.
A configuration option can be used to set the behavior of certain configurable operations.
E.g., `dask.groupby.agg.split_out` can be used to split the output of a groupby aggregation into multiple partitions.
Args:
config_options (:obj:`Tuple[str,val]` or :obj:`Dict[str,val]`): config_option and value to set
schema_name (:obj:`str`): Optionally select schema for setting configs
Example:
.. code-block:: python
from dask_sql import Context
c = Context()
c.set_config(("dask.groupby.aggregate.split_out", 1))
c.set_config(
{
"dask.groupby.aggregate.split_out": 2,
"dask.groupby.aggregate.split_every": 4,
}
)
"""
schema_name = schema_name or self.schema_name
self.schema[schema_name].config.set_config(config_options)

def drop_config(
self, config_strs: Union[str, List[str]], schema_name: str = None,
):
"""
Drop user set configuration options from schema
Args:
config_strs (:obj:`str` or :obj:`List[str]`): config key or keys to drop
schema_name (:obj:`str`): Optionally select schema for dropping configs
Example:
.. code-block:: python
from dask_sql import Context
c = Context()
c.set_config(
{
"dask.groupby.aggregate.split_out": 2,
"dask.groupby.aggregate.split_every": 4,
}
)
c.drop_config(
[
"dask.groupby.aggregate.split_out",
"dask.groupby.aggregate.split_every",
]
)
"""
schema_name = schema_name or self.schema_name
self.schema[schema_name].config.drop_config(config_strs)

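With ``set_config`` and ``drop_config`` removed, the same intent is expressed directly through ``dask.config``, using the new ``sql`` namespace; a sketch of the replacement pattern:

    import dask

    # Previously: c.set_config({"dask.groupby.aggregate.split_out": 2, ...})
    dask.config.set({"sql.groupby.split_out": 2, "sql.groupby.split_every": 4})

    # Previously undone via c.drop_config(...); a scoped override expires on its own
    with dask.config.set({"sql.groupby.split_out": 2}):
        pass  # queries issued here see the override; afterwards it is gone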
def ipython_magic(self, auto_include=False): # pragma: no cover
"""
Register a new ipython/jupyter magic function "sql"
@@ -730,7 +670,7 @@ def run_server(

def stop_server(self): # pragma: no cover
"""
-        Stop a SQL server started by ``run_server`.
+        Stop a SQL server started by ``run_server``.
"""
if self.sql_server is not None:
loop = asyncio.get_event_loop()
@@ -848,11 +788,7 @@ def _get_ral(self, sql):
)

# True if the SQL query should be case sensitive and False otherwise
-        case_sensitive = (
-            self.schema[self.schema_name]
-            .config.get_config_by_prefix("dask.sql.identifier.case.sensitive")
-            .get("dask.sql.identifier.case.sensitive", True)
-        )
+        case_sensitive = dask_config.get("sql.identifier.case_sensitive", default=True)

generator_builder = RelationalAlgebraGeneratorBuilder(
self.schema_name, case_sensitive, java.util.ArrayList()
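``_get_ral`` now reads the flag straight from the ``sql`` namespace, so case sensitivity can be toggled globally or per scope. A sketch, assuming case-insensitive parsing lets the quoted identifier "A" resolve to the lower-case column ``a``:

    import dask
    import pandas as pd
    from dask_sql import Context

    c = Context()
    c.create_table("tab", pd.DataFrame({"a": [1, 2]}))

    with dask.config.set({"sql.identifier.case_sensitive": False}):
        print(c.sql('SELECT "A" FROM tab').compute())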
85 changes: 0 additions & 85 deletions dask_sql/datacontainer.py
@@ -255,88 +255,3 @@ def __init__(self, name: str):
self.models: Dict[str, Tuple[Any, List[str]]] = {}
self.functions: Dict[str, UDF] = {}
self.function_lists: List[FunctionDescription] = []
self.config: ConfigContainer = ConfigContainer()


class ConfigContainer:
"""
Helper class that contains configuration options required for specific operations
Configurations are stored in a dictionary whose key strings are delimited by `.`
for easier nested access of multiple configurations
Example:
Dask groupby aggregate operations can be configured with the `split_out` option
to determine the number of output partitions or the `split_every` option to determine
the number of partitions used during the groupby tree reduction step.
"""

def __init__(self):
self.config_dict = {
# Do not set defaults here unless needed
# This maintains the list of configuration options supported that can be set
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": None,
}

def set_config(self, config_options: Union[Tuple[str, Any], Dict[str, Any]]):
"""
Accepts either a tuple of (config, val) or a dictionary containing multiple
{config1: val1, config2: val2} pairs and updates the schema config with these values
"""
if isinstance(config_options, tuple):
config_options = [config_options]
self.config_dict.update(config_options)

def drop_config(self, config_strs: Union[str, List[str]]):
if isinstance(config_strs, str):
config_strs = [config_strs]
for config_key in config_strs:
self.config_dict.pop(config_key)

def get_config_by_prefix(self, config_prefix: str):
"""
Returns all configuration options matching the prefix in `config_prefix`
Example:
.. code-block:: python
from dask_sql.datacontainer import ConfigContainer
sql_config = ConfigContainer()
sql_config.set_config(
{
"dask.groupby.aggregate.split_out":1,
"dask.groupby.aggregate.split_every": 1,
"dask.sort.persist": True,
}
)
sql_config.get_config_by_prefix("dask.groupby")
# Returns {
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": 1
# }
sql_config.get_config_by_prefix("dask")
# Returns {
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": 1,
# "dask.sort.persist": True
# }
sql_config.get_config_by_prefix("dask.sort")
# Returns {"dask.sort.persist": True}
sql_config.get_config_by_prefix("missing.key")
sql_config.get_config_by_prefix(None)
# Both return {}
"""
return (
{
key: val
for key, val in self.config_dict.items()
if key.startswith(config_prefix)
}
if config_prefix
else {}
)
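The prefix lookup that ``get_config_by_prefix`` provided comes for free with ``dask.config``, since ``dask.config.get`` on a namespace key returns the nested dict below it; a short sketch:

    import dask
    import dask_sql  # noqa: F401

    dask.config.get("sql.groupby")  # with the shipped defaults: {'split_out': 1, 'split_every': None}
    dask.config.get("sql")          # the whole dask-sql namespace, including 'identifier'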
12 changes: 3 additions & 9 deletions dask_sql/physical/rel/logical/aggregate.py
Expand Up @@ -6,6 +6,7 @@

import dask.dataframe as dd
import pandas as pd
from dask import config as dask_config

try:
import dask_cudf
@@ -231,15 +232,8 @@ def _do_aggregations(
for col in group_columns:
collected_aggregations[None].append((col, col, "first"))

-        groupby_agg_options = context.schema[
-            context.schema_name
-        ].config.get_config_by_prefix("dask.groupby.aggregate")
-        # Update the config string to only include the actual param value
-        # i.e. dask.groupby.aggregate.split_out -> split_out
-        for config_key in list(groupby_agg_options.keys()):
-            groupby_agg_options[
-                config_key.rpartition(".")[2]
-            ] = groupby_agg_options.pop(config_key)
+        groupby_agg_options = dask_config.get("sql.groupby")

# Now we can go ahead and use these grouped aggregations
# to perform the actual aggregation
# It is very important to start with the non-filtered entry.
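Because the keys under ``sql.groupby`` are named exactly like Dask's aggregation parameters, the returned dict can be forwarded as keyword arguments; a standalone sketch of that idea, outside of dask-sql's internals:

    import dask
    import dask.dataframe as dd
    import pandas as pd
    import dask_sql  # noqa: F401  # registers the sql.* defaults

    groupby_agg_options = dask.config.get("sql.groupby")  # {'split_out': 1, 'split_every': None}

    ddf = dd.from_pandas(
        pd.DataFrame({"x": [1, 1, 2], "y": [1.0, 2.0, 3.0]}), npartitions=2
    )
    print(ddf.groupby("x").agg({"y": "sum"}, **groupby_agg_options).compute())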
28 changes: 28 additions & 0 deletions dask_sql/sql-schema.yaml
@@ -0,0 +1,28 @@
properties:

sql:
type: object
properties:

groupby:
type: object
properties:

split_out:
type: integer
description: |
Number of output partitions for a groupby operation
split_every:
type: [integer, "null"]
description: |
Number of branches per reduction step for a groupby operation.
identifier:
type: object
properties:

case_sensitive:
type: boolean
description: |
Whether sql identifiers are considered case sensitive while parsing.
7 changes: 7 additions & 0 deletions dask_sql/sql.yaml
@@ -0,0 +1,7 @@
sql:
groupby:
split_out: 1
split_every: null

identifier:
case_sensitive: True
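As with any Dask configuration, these values can also be set outside of Python: Dask maps environment variables such as ``DASK_SQL__GROUPBY__SPLIT_OUT`` onto the nested keys. A sketch of that mechanism driven from Python:

    import os
    import dask
    import dask_sql  # noqa: F401

    # DASK_ prefix plus double underscores for nesting -> sql.groupby.split_out
    os.environ["DASK_SQL__GROUPBY__SPLIT_OUT"] = "4"
    dask.config.refresh()  # re-collect config; the environment beats the yaml defaults

    assert dask.config.get("sql.groupby.split_out") == 4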
6 changes: 5 additions & 1 deletion docs/conf.py
@@ -29,7 +29,11 @@
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
-extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon"]
+extensions = [
+    "sphinx.ext.autodoc",
+    "sphinx.ext.napoleon",
+    "dask_sphinx_theme.ext.dask_config_sphinx_ext",
+]

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
2 changes: 1 addition & 1 deletion docs/environment.yaml
@@ -23,7 +23,7 @@ dependencies:
- cytoolz=0.11.0=py38h1e0a361_0
- dask=2.28.0=py_0
- dask-core=2.28.0=py_0
-  - dask-sphinx-theme=1.3.2=pyh9f0ad1d_0
+  - dask-sphinx-theme>=2.0.3
- distributed=2.28.0=py38h32f6830_0
- docutils=0.16=py38h32f6830_1
- fontconfig=2.13.1=h1056068_1002
1 change: 1 addition & 0 deletions docs/index.rst
@@ -70,6 +70,7 @@ Any pandas or dask dataframe can be used as input and ``dask-sql`` understands a
pages/server
pages/cmd
pages/how_does_it_work
pages/configuration


.. note::
18 changes: 18 additions & 0 deletions docs/pages/configuration.rst
@@ -0,0 +1,18 @@
.. _configuration:

Configuration in Dask-SQL
==========================

``dask-sql`` supports a number of configuration options that control the behavior of certain operations.
``dask-sql`` uses `Dask's config <https://docs.dask.org/en/stable/configuration.html>`_
module and configuration options can be specified with YAML files, via environment variables,
or directly, either through the `dask.config.set <https://docs.dask.org/en/stable/configuration.html#dask.config.set>`_ method
or the ``config_options`` argument in the :func:`dask_sql.Context.sql` method.

Configuration Reference
-----------------------

.. dask-config-block::
:location: sql
:config: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/6ddb78a3b3c4ac5051aa17105e576211d0e32f6b/sql.yaml
:schema: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/2d37f64c48c2b6ebdca6634b4c5e3c22a59e1cdf/sql-schema.yaml