
Thread-safety of device_context #11

Closed
eric-wieser opened this issue Aug 11, 2020 · 14 comments · Fixed by #1576

eric-wieser commented Aug 11, 2020

Just to record the issues raised by @hameerabbasi and I in the numba dev call:

  • If two threads use with device_context in parallel, do they interfere with each other?
  • If two async tasks use with device_context in parallel, do they interfere with each other? (xref PEP 567)
  • Can a child thread reuse the queue of a parent thread?
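The third question can be explored with plain contextvars, independently of dpctl. In this sketch a string stands in for the current queue (the names are illustrative, not dpctl's API): a child thread starts with a fresh context, but contextvars.copy_context lets it reuse the parent's state:

```python
import contextvars
import threading

# Hypothetical stand-in for dpctl's "current queue" state.
_queue = contextvars.ContextVar("queue", default="default-queue")

def child(out):
    out.append(_queue.get())

_queue.set("parent-queue")

# A plain thread starts with an empty context: it sees only the default value.
plain = []
t = threading.Thread(target=child, args=(plain,))
t.start(); t.join()
assert plain == ["default-queue"]

# Running the child inside a copy of the parent's context lets it reuse the
# parent's current queue (one possible answer to the third question above).
copied = []
ctx = contextvars.copy_context()
t2 = threading.Thread(target=ctx.run, args=(child, copied))
t2.start(); t2.join()
assert copied == ["parent-queue"]
```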
@diptorupd diptorupd added this to the gold milestone Oct 8, 2020
@diptorupd diptorupd self-assigned this Oct 8, 2020
@diptorupd

@PokhodenkoSA this requirement should be considered when we reevaluate the queue manager design. If we make get_active_queues re-entrant, would that be enough? Basically, we have to remove the use of the static global stack for the queues. Adding this for gold.

@diptorupd

@oleksandr-pavlyk since we are talking about the specs, these questions should be addressed.

oleksandr-pavlyk commented Apr 6, 2021

Here is an example I used to verify that dpctl works correctly with coroutines:

import dpctl
import dpctl.memory as dpmem
import random
import asyncio

async def task1():
    q = dpctl.SyclQueue("opencl:gpu")
    abc = b"abcdefghijklmnopqrstuvwxyz"
    for _ in range(100):
        with dpctl.device_context(q):
            m = dpmem.MemoryUSMShared(len(abc))
            m.copy_from_host(abc)
        await asyncio.sleep(0.1*random.random())

async def task2():
    q = dpctl.SyclQueue("level_zero:gpu")
    for _ in range(100):
        with dpctl.device_context(q):
            m = dpmem.MemoryUSMShared(10)
            m.copy_from_host(b'\x00' * 10)
        await asyncio.sleep(0.1*random.random())

async def main():
    j1 = asyncio.create_task(task1())
    j2 = asyncio.create_task(task2())
    await j1
    await j2

    print("done")

asyncio.run(main())

It executes normally, and outputs "done" as expected without errors.

@eric-wieser

@oleksandr-pavlyk, the relevant test is one that uses await inside the with statement. Your example does not exercise await in any meaningful way.

oleksandr-pavlyk commented Apr 6, 2021

Thank you @eric-wieser. Having moved await inside the with context, I still see no issues:

import dpctl
import dpctl.memory as dpmem
import random
import asyncio

async def task1():
    q = dpctl.SyclQueue("opencl:gpu")
    abc = b"abcdefghijklmnopqrstuvwxyz"
    m = dpmem.MemoryUSMShared(len(abc))
    for _ in range(100):
        with dpctl.device_context(q) as lq:
            cd = dpctl.get_current_queue().sycl_device
            assert cd.backend == q.sycl_device.backend
            m.copy_from_host(abc)
            await asyncio.sleep(0.1*random.random())

async def task2():
    q = dpctl.SyclQueue("level_zero:gpu")
    m = dpmem.MemoryUSMShared(10)
    for _ in range(100):
        with dpctl.device_context(q) as lq:
            cd = dpctl.get_current_queue().sycl_device
            assert cd.backend == q.sycl_device.backend
            m.copy_from_host(b'\x00' * 10)
            await asyncio.sleep(0.1*random.random())

async def main():
    j1 = asyncio.create_task(task1())
    j2 = asyncio.create_task(task2())
    await j1
    await j2

    print("done")

asyncio.run(main())

The code uses two queues with different backends, and checks that the current queue, which is being reset with the context, has the expected backend.

(idp) [07:08:54 nuc04 dpctl]$ time python async.py
done

real    0m6.056s
user    0m2.015s
sys     0m0.265s

eric-wieser commented Apr 6, 2021

Yes, that code is also unlikely to detect any problems, because it doesn't actually use the context after resuming from the await.

Swapping the with and the for should ensure the problem surfaces.

To be clear, #265 already outlines the solution here - the state must be stored in PEP 567's ContextVars.
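As a rough illustration of that solution (a hypothetical device_context stand-in, not dpctl's implementation; strings stand in for queues), a ContextVar keeps each task's state isolated even when the await sits inside the with block:

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Each asyncio task runs in its own copy of the context, so this variable
# cannot leak between concurrently running tasks.
_current = contextvars.ContextVar("current_queue", default=None)

@contextmanager
def device_context(queue):
    # set() returns a Token used to restore the previous value on exit.
    token = _current.set(queue)
    try:
        yield queue
    finally:
        _current.reset(token)

async def worker(name, results):
    with device_context(name):
        for _ in range(5):
            await asyncio.sleep(0)  # yield control to the other task
            results.append((name, _current.get()))

async def main():
    results = []
    await asyncio.gather(worker("opencl:gpu", results),
                         worker("level_zero:gpu", results))
    return results

# Every task observes the queue it set, even after resuming from await.
assert all(name == seen for name, seen in asyncio.run(main()))
```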

@oleksandr-pavlyk

Ok, indeed

import dpctl
import dpctl.memory as dpmem
import random
import asyncio

async def task1():
    q = dpctl.SyclQueue("opencl:gpu")
    abc = b"abcdefghijklmnopqrstuvwxyz"
    m = dpmem.MemoryUSMShared(len(abc))
    with dpctl.device_context(q) as lq:
        for _ in range(100):
            cd = dpctl.get_current_queue().sycl_device
            assert cd.backend == q.sycl_device.backend
            m.copy_from_host(abc)
            await asyncio.sleep(0.1*random.random())

async def task2():
    q = dpctl.SyclQueue("level_zero:gpu")
    m = dpmem.MemoryUSMShared(10)
    with dpctl.device_context(q) as lq:
        for _ in range(100):
            cd = dpctl.get_current_queue().sycl_device
            assert cd.backend == q.sycl_device.backend
            m.copy_from_host(b'\x00' * 10)
            await asyncio.sleep(0.1*random.random())

async def main():
    j1 = asyncio.create_task(task1())
    j2 = asyncio.create_task(task2())
    await j1
    await j2

    print("done")

asyncio.run(main())

trips the assertions.

@oleksandr-pavlyk

@eric-wieser

Here is a Cython file I used:

# filename: stack.pyx
# distutils: language = c++
# cython: language_level=3

import contextvars
from contextlib import contextmanager

from libcpp.vector cimport vector

ctypedef vector[size_t] stack_t

cdef class Stack:
    cdef stack_t stack

    def __cinit__(self, size_t v=0):
        self.stack = stack_t()
        self.stack.push_back(v)

    def push(self, size_t v):
        self.stack.push_back(v)

    def pop(self):
        self.stack.pop_back()

    def top(self):
        return self.stack.at(0)

    def set_global(self, size_t v):
        self.stack[0] = v

    def current(self):
        return self.stack.back()

    def copy(self):
        cdef Stack _copy = Stack.__new__(Stack, self.stack[0])
        for i in range(1, self.stack.size()):
            _copy.stack.push_back(self.stack[i])
        return _copy

_st = Stack()

@contextmanager
def working_stack_context(v):
    tmp = None
    try:
        tmp = _st.copy()
        tmp.push(v)
        yield tmp
    finally:
        if tmp is not None:
            tmp.pop()
        else:
            raise TypeError("Argument {} is not of size_t".format(v))

@contextmanager
def broken_stack_context(v):
    tmp = None
    try:
        tmp = _st
        tmp.push(v)
        yield tmp
    finally:
        if tmp is not None:
            tmp.pop()
        else:
            raise TypeError("Argument {} is not of size_t".format(v))

Using the asyncio driver like in the previous comment:

# filename: async_run.py
import random
import asyncio
import stack

stack_context = stack.working_stack_context

async def task1():
    v = 11
    with stack_context(v) as s:
        for i in range(100):
            c = s.current()
            assert c == v, "task1 check failed at i={}, (c, v) = ({}, {})".format(i, c, v)
            await asyncio.sleep(0.1*random.random())

async def task2():
    v = 7
    with stack_context(v) as s:
        for j in range(100):
            c = s.current()
            assert c == v, "task2 check failed at j={}, (c,v)=({},{})".format(j, c, v)
            await asyncio.sleep(0.1*random.random())

async def main():
    j1 = asyncio.create_task(task1())
    j2 = asyncio.create_task(task2())
    await j1
    await j2

    print("done")

asyncio.run(main())

The code executes just fine:

(idp) [08:45:45 nuc04 async_stack]$ python async_run.py
done

As soon as I replace stack_context = stack.working_stack_context with stack_context = stack.broken_stack_context things fall apart:

(idp) [08:47:56 nuc04 async_stack]$ python async_run.py
Traceback (most recent call last):
  File "async_run.py", line 31, in <module>
    asyncio.run(main())
  File "~/.conda/envs/idp/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "~/.conda/envs/idp/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "async_run.py", line 26, in main
    await j1
  File "async_run.py", line 12, in task1
    assert c == v, "task1 check failed at i={}, (c, v) = ({}, {})".format(i, c, v)
AssertionError: task1 check failed at i=1, (c, v) = (7, 11)

I was never able to make it work with contextvars.ContextVar. Can you perhaps refer me to an example?

@eric-wieser

broken_stack_context fails for obvious reasons: the two tasks both push to the same stack when they enter the with block, meaning that after both have hit their first await, one sees the other's value.

Your working_stack_context succeeds because it creates a fresh stack for each task - of course two unrelated objects won't interfere with each other.
However, it's not representative of the original problem any more - in the real API, you don't use s at all, but use _st in both pieces of code.

Cython here is a distraction I think, and not really relevant to what you're struggling with.

this example from the python docs seems reasonable.

@oleksandr-pavlyk

You are right, use of Cython can be avoided.

# filename: stack2.py

import contextvars
from contextlib import contextmanager

class Stack:
    def __init__(self, v):
        self.stack = [v]

    def push(self, v):
        self.stack.append(v)

    def pop(self):
        self.stack.pop()

    def top(self):
        return self.stack[0]

    def set_global(self, v):
        self.stack[0] = v

    def current(self):
        return self.stack[-1]

    def __len__(self):
        return len(self.stack)

    def copy(self):
        _cpy = Stack(self.stack[0])
        for i in range(1, len(self)):
            _cpy.push(self.stack[i])
        return _cpy


_stack = contextvars.ContextVar('global stack', default=Stack(0))

@contextmanager
def stack_context(v):
    token = None
    try:
        tmp = _stack.get().copy()
        tmp.push(v)
        token = _stack.set(tmp)
        yield tmp
    finally:
        if token is not None:
            _stack.reset(token)
        else:
            raise TypeError("Argument {} cannot be used".format(v))

This context manager works as expected with async_run.py, where I do import stack2 as stack, and stack_context = stack.stack_context.

The thing I was missing was the concept of Token, and how it is needed to restore the previous context state.

So in dpctl we need two changes:

  1. Use contextvars.ContextVar to store the global instance of the queue manager
  2. QueueMgr class needs to implement a .copy method.

@eric-wieser

I think the mistake here is using DPCTLQueueMgr_GetCurrentQueue at all

DPCTLQueueMgr is a nice API for use from C where thread-locals are the only thing to worry about - but it's completely superfluous in Python, where ContextVar is already the manager. I'd recommend a current_queue variable in Python / Cython that stores a ContextVar pointing to a cl::sycl::queue
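A minimal sketch of that recommendation, with illustrative names and a string standing in for the cl::sycl::queue: the ContextVar itself plays the role of the stack, because Token-based restoration handles nesting correctly:

```python
import contextvars
from contextlib import contextmanager

# The ContextVar replaces the C-side queue-manager stack entirely.
current_queue = contextvars.ContextVar("current_queue")

@contextmanager
def use_queue(q):
    token = current_queue.set(q)
    try:
        yield q
    finally:
        current_queue.reset(token)  # restores the previous queue, nested or not

with use_queue("opencl:gpu"):
    assert current_queue.get() == "opencl:gpu"
    with use_queue("level_zero:gpu"):          # nesting behaves like a push...
        assert current_queue.get() == "level_zero:gpu"
    assert current_queue.get() == "opencl:gpu" # ...and exiting like a pop
```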

diptorupd commented Apr 12, 2021

I think the mistake here is using DPCTLQueueMgr_GetCurrentQueue at all

Yes, the whole issue arises because we are keeping the state in C rather than in Python. I initially did it this way to make it easy for us to get the current queue by just calling DPCTLQueueMgr_GetCurrentQueue from any native code. E.g., one could set the "current queue" in Python and then retrieve that queue inside an LLVM module numba-dppy generates by simply calling the C API function.

I support not storing the state globally in the C library. Also, we can use async-context-managers as syntactic sugar.

@shssf @PokhodenkoSA @reazulhoque I am adding you to the discussion here, as these changes will impact how numba-dppy and dpnp use dpctl.

@eric-wieser

For reference, there is a C API for contextvars too; PyContextVar_Get is probably all you need.

@eric-wieser

Also, we can use async-context-managers as syntactic sugar.

I see no benefit to using async context manager here. Async context managers are for when your context manager requires access to the event loop to enter and leave the context. dpctl doesn't need access to the event loop, so there's no point doing this.

oleksandr-pavlyk added a commit that referenced this issue Feb 17, 2023
This function caches queues by (context, device) key.
The cache is stored in a contextvars.ContextVar variable, applying
the lessons learned from issue gh-11.

get_device_cached_queue(dev : dpctl.SyclDevice) -> dpctl.SyclQueue
get_device_cached_queue(
     (ctx: dpctl.SyclContext, dev : dpctl.SyclDevice)
) -> dpctl.SyclQueue

Function retrieves the queue from cache, or adds the new queue instance
there if previously absent.
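The caching scheme described above can be sketched roughly as follows; the names and types here are illustrative stand-ins, not dpctl's actual implementation:

```python
import contextvars

# Cache of queues keyed by (context, device), held in a ContextVar so that
# concurrent tasks do not corrupt one shared global dictionary.
_queue_cache = contextvars.ContextVar("queue_cache")

def get_cached_queue(ctx, dev):
    try:
        cache = _queue_cache.get()
    except LookupError:
        cache = {}
        _queue_cache.set(cache)
    key = (ctx, dev)
    if key not in cache:
        # Stand-in for constructing dpctl.SyclQueue(ctx, dev).
        cache[key] = ("queue-for", ctx, dev)
    return cache[key]

q1 = get_cached_queue("ctx0", "gpu0")
q2 = get_cached_queue("ctx0", "gpu0")
assert q1 is q2  # same (context, device) key -> the cached queue is reused
```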