From f20e502a662f3b5b6ce6af60df05c827ae83ac43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 22 Dec 2023 09:05:46 +0100 Subject: [PATCH 1/6] fork SerializableLock from dask, adapt tests --- xarray/backends/dask_lock.py | 105 ++++++++++++++++++++++++++++++++++ xarray/backends/locks.py | 7 +-- xarray/tests/test_backends.py | 2 - 3 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 xarray/backends/dask_lock.py diff --git a/xarray/backends/dask_lock.py b/xarray/backends/dask_lock.py new file mode 100644 index 00000000000..0950eb7fd1a --- /dev/null +++ b/xarray/backends/dask_lock.py @@ -0,0 +1,105 @@ +# The code in this module is taken from dask. For reference, +# here is a copy of the dask copyright notice: + +# BSD 3-Clause License +# +# Copyright (c) 2014, Anaconda, Inc. and contributors +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import uuid +from collections.abc import Hashable +from threading import Lock +from typing import ClassVar +from weakref import WeakValueDictionary + + +class SerializableLock: + """A Serializable per-process Lock + + This wraps a normal ``threading.Lock`` object and satisfies the same + interface. However, this lock can also be serialized and sent to different + processes. It will not block concurrent operations between processes (for + this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file`` + but will consistently deserialize into the same lock. + + So if we make a lock in one process:: + + lock = SerializableLock() + + And then send it over to another process multiple times:: + + bytes = pickle.dumps(lock) + a = pickle.loads(bytes) + b = pickle.loads(bytes) + + Then the deserialized objects will operate as though they were the same + lock, and collide as appropriate. + + This is useful for consistently protecting resources on a per-process + level. + + The creation of locks is itself not threadsafe. + """ + + _locks: ClassVar[WeakValueDictionary[Hashable, Lock]] = WeakValueDictionary() + token: Hashable + lock: Lock + + def __init__(self, token: Hashable | None = None): + self.token = token or str(uuid.uuid4()) + if self.token in SerializableLock._locks: + self.lock = SerializableLock._locks[self.token] + else: + self.lock = Lock() + SerializableLock._locks[self.token] = self.lock + + def acquire(self, *args, **kwargs): + return self.lock.acquire(*args, **kwargs) + + def release(self, *args, **kwargs): + return self.lock.release(*args, **kwargs) + + def __enter__(self): + self.lock.__enter__() + + def __exit__(self, *args): + self.lock.__exit__(*args) + + def locked(self): + return self.lock.locked() + + def __getstate__(self): + return self.token + + def __setstate__(self, token): + self.__init__(token) + + def __str__(self): + return f"<{self.__class__.__name__}: {self.token}>" + + __repr__ = __str__ diff --git a/xarray/backends/locks.py b/xarray/backends/locks.py index bba12a29609..358dc91ffb2 100644 --- a/xarray/backends/locks.py +++ b/xarray/backends/locks.py @@ -6,12 +6,7 @@ from collections.abc import MutableMapping from typing import Any -try: - from dask.utils import SerializableLock -except ImportError: - # no need to worry about serializing the lock - SerializableLock = threading.Lock # type: ignore - +from xarray.backends.dask_lock import SerializableLock # Locks used by multiple backends. # Neither HDF5 nor the netCDF-C library are thread-safe. diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index a8722d59659..7ab4febaa5c 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -432,8 +432,6 @@ def test_dataset_compute(self) -> None: assert_identical(expected, computed) def test_pickle(self) -> None: - if not has_dask: - pytest.xfail("pickling requires dask for SerializableLock") expected = Dataset({"foo": ("x", [42])}) with self.roundtrip(expected, allow_cleanup_failure=ON_WINDOWS) as roundtripped: with roundtripped: From 49332670f2f5a4e1049c0ff12d8e83e10a4f89c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 22 Dec 2023 09:30:29 +0100 Subject: [PATCH 2/6] from __future__ import annotations --- xarray/backends/dask_lock.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xarray/backends/dask_lock.py b/xarray/backends/dask_lock.py index 0950eb7fd1a..1541a789648 100644 --- a/xarray/backends/dask_lock.py +++ b/xarray/backends/dask_lock.py @@ -31,6 +31,8 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import annotations + import uuid from collections.abc import Hashable from threading import Lock From fe5b4a99bfdd8ab9c9f55a1e2fce1eb6e94f0f89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 22 Dec 2023 09:51:19 +0100 Subject: [PATCH 3/6] use new location of SerializableLock --- xarray/tests/test_distributed.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index bfc37121597..af801966616 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -27,6 +27,7 @@ ) import xarray as xr +from xarray.backends.dask_lock import SerializableLock from xarray.backends.locks import HDF5_LOCK, CombinedLock from xarray.tests import ( assert_allclose, @@ -273,7 +274,7 @@ async def test_async(c, s, a, b) -> None: def test_hdf5_lock() -> None: - assert isinstance(HDF5_LOCK, dask.utils.SerializableLock) + assert isinstance(HDF5_LOCK, SerializableLock) @gen_cluster(client=True) From 4f6e83169bb571b2744e0bfa6f1629f5e199f44c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 3 Jan 2024 12:56:24 +0100 Subject: [PATCH 4/6] move dask code into xarray/backends/locks.py --- xarray/backends/dask_lock.py | 107 ------------------------------- xarray/backends/locks.py | 79 ++++++++++++++++++++++- xarray/tests/test_distributed.py | 3 +- 3 files changed, 77 insertions(+), 112 deletions(-) delete mode 100644 xarray/backends/dask_lock.py diff --git a/xarray/backends/dask_lock.py b/xarray/backends/dask_lock.py deleted file mode 100644 index 1541a789648..00000000000 --- a/xarray/backends/dask_lock.py +++ /dev/null @@ -1,107 +0,0 @@ -# The code in this module is taken from dask. For reference, -# here is a copy of the dask copyright notice: - -# BSD 3-Clause License -# -# Copyright (c) 2014, Anaconda, Inc. and contributors -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from __future__ import annotations - -import uuid -from collections.abc import Hashable -from threading import Lock -from typing import ClassVar -from weakref import WeakValueDictionary - - -class SerializableLock: - """A Serializable per-process Lock - - This wraps a normal ``threading.Lock`` object and satisfies the same - interface. However, this lock can also be serialized and sent to different - processes. It will not block concurrent operations between processes (for - this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file`` - but will consistently deserialize into the same lock. - - So if we make a lock in one process:: - - lock = SerializableLock() - - And then send it over to another process multiple times:: - - bytes = pickle.dumps(lock) - a = pickle.loads(bytes) - b = pickle.loads(bytes) - - Then the deserialized objects will operate as though they were the same - lock, and collide as appropriate. - - This is useful for consistently protecting resources on a per-process - level. - - The creation of locks is itself not threadsafe. - """ - - _locks: ClassVar[WeakValueDictionary[Hashable, Lock]] = WeakValueDictionary() - token: Hashable - lock: Lock - - def __init__(self, token: Hashable | None = None): - self.token = token or str(uuid.uuid4()) - if self.token in SerializableLock._locks: - self.lock = SerializableLock._locks[self.token] - else: - self.lock = Lock() - SerializableLock._locks[self.token] = self.lock - - def acquire(self, *args, **kwargs): - return self.lock.acquire(*args, **kwargs) - - def release(self, *args, **kwargs): - return self.lock.release(*args, **kwargs) - - def __enter__(self): - self.lock.__enter__() - - def __exit__(self, *args): - self.lock.__exit__(*args) - - def locked(self): - return self.lock.locked() - - def __getstate__(self): - return self.token - - def __setstate__(self, token): - self.__init__(token) - - def __str__(self): - return f"<{self.__class__.__name__}: {self.token}>" - - __repr__ = __str__ diff --git a/xarray/backends/locks.py b/xarray/backends/locks.py index 358dc91ffb2..045ee522fa8 100644 --- a/xarray/backends/locks.py +++ b/xarray/backends/locks.py @@ -2,11 +2,84 @@ import multiprocessing import threading +import uuid import weakref -from collections.abc import MutableMapping -from typing import Any +from collections.abc import Hashable, MutableMapping +from typing import Any, ClassVar +from weakref import WeakValueDictionary + + +# SerializableLock is adapted from Dask: +# https://github.com/dask/dask/blob/74e898f0ec712e8317ba86cc3b9d18b6b9922be0/dask/utils.py#L1160-L1224 +# Used under the terms of Dask's license, see licenses/DASK_LICENSE. +class SerializableLock: + """A Serializable per-process Lock + + This wraps a normal ``threading.Lock`` object and satisfies the same + interface. However, this lock can also be serialized and sent to different + processes. It will not block concurrent operations between processes (for + this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file`` + but will consistently deserialize into the same lock. + + So if we make a lock in one process:: + + lock = SerializableLock() + + And then send it over to another process multiple times:: + + bytes = pickle.dumps(lock) + a = pickle.loads(bytes) + b = pickle.loads(bytes) + + Then the deserialized objects will operate as though they were the same + lock, and collide as appropriate. + + This is useful for consistently protecting resources on a per-process + level. + + The creation of locks is itself not threadsafe. + """ + + _locks: ClassVar[ + WeakValueDictionary[Hashable, threading.Lock] + ] = WeakValueDictionary() + token: Hashable + lock: threading.Lock + + def __init__(self, token: Hashable | None = None): + self.token = token or str(uuid.uuid4()) + if self.token in SerializableLock._locks: + self.lock = SerializableLock._locks[self.token] + else: + self.lock = threading.Lock() + SerializableLock._locks[self.token] = self.lock + + def acquire(self, *args, **kwargs): + return self.lock.acquire(*args, **kwargs) + + def release(self, *args, **kwargs): + return self.lock.release(*args, **kwargs) + + def __enter__(self): + self.lock.__enter__() + + def __exit__(self, *args): + self.lock.__exit__(*args) + + def locked(self): + return self.lock.locked() + + def __getstate__(self): + return self.token + + def __setstate__(self, token): + self.__init__(token) + + def __str__(self): + return f"<{self.__class__.__name__}: {self.token}>" + + __repr__ = __str__ -from xarray.backends.dask_lock import SerializableLock # Locks used by multiple backends. # Neither HDF5 nor the netCDF-C library are thread-safe. diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index af801966616..aa53bcf329b 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -27,8 +27,7 @@ ) import xarray as xr -from xarray.backends.dask_lock import SerializableLock -from xarray.backends.locks import HDF5_LOCK, CombinedLock +from xarray.backends.locks import HDF5_LOCK, CombinedLock, SerializableLock from xarray.tests import ( assert_allclose, assert_identical, From 871f97cc702b5fdd6f77e4af319d1af0c1e7947f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 3 Jan 2024 13:04:42 +0100 Subject: [PATCH 5/6] add whats-new.rst entry --- doc/whats-new.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 3a6c15d1704..540e75d9967 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -44,6 +44,8 @@ Bug fixes - Reverse index output of bottleneck's rolling move_argmax/move_argmin functions (:issue:`8541`, :pull:`8552`). By `Kai Mühlbauer `_. +- Fork `SerializableLock` from dask and use as default lock for netcdf4 backends (:issue:`8442`, :pull:`8571`). + By `Kai Mühlbauer `_. Documentation From 9236ae8eb0fc6c37798f735b89447a5eb67e9959 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 3 Jan 2024 09:28:12 -0700 Subject: [PATCH 6/6] Update doc/whats-new.rst --- doc/whats-new.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 540e75d9967..94b85ea224e 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -44,7 +44,7 @@ Bug fixes - Reverse index output of bottleneck's rolling move_argmax/move_argmin functions (:issue:`8541`, :pull:`8552`). By `Kai Mühlbauer `_. -- Fork `SerializableLock` from dask and use as default lock for netcdf4 backends (:issue:`8442`, :pull:`8571`). +- Vendor `SerializableLock` from dask and use as default lock for netcdf4 backends (:issue:`8442`, :pull:`8571`). By `Kai Mühlbauer `_.