Remove SerializableLock in to_snowflake (#39)
Co-authored-by: James Bourbeau <jrbourbeau@gmail.com>
phobson and jrbourbeau authored Jul 17, 2023
1 parent fbfa369 commit e251a70
Showing 3 changed files with 14 additions and 20 deletions.
README.md (4 changes: 2 additions & 2 deletions)

@@ -12,13 +12,13 @@ it out!

`dask-snowflake` can be installed with `pip`:

-```
+```shell
pip install dask-snowflake
```

or with `conda`:

-```
+```shell
conda install -c conda-forge dask-snowflake
```

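For context beyond the install commands, a minimal end-to-end usage sketch of the library this README introduces; the connection values and table name are placeholders, not part of this commit:

```python
# Minimal dask-snowflake usage sketch; connection values and the table
# name are placeholders, not part of this commit.
import dask
from dask_snowflake import read_snowflake, to_snowflake

connection_kwargs = {
    "user": "<user>",
    "password": "<password>",
    "account": "<account>",
    "database": "<database>",
    "schema": "<schema>",
    "warehouse": "<warehouse>",
}

# Write a Dask DataFrame to Snowflake, one write_pandas call per partition
ddf = dask.datasets.timeseries(freq="10s")
to_snowflake(ddf, name="MY_TABLE", connection_kwargs=connection_kwargs)

# Read it back as a lazy Dask DataFrame
result = read_snowflake(
    "SELECT * FROM MY_TABLE",
    connection_kwargs=connection_kwargs,
)
print(result.head())
```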
dask_snowflake/core.py (22 changes: 9 additions & 13 deletions)

@@ -18,7 +18,7 @@
from dask.delayed import delayed
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
-from dask.utils import SerializableLock, parse_bytes
+from dask.utils import parse_bytes


@delayed
@@ -32,18 +32,14 @@ def write_snowflake(
        **connection_kwargs,
    }
    with snowflake.connector.connect(**connection_kwargs) as conn:
-        # NOTE: Use a process-wide lock to avoid a `boto` multithreading issue
-        # https://github.com/snowflakedb/snowflake-connector-python/issues/156
-        with SerializableLock(token="write_snowflake"):
-            write_pandas(
-                conn=conn,
-                df=df,
-                schema=connection_kwargs.get("schema", None),
-                # NOTE: since ensure_db_exists uses uppercase for the table name
-                table_name=name.upper(),
-                parallel=1,
-                quote_identifiers=False,
-            )
+        write_pandas(
+            conn=conn,
+            df=df,
+            schema=connection_kwargs.get("schema", None),
+            # NOTE: since ensure_db_exists uses uppercase for the table name
+            table_name=name.upper(),
+            quote_identifiers=False,
+        )


@delayed
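In effect, the per-partition write path now looks like the sketch below, condensed from the diff above; the standalone `write_partition` wrapper and its signature are illustrative, not the repo's actual function:

```python
# Condensed sketch of the post-change write path: each Dask task opens
# its own connection and calls write_pandas directly, with no
# process-wide lock. The function name `write_partition` is illustrative.
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas


def write_partition(df: pd.DataFrame, name: str, connection_kwargs: dict) -> None:
    with snowflake.connector.connect(**connection_kwargs) as conn:
        write_pandas(
            conn=conn,
            df=df,
            schema=connection_kwargs.get("schema", None),
            # ensure_db_exists uses an uppercase table name
            table_name=name.upper(),
            quote_identifiers=False,
        )
```

Note that dropping `parallel=1` also restores `write_pandas`'s default multi-threaded chunk upload, which the connector issue linked in the removed comment had previously made unsafe.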
dask_snowflake/tests/test_core.py (8 changes: 3 additions & 5 deletions)

@@ -103,22 +103,20 @@ def test_to_snowflake_compute_false(table, connection_kwargs, client):


def test_arrow_options(table, connection_kwargs, client):
-    # We use a single partition Dask DataFrame to ensure the
-    # categories used below are always in the same order.
-    to_snowflake(ddf.repartition(1), name=table, connection_kwargs=connection_kwargs)
+    to_snowflake(ddf, name=table, connection_kwargs=connection_kwargs)

    query = f"SELECT * FROM {table}"
    df_out = read_snowflake(
        query,
        connection_kwargs=connection_kwargs,
-        arrow_options={"categories": ["A"]},
+        arrow_options={"types_mapper": lambda x: pd.Float32Dtype()},
        npartitions=2,
    )
    # FIXME: Why does read_snowflake return lower-case column names?
    df_out.columns = df_out.columns.str.upper()
    # FIXME: We need to sort the DataFrame because partitions are written
    # in a non-sequential order.
-    expected = df.astype({"A": "category"})
+    expected = df.astype(pd.Float32Dtype())
    dd.utils.assert_eq(
        expected, df_out.sort_values(by="A").reset_index(drop=True), check_dtype=False
    )
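The updated test swaps the pyarrow `categories` option (which needed a single, deterministically ordered partition) for the order-independent `types_mapper` hook. Since `arrow_options` is forwarded to `pyarrow.Table.to_pandas`, a standalone sketch of what that hook does — the table contents here are illustrative:

```python
# Standalone sketch of pyarrow's types_mapper hook, which the test
# exercises via arrow_options: it maps each Arrow type to a pandas
# extension dtype during to_pandas conversion. Data is illustrative.
import pandas as pd
import pyarrow as pa

table = pa.table({"A": pa.array([1.0, 2.0, 3.0], type=pa.float32())})

# Every column becomes pandas' nullable Float32 extension dtype
df = table.to_pandas(types_mapper=lambda dtype: pd.Float32Dtype())
print(df.dtypes)  # A    Float32
```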
