Skip to content

Commit

Permalink
Fix executor deadlock on IO errors
Browse files Browse the repository at this point in the history
This change ensures that the executor does not deadlock on IO errors,
for example when IO does not support non-ascii characters.
  • Loading branch information
ento authored Aug 25, 2020
1 parent d2fd581 commit a3ef6dc
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 63 deletions.
118 changes: 55 additions & 63 deletions poetry/installation/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,32 +131,29 @@ def _write(self, operation, line):
return

if self._io.is_debug():
self._lock.acquire()
section = self._sections[id(operation)]
section.write_line(line)
self._lock.release()
with self._lock:
section = self._sections[id(operation)]
section.write_line(line)

return

self._lock.acquire()
section = self._sections[id(operation)]
section.output.clear()
section.write(line)
self._lock.release()
with self._lock:
section = self._sections[id(operation)]
section.output.clear()
section.write(line)

def _execute_operation(self, operation):
try:
if self.supports_fancy_output():
if id(operation) not in self._sections:
if self._should_write_operation(operation):
self._lock.acquire()
self._sections[id(operation)] = self._io.section()
self._sections[id(operation)].write_line(
" <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format(
message=self.get_operation_message(operation),
),
)
self._lock.release()
with self._lock:
self._sections[id(operation)] = self._io.section()
self._sections[id(operation)].write_line(
" <fg=blue;options=bold>•</> {message}: <fg=blue>Pending...</>".format(
message=self.get_operation_message(operation),
),
)
else:
if self._should_write_operation(operation):
if not operation.skipped:
Expand Down Expand Up @@ -190,37 +187,37 @@ def _execute_operation(self, operation):
if result == -2:
raise KeyboardInterrupt
except Exception as e:
from clikit.ui.components.exception_trace import ExceptionTrace

if not self.supports_fancy_output():
io = self._io
else:
message = " <error>•</error> {message}: <error>Failed</error>".format(
message=self.get_operation_message(operation, error=True),
)
self._write(operation, message)
io = self._sections.get(id(operation), self._io)

self._lock.acquire()

trace = ExceptionTrace(e)
trace.render(io)
io.write_line("")
try:
from clikit.ui.components.exception_trace import ExceptionTrace

self._shutdown = True
self._lock.release()
if not self.supports_fancy_output():
io = self._io
else:
message = " <error>•</error> {message}: <error>Failed</error>".format(
message=self.get_operation_message(operation, error=True),
)
self._write(operation, message)
io = self._sections.get(id(operation), self._io)

with self._lock:
trace = ExceptionTrace(e)
trace.render(io)
io.write_line("")
finally:
with self._lock:
self._shutdown = True
except KeyboardInterrupt:
message = " <warning>•</warning> {message}: <warning>Cancelled</warning>".format(
message=self.get_operation_message(operation, warning=True),
)
if not self.supports_fancy_output():
self._io.write_line(message)
else:
self._write(operation, message)

self._lock.acquire()
self._shutdown = True
self._lock.release()
try:
message = " <warning>•</warning> {message}: <warning>Cancelled</warning>".format(
message=self.get_operation_message(operation, warning=True),
)
if not self.supports_fancy_output():
self._io.write_line(message)
else:
self._write(operation, message)
finally:
with self._lock:
self._shutdown = True

def _do_execute_operation(self, operation):
method = operation.job_type
Expand Down Expand Up @@ -266,14 +263,12 @@ def _do_execute_operation(self, operation):
return result

def _increment_operations_count(self, operation, executed):
self._lock.acquire()
if executed:
self._executed_operations += 1
self._executed[operation.job_type] += 1
else:
self._skipped[operation.job_type] += 1

self._lock.release()
with self._lock:
if executed:
self._executed_operations += 1
self._executed[operation.job_type] += 1
else:
self._skipped[operation.job_type] += 1

def run_pip(self, *args, **kwargs): # type: (...) -> int
try:
Expand Down Expand Up @@ -622,9 +617,8 @@ def _download_archive(self, operation, link): # type: (Operation, Link) -> Path
progress.set_format(message + " <b>%percent%%</b>")

if progress:
self._lock.acquire()
progress.start()
self._lock.release()
with self._lock:
progress.start()

done = 0
archive = self._chef.get_cache_directory_for_link(link) / link.filename
Expand All @@ -637,16 +631,14 @@ def _download_archive(self, operation, link): # type: (Operation, Link) -> Path
done += len(chunk)

if progress:
self._lock.acquire()
progress.set_progress(done)
self._lock.release()
with self._lock:
progress.set_progress(done)

f.write(chunk)

if progress:
self._lock.acquire()
progress.finish()
self._lock.release()
with self._lock:
progress.finish()

return archive

Expand Down
26 changes: 26 additions & 0 deletions tests/installation/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,32 @@ def test_execute_should_show_operation_as_cancelled_on_subprocess_keyboard_inter
assert expected == io.fetch_output()


def test_execute_should_gracefully_handle_io_error(config, mocker, io):
env = MockEnv()
executor = Executor(env, pool, config, io)
executor.verbose()

original_write_line = executor._io.write_line

def write_line(string, flags=None):
# Simulate UnicodeEncodeError
string.encode("ascii")
original_write_line(string, flags)

mocker.patch.object(io, "write_line", side_effect=write_line)

assert 1 == executor.execute([Install(Package("clikit", "0.2.3"))])

expected = r"""
Package operations: 1 install, 0 updates, 0 removals
\s*Unicode\w+Error
"""

assert re.match(expected, io.fetch_output())


def test_executor_should_delete_incomplete_downloads(
config, io, tmp_dir, mocker, pool, mock_file_downloads
):
Expand Down

0 comments on commit a3ef6dc

Please sign in to comment.