Skip to content

Commit

Permalink
Add aquire_read_lock et. al. methods to ReaderWriterLock
Browse files Browse the repository at this point in the history
  • Loading branch information
psarka committed Sep 18, 2023
1 parent 50f97f4 commit 2535dce
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 61 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## [Unreleased]
- Add `.acquire_read_lock`, `.release_read_lock`, `.acquire_write_lock`, and
`.release_write_lock` methods to the inter thread `ReaderWriterLock` as was
promised in the README.

## [0.18]
- Reshuffle the process lock code and properly document it.
Expand Down
141 changes: 105 additions & 36 deletions fasteners/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self,
threads are not properly identified by threading.current_thread
"""
self._writer = None
self._writer_entries = 0
self._pending_writers = collections.deque()
self._readers = {}
self._cond = condition_cls()
Expand Down Expand Up @@ -99,16 +100,27 @@ def owner(self) -> Optional[str]:
return self.READER
return None

@contextlib.contextmanager
def read_lock(self):
"""Context manager that grants a read lock.
def acquire_read_lock(self):
"""Acquire a read lock.
Will wait until no active or pending writers.
Raises:
RuntimeError: if a pending writer tries to acquire a read lock.
"""
me = self._current_thread()
self._acquire_read_lock(me)

def release_read_lock(self):
"""Release a read lock.
Raises:
RuntimeError: if the current thread does not own a read lock.
"""
me = self._current_thread()
self._release_read_lock(me)

def _acquire_read_lock(self, me):
if me in self._pending_writers:
raise RuntimeError("Writer %s can not acquire a read lock"
" while waiting for the write lock"
Expand All @@ -128,23 +140,91 @@ def read_lock(self):
break
# An active or pending writer; guess we have to wait.
self._cond.wait()

def _release_read_lock(self, me, raise_on_not_owned=True):
# I am no longer a reader, remove *one* occurrence of myself.
# If the current thread acquired two read locks, then it will
# still have to remove that other read lock; this allows for
# basic reentrancy to be possible.
with self._cond:
try:
me_instances = self._readers[me]
if me_instances > 1:
self._readers[me] = me_instances - 1
else:
self._readers.pop(me)
except KeyError:
if raise_on_not_owned:
raise RuntimeError(f"Thread {me} does not own a read lock")
self._cond.notify_all()

@contextlib.contextmanager
def read_lock(self):
"""Context manager that grants a read lock.
Will wait until no active or pending writers.
Raises:
RuntimeError: if a pending writer tries to acquire a read lock.
"""
me = self._current_thread()
self._acquire_read_lock(me)
try:
yield self
finally:
# I am no longer a reader, remove *one* occurrence of myself.
# If the current thread acquired two read locks, then it will
# still have to remove that other read lock; this allows for
# basic reentrancy to be possible.
with self._cond:
try:
me_instances = self._readers[me]
if me_instances > 1:
self._readers[me] = me_instances - 1
else:
self._readers.pop(me)
except KeyError:
pass
self._cond.notify_all()
self._release_read_lock(me, raise_on_not_owned=False)

def _acquire_write_lock(self, me):
if self.is_reader():
raise RuntimeError("Reader %s to writer privilege"
" escalation not allowed" % me)

with self._cond:
self._pending_writers.append(me)
while True:
# No readers, and no active writer, am I next??
if len(self._readers) == 0 and self._writer is None:
if self._pending_writers[0] == me:
self._writer = self._pending_writers.popleft()
self._writer_entries = 1
break
self._cond.wait()

def _release_write_lock(self, me, raise_on_not_owned=True):
with self._cond:
self._writer = None
self._writer_entries = 0
self._cond.notify_all()

def acquire_write_lock(self):
"""Acquire a write lock.
Will wait until no active readers. Blocks readers after acquiring.
Guaranteed for locks to be processed in fair order (FIFO).
Raises:
RuntimeError: if an active reader attempts to acquire a lock.
"""
me = self._current_thread()
if self._writer == me:
self._writer_entries += 1
else:
self._acquire_write_lock(me)

def release_write_lock(self):
"""Release a write lock.
Raises:
RuntimeError: if the current thread does not own a write lock.
"""
me = self._current_thread()
if self._writer == me:
self._writer_entries -= 1
if self._writer_entries == 0:
self._release_write_lock(me)
else:
raise RuntimeError(f"Thread {me} does not own a write lock")

@contextlib.contextmanager
def write_lock(self):
Expand All @@ -158,29 +238,18 @@ def write_lock(self):
RuntimeError: if an active reader attempts to acquire a lock.
"""
me = self._current_thread()
i_am_writer = self.is_writer(check_pending=False)
if self.is_reader() and not i_am_writer:
raise RuntimeError("Reader %s to writer privilege"
" escalation not allowed" % me)
if i_am_writer:
# Already the writer; this allows for basic reentrancy.
yield self
if self.is_writer(check_pending=False):
self._writer_entries += 1
try:
yield self
finally:
self._writer_entries -= 1
else:
with self._cond:
self._pending_writers.append(me)
while True:
# No readers, and no active writer, am I next??
if len(self._readers) == 0 and self._writer is None:
if self._pending_writers[0] == me:
self._writer = self._pending_writers.popleft()
break
self._cond.wait()
self._acquire_write_lock(me)
try:
yield self
finally:
with self._cond:
self._writer = None
self._cond.notify_all()
self._release_write_lock(me)


def locked(*args, **kwargs):
Expand Down
Loading

0 comments on commit 2535dce

Please sign in to comment.