From a453b25642f01329e2d6ae41f1b95d6420680ca6 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:25:21 +0300 Subject: [PATCH 1/6] Added suport contextvars for loop.run_in_executor --- Lib/asyncio/base_events.py | 15 +++++++++++-- Lib/test/test_asyncio/test_events.py | 33 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index dc0ca3f02b9bf6..74934b52e0b38d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -16,6 +16,8 @@ import collections import collections.abc import concurrent.futures +import contextvars +import functools import heapq import itertools import logging @@ -728,7 +730,7 @@ def call_soon_threadsafe(self, callback, *args, context=None): self._write_to_self() return handle - def run_in_executor(self, executor, func, *args): + def run_in_executor(self, executor, func, *args, context=None): self._check_closed() if self._debug: self._check_callback(func, 'run_in_executor') @@ -737,8 +739,17 @@ def run_in_executor(self, executor, func, *args): if executor is None: executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor + + if context is None: + context = contextvars.copy_context() + + if args: + fn = functools.partial(func, *args) + else: + fn = func + return futures.wrap_future( - executor.submit(func, *args), loop=self) + executor.submit(context.run, fn), loop=self) def set_default_executor(self, executor): self._default_executor = executor diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 11cd950df1cedb..8f1438d260f364 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2,6 +2,7 @@ import collections.abc import concurrent.futures +import contextvars import functools import io import os @@ -369,6 +370,38 @@ def run(): time.sleep(0.4) self.assertFalse(called) + def test_run_in_executor_no_context(self): + def run(): + return foo.get() + + foo = contextvars.ContextVar('foo') + foo.set('bar') + f = self.loop.run_in_executor(None, run) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'bar') + + def test_run_in_executor_context(self): + def run(): + return foo.get() + + foo = contextvars.ContextVar('foo') + foo.set('bar') + context = contextvars.copy_context() + f = self.loop.run_in_executor(None, run, context=context) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'bar') + + def test_run_in_executor_context_args(self): + def run(arg): + return (arg, foo.get()) + + foo = contextvars.ContextVar('foo') + foo.set('bar') + context = contextvars.copy_context() + f = self.loop.run_in_executor(None, run, 'yo', context=context) + res = self.loop.run_until_complete(f) + self.assertEqual(res, ('yo', 'bar')) + def test_reader_callback(self): r, w = socket.socketpair() r.setblocking(False) From 6fbe93fc04e551ccb0a0b231056394a6d08b2cc8 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:37:25 +0300 Subject: [PATCH 2/6] Added new entry. --- .../NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst diff --git a/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst b/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst new file mode 100644 index 00000000000000..28c0a9c504da4a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-07-01-02-37-05.bpo-34014.RfrJGJ.rst @@ -0,0 +1 @@ +Added support of contextvars for BaseEventLoop.run_in_executor From 20e139d5e69cedd0cbc8a4b481e47a7a2a87ea4d Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 02:47:02 +0300 Subject: [PATCH 3/6] Set contextvars on top of the module. --- Lib/test/test_asyncio/test_events.py | 29 +++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 8f1438d260f364..fbd4bcdfac4711 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -37,6 +37,9 @@ from test.test_asyncio import utils as test_utils from test import support +foo_ctx = contextvars.ContextVar('foo') +foo_ctx.set('bar') + def tearDownModule(): asyncio.set_event_loop_policy(None) @@ -370,22 +373,32 @@ def run(): time.sleep(0.4) self.assertFalse(called) + def test_run_in_executor_hierarchy(self): + def run(): + foo_ctx.set('foo') + res = foo_ctx.get() + self.assertEqual(res, 'foo') + return res + + f = self.loop.run_in_executor(None, run) + res = self.loop.run_until_complete(f) + self.assertEqual(res, 'foo') + + res = foo_ctx.get() + self.assertEqual(res, 'bar') + def test_run_in_executor_no_context(self): def run(): - return foo.get() + return foo_ctx.get() - foo = contextvars.ContextVar('foo') - foo.set('bar') f = self.loop.run_in_executor(None, run) res = self.loop.run_until_complete(f) self.assertEqual(res, 'bar') def test_run_in_executor_context(self): def run(): - return foo.get() + return foo_ctx.get() - foo = contextvars.ContextVar('foo') - foo.set('bar') context = contextvars.copy_context() f = self.loop.run_in_executor(None, run, context=context) res = self.loop.run_until_complete(f) @@ -393,10 +406,8 @@ def run(): def test_run_in_executor_context_args(self): def run(arg): - return (arg, foo.get()) + return (arg, foo_ctx.get()) - foo = contextvars.ContextVar('foo') - foo.set('bar') context = contextvars.copy_context() f = self.loop.run_in_executor(None, run, 'yo', context=context) res = self.loop.run_until_complete(f) From ae42bf584020a49c2bf0fc7a761776281efe2a6c Mon Sep 17 00:00:00 2001 From: hellysmile Date: Sun, 1 Jul 2018 12:18:13 +0300 Subject: [PATCH 4/6] Simplify run_in_executor partial wrapper --- Lib/asyncio/base_events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 74934b52e0b38d..deb5069321604b 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -744,12 +744,10 @@ def run_in_executor(self, executor, func, *args, context=None): context = contextvars.copy_context() if args: - fn = functools.partial(func, *args) - else: - fn = func + func = functools.partial(func, *args) return futures.wrap_future( - executor.submit(context.run, fn), loop=self) + executor.submit(context.run, func), loop=self) def set_default_executor(self, executor): self._default_executor = executor From 0dd668243383709237138e5e2c0c8d8a996f8ee4 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Wed, 3 Oct 2018 17:03:01 +0300 Subject: [PATCH 5/6] Update run_in_executor for retain_context option --- Lib/asyncio/base_events.py | 28 +++++++++++++++++++++------- Lib/test/test_asyncio/test_events.py | 20 ++++++++++++++++---- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 4287e875d12902..b9282627d14da7 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -739,7 +739,8 @@ def call_soon_threadsafe(self, callback, *args, context=None): self._write_to_self() return handle - def run_in_executor(self, executor, func, *args, context=None): + def run_in_executor(self, executor, func, *args, context=None, + retain_context=False): self._check_closed() if self._debug: self._check_callback(func, 'run_in_executor') @@ -749,14 +750,27 @@ def run_in_executor(self, executor, func, *args, context=None): executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor - if context is None: - context = contextvars.copy_context() + if retain_context: + if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): + raise RuntimeError( + 'retain_context=True supports only ThreadPoolExecutor') - if args: - func = functools.partial(func, *args) + if context is None: + context = contextvars.copy_context() - return futures.wrap_future( - executor.submit(context.run, func), loop=self) + if args: + runner = functools.partial(func, *args) + else: + runner = func + + runner = functools.partial(context.run, runner) + else: + if args: + runner = functools.partial(func, *args) + else: + runner = func + + return futures.wrap_future(executor.submit(runner), loop=self) def set_default_executor(self, executor): if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index afbf4b98c06040..48fcae03f309df 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -382,7 +382,7 @@ def run(): self.assertEqual(res, 'foo') return res - f = self.loop.run_in_executor(None, run) + f = self.loop.run_in_executor(None, run, retain_context=True) res = self.loop.run_until_complete(f) self.assertEqual(res, 'foo') @@ -393,7 +393,7 @@ def test_run_in_executor_no_context(self): def run(): return foo_ctx.get() - f = self.loop.run_in_executor(None, run) + f = self.loop.run_in_executor(None, run, retain_context=True) res = self.loop.run_until_complete(f) self.assertEqual(res, 'bar') @@ -402,7 +402,8 @@ def run(): return foo_ctx.get() context = contextvars.copy_context() - f = self.loop.run_in_executor(None, run, context=context) + f = self.loop.run_in_executor(None, run, context=context, + retain_context=True) res = self.loop.run_until_complete(f) self.assertEqual(res, 'bar') @@ -411,10 +412,21 @@ def run(arg): return (arg, foo_ctx.get()) context = contextvars.copy_context() - f = self.loop.run_in_executor(None, run, 'yo', context=context) + f = self.loop.run_in_executor(None, run, 'yo', context=context, + retain_context=True) res = self.loop.run_until_complete(f) self.assertEqual(res, ('yo', 'bar')) + def test_run_in_executor_context_subprocess(self): + def run(arg): + pass + + pool = concurrent.futures.ProcessPoolExecutor() + context = contextvars.copy_context() + with self.assertRaises(RuntimeError): + self.loop.run_in_executor(pool, run, retain_context=True) + pool.shutdown() + def test_reader_callback(self): r, w = socket.socketpair() r.setblocking(False) From 95ee37716c351bfbf78a7703c08e3bc046f40475 Mon Sep 17 00:00:00 2001 From: hellysmile Date: Thu, 4 Oct 2018 00:11:48 +0300 Subject: [PATCH 6/6] Simplify partial wrapper for context.run --- Lib/asyncio/base_events.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index b9282627d14da7..fbdf03eb37928f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -750,6 +750,11 @@ def run_in_executor(self, executor, func, *args, context=None, executor = concurrent.futures.ThreadPoolExecutor() self._default_executor = executor + if args: + runner = functools.partial(func, *args) + else: + runner = func + if retain_context: if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): raise RuntimeError( @@ -758,17 +763,7 @@ def run_in_executor(self, executor, func, *args, context=None, if context is None: context = contextvars.copy_context() - if args: - runner = functools.partial(func, *args) - else: - runner = func - runner = functools.partial(context.run, runner) - else: - if args: - runner = functools.partial(func, *args) - else: - runner = func return futures.wrap_future(executor.submit(runner), loop=self)