diff --git a/MANIFEST.in b/MANIFEST.in
index 8597b2c71..2b6351550 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,4 @@
+recursive-include dask_sql *.yaml
+
 include versioneer.py
 include dask_sql/_version.py
diff --git a/dask_sql/__init__.py b/dask_sql/__init__.py
index 02b99eb21..d343a4c5c 100644
--- a/dask_sql/__init__.py
+++ b/dask_sql/__init__.py
@@ -1,3 +1,4 @@
+from . import config
 from ._version import get_versions
 from .cmd import cmd_loop
 from .context import Context
diff --git a/dask_sql/config.py b/dask_sql/config.py
new file mode 100644
index 000000000..e653114c1
--- /dev/null
+++ b/dask_sql/config.py
@@ -0,0 +1,12 @@
+import os
+
+import dask
+import yaml
+
+fn = os.path.join(os.path.dirname(__file__), "sql.yaml")
+
+with open(fn) as f:
+    defaults = yaml.safe_load(f)
+
+dask.config.update_defaults(defaults)
+dask.config.ensure_file(source=fn, comment=True)
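The new `dask_sql/config.py` module registers the packaged `sql.yaml` defaults with Dask's configuration system as soon as `dask_sql` is imported. A minimal sketch of what that means on the user side — the option names and default values are taken from `dask_sql/sql.yaml` further down in this diff:

```python
import dask
import dask_sql  # noqa: F401  # importing dask_sql loads sql.yaml into dask.config

# The packaged defaults are now visible under the "sql" namespace
assert dask.config.get("sql.groupby.split_out") == 1
assert dask.config.get("sql.identifier.case_sensitive") is True

# Overrides use the ordinary dask.config.set machinery, globally or scoped
with dask.config.set({"sql.groupby.split_out": 4}):
    print(dask.config.get("sql.groupby"))  # {'split_out': 4, 'split_every': None}
```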
diff --git a/dask_sql/context.py b/dask_sql/context.py
index 0b6b3c5c8..008ce76c1 100644
--- a/dask_sql/context.py
+++ b/dask_sql/context.py
@@ -6,6 +6,7 @@
 
 import dask.dataframe as dd
 import pandas as pd
+from dask import config as dask_config
 from dask.base import optimize
 from dask.distributed import Client
 
@@ -421,6 +422,7 @@ def sql(
         return_futures: bool = True,
         dataframes: Dict[str, Union[dd.DataFrame, pd.DataFrame]] = None,
         gpu: bool = False,
+        config_options: Dict[str, Any] = None,
     ) -> Union[dd.DataFrame, pd.DataFrame]:
         """
         Query the registered tables with the given SQL.
@@ -448,36 +450,39 @@
                 to register before executing this query
             gpu (:obj:`bool`): Whether or not to load the additional Dask or pandas dataframes (if any) on GPU;
                 requires cuDF / dask-cuDF if enabled. Defaults to False.
+            config_options (:obj:`Dict[str,Any]`): Specific configuration options to pass during
+                query execution
 
         Returns:
             :obj:`dask.dataframe.DataFrame`: the created data frame of this query.
         """
-        if dataframes is not None:
-            for df_name, df in dataframes.items():
-                self.create_table(df_name, df, gpu=gpu)
+        with dask_config.set(config_options):
+            if dataframes is not None:
+                for df_name, df in dataframes.items():
+                    self.create_table(df_name, df, gpu=gpu)
 
-        rel, select_names, _ = self._get_ral(sql)
+            rel, select_names, _ = self._get_ral(sql)
 
-        dc = RelConverter.convert(rel, context=self)
+            dc = RelConverter.convert(rel, context=self)
 
-        if dc is None:
-            return
+            if dc is None:
+                return
 
-        if select_names:
-            # Rename any columns named EXPR$* to a more human readable name
-            cc = dc.column_container
-            cc = cc.rename(
-                {
-                    df_col: select_name
-                    for df_col, select_name in zip(cc.columns, select_names)
-                }
-            )
-            dc = DataContainer(dc.df, cc)
+            if select_names:
+                # Rename any columns named EXPR$* to a more human readable name
+                cc = dc.column_container
+                cc = cc.rename(
+                    {
+                        df_col: select_name
+                        for df_col, select_name in zip(cc.columns, select_names)
+                    }
+                )
+                dc = DataContainer(dc.df, cc)
 
-        df = dc.assign()
-        if not return_futures:
-            df = df.compute()
+            df = dc.assign()
+            if not return_futures:
+                df = df.compute()
 
         return df
@@ -588,71 +593,6 @@ def register_model(
         schema_name = schema_name or self.schema_name
         self.schema[schema_name].models[model_name.lower()] = (model, training_columns)
 
-    def set_config(
-        self,
-        config_options: Union[Tuple[str, Any], Dict[str, Any]],
-        schema_name: str = None,
-    ):
-        """
-        Add configuration options to a schema.
-        A configuration option could be used to set the behavior of certain configurirable operations.
-
-        Eg: `dask.groupby.agg.split_out` can be used to split the output of a groupby agrregation to multiple partitions.
-
-        Args:
-            config_options (:obj:`Tuple[str,val]` or :obj:`Dict[str,val]`): config_option and value to set
-            schema_name (:obj:`str`): Optionally select schema for setting configs
-
-        Example:
-            .. code-block:: python
-
-                from dask_sql import Context
-
-                c = Context()
-                c.set_config(("dask.groupby.aggregate.split_out", 1))
-                c.set_config(
-                    {
-                        "dask.groupby.aggregate.split_out": 2,
-                        "dask.groupby.aggregate.split_every": 4,
-                    }
-                )
-
-        """
-        schema_name = schema_name or self.schema_name
-        self.schema[schema_name].config.set_config(config_options)
-
-    def drop_config(
-        self, config_strs: Union[str, List[str]], schema_name: str = None,
-    ):
-        """
-        Drop user set configuration options from schema
-
-        Args:
-            config_strs (:obj:`str` or :obj:`List[str]`): config key or keys to drop
-            schema_name (:obj:`str`): Optionally select schema for dropping configs
-
-        Example:
-            .. code-block:: python
-
-                from dask_sql import Context
-
-                c = Context()
-                c.set_config(
-                    {
-                        "dask.groupby.aggregate.split_out": 2,
-                        "dask.groupby.aggregate.split_every": 4,
-                    }
-                )
-                c.drop_config(
-                    [
-                        "dask.groupby.aggregate.split_out",
-                        "dask.groupby.aggregate.split_every",
-                    ]
-                )
-        """
-        schema_name = schema_name or self.schema_name
-        self.schema[schema_name].config.drop_config(config_strs)
-
     def ipython_magic(self, auto_include=False):  # pragma: no cover
         """
         Register a new ipython/jupyter magic function "sql"
@@ -730,7 +670,7 @@ def run_server(
 
     def stop_server(self):  # pragma: no cover
         """
-        Stop a SQL server started by ``run_server`.
+        Stop a SQL server started by ``run_server``.
         """
         if self.sql_server is not None:
            loop = asyncio.get_event_loop()
@@ -848,11 +788,7 @@ def _get_ral(self, sql):
         )
         # True if the SQL query should be case sensitive and False otherwise
-        case_sensitive = (
-            self.schema[self.schema_name]
-            .config.get_config_by_prefix("dask.sql.identifier.case.sensitive")
-            .get("dask.sql.identifier.case.sensitive", True)
-        )
+        case_sensitive = dask_config.get("sql.identifier.case_sensitive", default=True)
 
         generator_builder = RelationalAlgebraGeneratorBuilder(
             self.schema_name, case_sensitive, java.util.ArrayList()
         )
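`Context.sql` now accepts a `config_options` mapping and applies it through `dask_config.set(...)` for the duration of the query only. A small usage sketch — the table name and data are made up for illustration, and the option shown is the one exercised by the updated compatibility test later in this diff:

```python
import pandas as pd

from dask_sql import Context

c = Context()
c.create_table("test", pd.DataFrame({"id": [0, 1]}))

# Identifier lookup is case sensitive by default; relax it for this query only
df = c.sql(
    "SELECT ID FROM test",
    config_options={"sql.identifier.case_sensitive": False},
)
print(df.compute())
```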
diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py
index 10956ec1d..db77c9dfc 100644
--- a/dask_sql/datacontainer.py
+++ b/dask_sql/datacontainer.py
@@ -255,88 +255,3 @@ def __init__(self, name: str):
         self.models: Dict[str, Tuple[Any, List[str]]] = {}
         self.functions: Dict[str, UDF] = {}
         self.function_lists: List[FunctionDescription] = []
-        self.config: ConfigContainer = ConfigContainer()
-
-
-class ConfigContainer:
-    """
-    Helper class that contains configuration options required for specific operations
-    Configurations are stored in a dictionary where keys strings are delimited by `.`
-    for easier nested access of multiple configurations
-    Example:
-        Dask groupby aggregate operations can be configured via with the `split_out` option
-        to determine number of output partitions or the `split_every` option to determine
-        the number of partitions used during the groupby tree reduction step.
-    """
-
-    def __init__(self):
-        self.config_dict = {
-            # Do not set defaults here unless needed
-            # This mantains the list of configuration options supported that can be set
-            # "dask.groupby.aggregate.split_out": 1,
-            # "dask.groupby.aggregate.split_every": None,
-        }
-
-    def set_config(self, config_options: Union[Tuple[str, Any], Dict[str, Any]]):
-        """
-        Accepts either a tuple of (config, val) or a dictionary containing multiple
-        {config1: val1, config2: val2} pairs and updates the schema config with these values
-        """
-        if isinstance(config_options, tuple):
-            config_options = [config_options]
-        self.config_dict.update(config_options)
-
-    def drop_config(self, config_strs: Union[str, List[str]]):
-        if isinstance(config_strs, str):
-            config_strs = [config_strs]
-        for config_key in config_strs:
-            self.config_dict.pop(config_key)
-
-    def get_config_by_prefix(self, config_prefix: str):
-        """
-        Returns all configuration options matching the prefix in `config_prefix`
-
-        Example:
-            .. code-block:: python
-
-                from dask_sql.datacontainer import ConfigContainer
-
-                sql_config = ConfigContainer()
-                sql_config.set_config(
-                    {
-                        "dask.groupby.aggregate.split_out":1,
-                        "dask.groupby.aggregate.split_every": 1,
-                        "dask.sort.persist": True,
-                    }
-                )
-
-                sql_config.get_config_by_prefix("dask.groupby")
-                # Returns {
-                #   "dask.groupby.aggregate.split_out": 1,
-                #   "dask.groupby.aggregate.split_every": 1
-                # }
-
-                sql_config.get_config_by_prefix("dask")
-                # Returns {
-                #   "dask.groupby.aggregate.split_out": 1,
-                #   "dask.groupby.aggregate.split_every": 1,
-                #   "dask.sort.persist": True
-                # }
-
-                sql_config.get_config_by_prefix("dask.sort")
-                # Returns {"dask.sort.persist": True}
-
-                sql_config.get_config_by_prefix("missing.key")
-                sql_config.get_config_by_prefix(None)
-                # Both return {}
-
-        """
-        return (
-            {
-                key: val
-                for key, val in self.config_dict.items()
-                if key.startswith(config_prefix)
-            }
-            if config_prefix
-            else {}
-        )
diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py
index d71458eed..c8cfab62c 100644
--- a/dask_sql/physical/rel/logical/aggregate.py
+++ b/dask_sql/physical/rel/logical/aggregate.py
@@ -6,6 +6,7 @@
 
 import dask.dataframe as dd
 import pandas as pd
+from dask import config as dask_config
 
 try:
     import dask_cudf
@@ -231,15 +232,8 @@
             for col in group_columns:
                 collected_aggregations[None].append((col, col, "first"))
 
-        groupby_agg_options = context.schema[
-            context.schema_name
-        ].config.get_config_by_prefix("dask.groupby.aggregate")
-        # Update the config string to only include the actual param value
-        # i.e. dask.groupby.aggregate.split_out -> split_out
-        for config_key in list(groupby_agg_options.keys()):
-            groupby_agg_options[
-                config_key.rpartition(".")[2]
-            ] = groupby_agg_options.pop(config_key)
+        groupby_agg_options = dask_config.get("sql.groupby")
+
         # Now we can go ahead and use these grouped aggregations
         # to perform the actual aggregation
         # It is very important to start with the non-filtered entry.
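In `aggregate.py` the whole `sql.groupby` namespace is now read in a single call; judging from the removed code (which stripped `dask.groupby.aggregate.split_out` down to plain `split_out`), these values end up as the `split_out`/`split_every` keyword arguments of Dask's groupby aggregation. A rough illustration of what the two options control on a plain Dask dataframe — the example data is made up:

```python
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"user_id": [1, 2, 3, 4] * 16, "b": [5, 6, 7, 8] * 16}),
    npartitions=16,
)

# split_out -> number of output partitions, split_every -> branching factor of
# the tree reduction; these mirror the keys in dask.config.get("sql.groupby")
out = ddf.groupby("user_id").agg({"b": "sum"}, split_out=2, split_every=4)
print(out.npartitions)  # 2
```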
diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml
new file mode 100644
index 000000000..06c766854
--- /dev/null
+++ b/dask_sql/sql-schema.yaml
@@ -0,0 +1,28 @@
+properties:
+
+  sql:
+    type: object
+    properties:
+
+      groupby:
+        type: object
+        properties:
+
+          split_out:
+            type: integer
+            description: |
+              Number of output partitions for a groupby operation
+
+          split_every:
+            type: [integer, "null"]
+            description: |
+              Number of branches per reduction step for a groupby operation.
+
+      identifier:
+        type: object
+        properties:
+
+          case_sensitive:
+            type: boolean
+            description: |
+              Whether sql identifiers are considered case sensitive while parsing.
diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml
new file mode 100644
index 000000000..1976e72c3
--- /dev/null
+++ b/dask_sql/sql.yaml
@@ -0,0 +1,7 @@
+sql:
+  groupby:
+    split_out: 1
+    split_every: null
+
+  identifier:
+    case_sensitive: True
diff --git a/docs/conf.py b/docs/conf.py
index 42211bd9f..cf0a57e98 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -29,7 +29,11 @@
 # Add any Sphinx extension module names here, as strings. They can be
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
-extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon"]
+extensions = [
+    "sphinx.ext.autodoc",
+    "sphinx.ext.napoleon",
+    "dask_sphinx_theme.ext.dask_config_sphinx_ext",
+]
 
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ["_templates"]
diff --git a/docs/environment.yaml b/docs/environment.yaml
index 73603a226..3f957b058 100644
--- a/docs/environment.yaml
+++ b/docs/environment.yaml
@@ -23,7 +23,7 @@ dependencies:
   - cytoolz=0.11.0=py38h1e0a361_0
   - dask=2.28.0=py_0
   - dask-core=2.28.0=py_0
-  - dask-sphinx-theme=1.3.2=pyh9f0ad1d_0
+  - dask-sphinx-theme>=2.0.3
   - distributed=2.28.0=py38h32f6830_0
   - docutils=0.16=py38h32f6830_1
   - fontconfig=2.13.1=h1056068_1002
diff --git a/docs/index.rst b/docs/index.rst
index f9fbeeba1..a57448fff 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -70,6 +70,7 @@ Any pandas or dask dataframe can be used as input and ``dask-sql`` understands a
    pages/server
    pages/cmd
    pages/how_does_it_work
+   pages/configuration
 
 .. note::
 
diff --git a/docs/pages/configuration.rst b/docs/pages/configuration.rst
new file mode 100644
index 000000000..2716335b0
--- /dev/null
+++ b/docs/pages/configuration.rst
@@ -0,0 +1,18 @@
+.. _configuration:
+
+Configuration in Dask-SQL
+=========================
+
+``dask-sql`` supports a number of configuration options to configure the behavior of certain operations.
+``dask-sql`` uses `Dask's config `_
+module and configuration options can be specified with YAML files, via environment variables,
+or directly, either through the `dask.config.set `_ method
+or the ``config_options`` argument in the :func:`dask_sql.Context.sql` method.
+
+Configuration Reference
+-----------------------
+
+.. dask-config-block::
+    :location: sql
+    :config: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/6ddb78a3b3c4ac5051aa17105e576211d0e32f6b/sql.yaml
+    :schema: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/2d37f64c48c2b6ebdca6634b4c5e3c22a59e1cdf/sql-schema.yaml
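The new docs page names several ways to set these options. A compact sketch of two of them — environment variables and `dask.config.set`; the environment variable below is the same one the unit tests at the end of this diff exercise, and the values are purely illustrative. The YAML-file route is sketched after the unit tests, and the per-query `config_options` route was shown earlier:

```python
import os
from unittest import mock

import dask
import dask_sql  # noqa: F401  # registers the "sql" defaults with dask.config

# 1) Environment variables, using Dask's DASK_ + double-underscore convention
with mock.patch.dict(os.environ, {"DASK_SQL__GROUPBY__SPLIT_OUT": "2"}):
    dask.config.refresh()
    assert dask.config.get("sql.groupby.split_out") == 2
dask.config.refresh()

# 2) Programmatically, either globally or scoped to a block
with dask.config.set({"sql.groupby.split_every": 4}):
    assert dask.config.get("sql.groupby.split_every") == 4
```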
diff --git a/setup.py b/setup.py
index 6096eeac6..2d04f3320 100755
--- a/setup.py
+++ b/setup.py
@@ -83,7 +83,7 @@ def build(self):
     long_description=long_description,
     long_description_content_type="text/markdown",
     packages=find_packages(include=["dask_sql", "dask_sql.*"]),
-    package_data={"dask_sql": ["jar/DaskSQL.jar"]},
+    package_data={"dask_sql": ["jar/DaskSQL.jar", "sql*.yaml"]},
     python_requires=">=3.8",
     setup_requires=sphinx_requirements,
     install_requires=[
diff --git a/tests/integration/test_compatibility.py b/tests/integration/test_compatibility.py
index d37796838..e087460df 100644
--- a/tests/integration/test_compatibility.py
+++ b/tests/integration/test_compatibility.py
@@ -939,12 +939,14 @@ def test_integration_1():
 
 def test_query_case_sensitivity():
     c = Context()
-    c.set_config(("dask.sql.identifier.case.sensitive", False))
     df = pd.DataFrame({"id": [0, 1]})
 
     c.create_table("test", df)
     try:
-        c.sql("select ID from test")
+        c.sql(
+            "select ID from test",
+            config_options={"sql.identifier.case_sensitive": False},
+        )
     except ParsingException as pe:
         assert False, f"Queries should be case insensitve but raised exception {pe}"
 
diff --git a/tests/integration/test_groupby.py b/tests/integration/test_groupby.py
index 1281ff027..e6eba9060 100644
--- a/tests/integration/test_groupby.py
+++ b/tests/integration/test_groupby.py
@@ -356,14 +356,14 @@ def test_stats_aggregation(c, timeseries_df):
 @pytest.mark.parametrize("split_out", [None, 2, 4])
 def test_groupby_split_out(c, input_table, split_out, request):
     user_table = request.getfixturevalue(input_table)
-    c.set_config(("dask.groupby.aggregate.split_out", split_out))
 
     df = c.sql(
         f"""
     SELECT
         user_id, SUM(b) AS "S"
     FROM {input_table}
     GROUP BY user_id
-    """
+    """,
+        config_options={"sql.groupby.split_out": split_out},
     )
     expected_df = (
         user_table.groupby(by="user_id").agg({"b": "sum"}).reset_index(drop=False)
@@ -372,7 +372,6 @@ def test_groupby_split_out(c, input_table, split_out, request):
     expected_df = expected_df.sort_values("user_id")
     assert df.npartitions == split_out if split_out else 1
     dd.assert_eq(df.compute().sort_values("user_id"), expected_df, check_index=False)
-    c.drop_config("dask.groupby.aggregate.split_out")
 
 
 @pytest.mark.parametrize(
@@ -394,7 +393,6 @@ def test_groupby_split_every(c, gpu, split_every, expected_keys):
     )
     # Need an input with multiple partitions to demonstrate split_every
     c.create_table("split_every_input", input_ddf)
-    c.set_config(("dask.groupby.aggregate.split_every", split_every))
 
     df = c.sql(
         """
@@ -402,7 +400,8 @@
     SELECT
         user_id, SUM(b) AS "S"
     FROM split_every_input
     GROUP BY user_id
-    """
+    """,
+        config_options={"sql.groupby.split_every": split_every},
     )
     expected_df = (
         input_ddf.groupby(by="user_id")
         .agg({"b": "sum"}, split_every=split_every)
         .reset_index(drop=False)
@@ -415,5 +414,4 @@
     assert len(df.dask.keys()) == expected_keys
     dd.assert_eq(df, expected_df, check_index=False)
 
-    c.drop_config("dask.groupby.aggregate.split_every")
     c.drop_table("split_every_input")
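Because the implementation wraps query execution in `dask_config.set(config_options)`, an override passed to `Context.sql` is scoped to that one call and does not leak into the global configuration. A sketch of that behaviour — the table name and data are made up:

```python
import dask
import pandas as pd

from dask_sql import Context

c = Context()
c.create_table(
    "user_table", pd.DataFrame({"user_id": [1, 2, 1, 2], "b": [5, 6, 7, 8]})
)

before = dask.config.get("sql.groupby.split_out")  # 1 with the packaged defaults

df = c.sql(
    'SELECT user_id, SUM(b) AS "S" FROM user_table GROUP BY user_id',
    config_options={"sql.groupby.split_out": 2},
)
# df.npartitions is 2 here, matching the per-query split_out override

# The override applied only inside the call above
assert dask.config.get("sql.groupby.split_out") == before
```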
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
new file mode 100644
index 000000000..fc131894c
--- /dev/null
+++ b/tests/unit/test_config.py
@@ -0,0 +1,98 @@
+import os
+from unittest import mock
+
+import pytest
+import yaml
+from dask import config as dask_config
+
+# Required to instantiate default sql config
+import dask_sql  # noqa: F401
+
+
+def test_custom_yaml(tmpdir):
+    custom_config = {}
+    custom_config["sql"] = dask_config.get("sql")
+    custom_config["sql"]["groupby"]["split_out"] = 16
+    custom_config["sql"]["foo"] = {"bar": [1, 2, 3], "baz": None}
+
+    with open(os.path.join(tmpdir, "custom-sql.yaml"), mode="w") as f:
+        yaml.dump(custom_config, f)
+
+    dask_config.refresh(
+        paths=[tmpdir]
+    )  # Refresh config to read from updated environment
+    assert custom_config["sql"] == dask_config.get("sql")
+    dask_config.refresh()
+
+
+def test_env_variable():
+    with mock.patch.dict("os.environ", {"DASK_SQL__GROUPBY__SPLIT_OUT": "200"}):
+        dask_config.refresh()
+        assert dask_config.get("sql.groupby.split-out") == 200
+    dask_config.refresh()
+
+
+def test_default_config():
+    config_fn = os.path.join(os.path.dirname(__file__), "../../dask_sql", "sql.yaml")
+    with open(config_fn) as f:
+        default_config = yaml.safe_load(f)
+    assert "sql" in default_config
+    assert default_config["sql"] == dask_config.get("sql")
+
+
+def test_schema():
+    jsonschema = pytest.importorskip("jsonschema")
+
+    config_fn = os.path.join(os.path.dirname(__file__), "../../dask_sql", "sql.yaml")
+    schema_fn = os.path.join(
+        os.path.dirname(__file__), "../../dask_sql", "sql-schema.yaml"
+    )
+
+    with open(config_fn) as f:
+        config = yaml.safe_load(f)
+
+    with open(schema_fn) as f:
+        schema = yaml.safe_load(f)
+
+    jsonschema.validate(config, schema)
+
+
+def test_schema_is_complete():
+    config_fn = os.path.join(os.path.dirname(__file__), "../../dask_sql", "sql.yaml")
+    schema_fn = os.path.join(
+        os.path.dirname(__file__), "../../dask_sql", "sql-schema.yaml"
+    )
+
+    with open(config_fn) as f:
+        config = yaml.safe_load(f)
+
+    with open(schema_fn) as f:
+        schema = yaml.safe_load(f)
+
+    def test_matches(c, s):
+        for k, v in c.items():
+            if list(c) != list(s["properties"]):
+                raise ValueError(
+                    "\nThe sql.yaml and sql-schema.yaml files are not in sync.\n"
+                    "This usually happens when we add a new configuration value,\n"
+                    "but don't add the schema of that value to the dask-schema.yaml file\n"
+                    "Please modify these files to include the missing values: \n\n"
+                    "    sql.yaml:        {}\n"
+                    "    sql-schema.yaml: {}\n\n"
+                    "Examples in these files should be a good start, \n"
+                    "even if you are not familiar with the jsonschema spec".format(
+                        sorted(c), sorted(s["properties"])
+                    )
+                )
+            if isinstance(v, dict):
+                test_matches(c[k], s["properties"][k])
+
+    test_matches(config, schema)
+
+
+def test_dask_setconfig():
+    dask_config.set({"sql.foo.bar": 1})
+    with dask_config.set({"sql.foo.baz": "2"}):
+        assert dask_config.get("sql.foo") == {"bar": 1, "baz": "2"}
+    assert dask_config.get("sql.foo") == {"bar": 1}
+    dask_config.refresh()
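Beyond `dask.config.set`, the same options can be overridden from a user YAML file, which is what `test_custom_yaml` above checks. A standalone sketch of that route — the temporary directory is a stand-in; any directory passed to `dask.config.refresh(paths=...)`, or Dask's default search locations such as `~/.config/dask`, would work the same way:

```python
import os
import tempfile

import yaml
from dask import config as dask_config

import dask_sql  # noqa: F401  # make sure the packaged "sql" defaults are registered

config_dir = tempfile.mkdtemp()  # stand-in for e.g. ~/.config/dask

with open(os.path.join(config_dir, "custom-sql.yaml"), "w") as f:
    yaml.dump({"sql": {"groupby": {"split_out": 16}}}, f)

dask_config.refresh(paths=[config_dir])  # pick up the user file
assert dask_config.get("sql.groupby.split_out") == 16

dask_config.refresh()  # back to the normal search path and packaged defaults
```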