From e251a705c99bb6e2eaa77037794ac7d47ff6e818 Mon Sep 17 00:00:00 2001
From: Paul Hobson
Date: Mon, 17 Jul 2023 15:08:03 -0700
Subject: [PATCH] Remove `SerializableLock` in `to_snowflake` (#39)

Co-authored-by: James Bourbeau
---
 README.md                         |  4 ++--
 dask_snowflake/core.py            | 22 +++++++++-------------
 dask_snowflake/tests/test_core.py |  8 +++-----
 3 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index eea7b17..b3f6c12 100644
--- a/README.md
+++ b/README.md
@@ -12,13 +12,13 @@ it out!
 
 `dask-snowflake` can be installed with `pip`:
 
-```
+```shell
 pip install dask-snowflake
 ```
 
 or with `conda`:
 
-```
+```shell
 conda install -c conda-forge dask-snowflake
 ```
 
diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py
index bf794c5..9e8ee3e 100644
--- a/dask_snowflake/core.py
+++ b/dask_snowflake/core.py
@@ -18,7 +18,7 @@
 from dask.delayed import delayed
 from dask.highlevelgraph import HighLevelGraph
 from dask.layers import DataFrameIOLayer
-from dask.utils import SerializableLock, parse_bytes
+from dask.utils import parse_bytes
 
 
 @delayed
@@ -32,18 +32,14 @@ def write_snowflake(
         **connection_kwargs,
     }
     with snowflake.connector.connect(**connection_kwargs) as conn:
-        # NOTE: Use a process-wide lock to avoid a `boto` multithreading issue
-        # https://github.com/snowflakedb/snowflake-connector-python/issues/156
-        with SerializableLock(token="write_snowflake"):
-            write_pandas(
-                conn=conn,
-                df=df,
-                schema=connection_kwargs.get("schema", None),
-                # NOTE: since ensure_db_exists uses uppercase for the table name
-                table_name=name.upper(),
-                parallel=1,
-                quote_identifiers=False,
-            )
+        write_pandas(
+            conn=conn,
+            df=df,
+            schema=connection_kwargs.get("schema", None),
+            # NOTE: since ensure_db_exists uses uppercase for the table name
+            table_name=name.upper(),
+            quote_identifiers=False,
+        )
 
 
 @delayed
diff --git a/dask_snowflake/tests/test_core.py b/dask_snowflake/tests/test_core.py
index d8dfbfd..2993599 100644
--- a/dask_snowflake/tests/test_core.py
+++ b/dask_snowflake/tests/test_core.py
@@ -103,22 +103,20 @@ def test_to_snowflake_compute_false(table, connection_kwargs, client):
 
 
 def test_arrow_options(table, connection_kwargs, client):
-    # We use a single partition Dask DataFrame to ensure the
-    # categories used below are always in the same order.
-    to_snowflake(ddf.repartition(1), name=table, connection_kwargs=connection_kwargs)
+    to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs)
 
     query = f"SELECT * FROM {table}"
     df_out = read_snowflake(
         query,
         connection_kwargs=connection_kwargs,
-        arrow_options={"categories": ["A"]},
+        arrow_options={"types_mapper": lambda x: pd.Float32Dtype()},
         npartitions=2,
     )
     # FIXME: Why does read_snowflake return lower-case columns names?
     df_out.columns = df_out.columns.str.upper()
     # FIXME: We need to sort the DataFrame because paritions are written
     # in a non-sequential order.
-    expected = df.astype({"A": "category"})
+    expected = df.astype(pd.Float32Dtype())
     dd.utils.assert_eq(
         expected, df_out.sort_values(by="A").reset_index(drop=True), check_dtype=False
     )
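
For context, the change to `dask_snowflake/core.py` drops the process-wide `SerializableLock` (and the `parallel=1` restriction) around `write_pandas`, so each Dask partition can now be uploaded without serializing on a single lock. Below is a minimal usage sketch of the affected code path; the connection values are placeholders and `PAYMENTS` is a hypothetical table name, not anything from this patch:

```python
import dask.datasets
from dask_snowflake import to_snowflake

# Placeholder credentials -- substitute real values.
connection_kwargs = {
    "user": "...",
    "password": "...",
    "account": "...",
    "database": "...",
    "schema": "...",
}

# An example multi-partition Dask DataFrame.
ddf = dask.datasets.timeseries()

# Each partition is written by its own write_pandas call; with the
# lock removed, those calls are free to run concurrently.
to_snowflake(ddf, name="PAYMENTS", connection_kwargs=connection_kwargs)
```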
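The updated test also swaps the `arrow_options` example from `categories` (which required a single-partition write for stable ordering) to a `types_mapper`, which lets callers control the pandas dtypes of the result regardless of partition order. A sketch of that pattern, reusing the placeholder `connection_kwargs` above and assuming, as the test implies, that `read_snowflake` forwards `arrow_options` to `pyarrow.Table.to_pandas`:

```python
import pandas as pd
from dask_snowflake import read_snowflake

df_out = read_snowflake(
    "SELECT * FROM PAYMENTS",  # hypothetical table from the sketch above
    connection_kwargs=connection_kwargs,
    # Map every Arrow column type to pandas' nullable Float32 dtype,
    # mirroring the lambda used in test_arrow_options.
    arrow_options={"types_mapper": lambda _: pd.Float32Dtype()},
    npartitions=2,
)
```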