Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rx.background and StateManager.modify_state provides safe exclusive access to state #1676

Merged
merged 62 commits into from
Sep 21, 2023

Conversation

masenf
Copy link
Collaborator

@masenf masenf commented Aug 24, 2023

App.modify_state provides exclusive access to state AND sends a state update after exiting the async contextmanager.

Background EventHandler example

@rx.background enables an EventHandler to not block the event loop!

A background task can either yield events that will update the state or enter async with self context block to directly munge the state.

import asyncio

import reflex as rx


class State(rx.State):
    counter: int = 0
    _task_id: int = 0

    @rx.background
    async def handle_event(self):
        async with self:
            task_id = self._task_id
            self._task_id += 1
        print(f"[{task_id}] background event starting; counter = {self.counter}")
        for ix in range(5):
            await asyncio.sleep(2)
            print(f"[{task_id}] [{ix}] background event stale counter = {self.counter}")
            async with self:
                print(f"[{task_id}] [{ix}] background event bumping counter = {self.counter}")
                self.counter += 1
            yield rx.console_log(f"Background task {task_id} is on iteration {ix}")

    @rx.background
    async def handle_event_yield_only(self):
        async with self:
            task_id = self._task_id
            self._task_id += 1
        print(f"[{task_id}] background event starting; counter = {self.counter}")
        for ix in range(5):
            await asyncio.sleep(2)
            yield State.increment_arbitrary(5)
            yield rx.console_log(f"Background task {task_id} is on iteration {ix}")

    def increment(self):
        self.counter += 1

    def increment_arbitrary(self, amount: int):
        self.counter += int(amount)

    def reset_counter(self):
        self.counter = 0

    async def chain_background_task(self, direct: bool):
        if direct:
            await self.non_blocking_pause()
        else:
            # must yield / return, no direct calling of background tasks
            yield State.non_blocking_pause()

    async def blocking_pause(self):
        await asyncio.sleep(5)
        return rx.console_log("Blocking pause done")

    @rx.event.background
    async def non_blocking_pause(self):
        await asyncio.sleep(5)
        return rx.console_log("Non-blocking pause done")


def index() -> rx.Component:
    return rx.vstack(
        rx.heading(State.counter),
        rx.button("Delayed Increment", on_click=State.handle_event),
        rx.button("Yield Increment", on_click=State.handle_event_yield_only),
        rx.button("Increment", on_click=State.increment),
        rx.button("Blocking Pause", on_click=State.blocking_pause),
        rx.button("Non-Blocking Pause", on_click=State.non_blocking_pause),
        rx.button("Chain background", on_click=lambda: State.chain_background_task(False)),
        rx.button("Chain background direct", on_click=lambda: State.chain_background_task(True)),
        rx.button("Reset", on_click=State.reset_counter),
    )


app = rx.App()
app.add_page(index)
app.compile()

Background task example app

import asyncio
import uuid

import reflex as rx


TASKS = set()


def background_task(token) -> asyncio.Task:
    task_id = uuid.uuid4()
    async def _task():
        for ix in range(10):
            await asyncio.sleep(1)
            async with app.modify_state(token) as state:
                state.messages.append(f"[{task_id}] [{ix}] Hello to {token}")
        print(f"Task {task_id} finished")
    return asyncio.create_task(_task())


class State(rx.State):
    messages: list[str] = []
    counter: int = 0

    async def start_background_task(self):
        """Start a background task."""
        t = background_task(self.get_token())
        t.add_done_callback(TASKS.discard)
        TASKS.add(t)

    async def increment_counter(self):
        await asyncio.sleep(0.25)
        self.counter += 1

    @rx.var
    def reversed_messages(self) -> list[str]:
        return self.messages[::-1]


def index() -> rx.Component:
    return rx.fragment(
        rx.color_mode_button(rx.color_mode_icon(), float="right"),
        rx.vstack(
            rx.heading(f"Counter: {State.counter}"),
            rx.button("Start background task", on_click=State.start_background_task),
            rx.button("Increment counter", on_click=State.increment_counter),
            rx.foreach(
                State.reversed_messages,
                lambda m: rx.text(m),
            ),
        ),
    )


app = rx.App()
app.add_page(index)
app.compile()

Run it with Redis?

docker run -d -p 6379:6379 redis
REDIS_URL=localhost:6379 reflex run

Bugs

Click Increment counter about 10 times quickly, then click Start Background Task... then it deadlocks 😔 Fixed via b471403

Remaining Work

  • redis lock expiry
  • unit tests for StateProxy and background events
  • end-to-end integration tests w/ redis

masenf added 3 commits August 24, 2023 12:40
App.modify_state provides exclusive access to state AND sends a state update
after exiting the async contextmanager
…ations

beat a race condition where the lock is held, but gets dropped before we can
subscribe to notifications for the del key event
masenf added 3 commits August 30, 2023 13:12
When encountering a background task EventHandler, move processing to an asyncio
task and yield a final state update to unblock other client events.

Background tasks do not get direct access to the State, but instead get `self`
as a `StateProxy`, which protects against mutation of the vars. The
`StateProxy` exposes async contextmanager protocol which refreshes the internal
proxied instance from the state_manager and holds an exclusive lock on the
token state while in context, allowing safe, stable reads and writes of state
vars from a long running background process.
@masenf masenf changed the title [WiP] StateManager: modify_state provides exclusive access to state rx.background and StateManager.modify_state provides safe exclusive access to state Aug 31, 2023
@Alek99 Alek99 self-requested a review August 31, 2023 19:40
Copy link
Contributor

@picklelo picklelo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, working really well for me! I think the async with self syntax is nice. I didn't review the state.py file yet, will add comments to that

reflex/app.py Outdated Show resolved Hide resolved
reflex/app.py Show resolved Hide resolved
reflex/app.py Outdated Show resolved Hide resolved
reflex/state.py Outdated Show resolved Hide resolved
reflex/state.py Outdated Show resolved Hide resolved
reflex/state.py Outdated Show resolved Hide resolved
reflex/state.py Outdated Show resolved Hide resolved
improve the annotation and semantics of calling _process_background with a
non-backgroundable event
Provide key expiration for `_redis_lock` operation.

Raise LockExpiredError when trying to commit state after the lock expires.

Use class-level constants for redis keyspace notification options and events
that indicate a lock for a given key was released.

Use keyspace notifications for the particular lock key we're waiting on, rather
than keyevent notifications for all deleted or expired keys.
avoid sharing locks between instances of StateManager
It's easier to test if we're not blocking for minimum 1 second
This isn't ideal, but needed for py3.8 and py3.9 to work correctly.
Cloudpickle cannot work with states defined inside functions, they must be
defined in an actual module.
Update tests to work when the StateManager is backed by an actual redis instance
github actions can be kinda slow, so give us some more wiggling room
Try to avoid CI flakiness by waiting much longer for lock expiry when running in CI
allows ImmutableMutableProxy to carry its immutable status
It's right next to the definition of rx.background itself and only defined
once, then used within the State.
Inline with previously added MutableProxy and ImmutableMutableProxy
Avoid mixing logic between two different StateManager implementations.

Fix all tests to identify StateManager type based on these classes.

Update all integration (AppHarness) tests to work with redis. Replace
emit_state_update with get_state, set_state, and modify_state functions on
AppHarness.
@masenf
Copy link
Collaborator Author

masenf commented Sep 19, 2023

@picklelo thanks for the feedback. I addressed almost everything here: https://github.com/reflex-dev/reflex/pull/1676/files/f6ba2ab4eb855837ae517e05ffc187765bbaaa93..1093229068b6d7f5aec2917c5d08b0ad43fb4211

I also went ahead and fixed all of the AppHarness tests to work with redis, so now we're running with both StateManagerMemory and StateManagerRedis for better coverage. This will also ensure that future tests and features are at least considering how they interop with redis backing store. The refactoring of the StateManager as an abstract base class also opens the door to simpler implementation of future backing stores if we ever need or want that.

As for async with self I've been pondering over potentially better syntax, but i haven't found anything i particularly like. I don't really want to expose an underscore method as part of the API, but I also am reluctant to restrict downstream users from another name. And I'm a sucker for the conciseness of async with self. I think we should put that one to the community and see what people think.

As an aside, I had to kick the CI on reflex-web a few times; I wouldn't think it's my changes here, but the error is weird and I don't really understand it. Edit: same error on a different PR that's already merged.

Copy link
Collaborator Author

@masenf masenf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further testing has revealed that this sort of breaks yielding updates in normal event handlers. Need to write a test case and investigate.

Fixed and test case written ✔️

Copy link
Contributor

@picklelo picklelo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome!!

@@ -89,7 +97,7 @@ class App(Base):
state: Type[State] = DefaultState

# Class to manage many client states.
state_manager: StateManager = StateManager()
state_manager: StateManager = StateManagerMemory(state=DefaultState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we set this if we're overriding it again in the __init__?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to make it Optional and default it to None because then we have to put is not None checks all over the place.

But if we do something like StateManager.create(state=DefaultState) here, and redis is used, then we create an unserializable default, which pydantic doesn't like.

So the default StateManager is the StateManagerMemory, then inside __init__, we call StateManager.create(...) to update the reference to either memory or redis.


def get_state(self, token: str) -> State:

class StateManagerMemory(StateManager):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, this is much cleaner we can extend to other backends in the future too

@picklelo picklelo merged commit 351611c into main Sep 21, 2023
36 checks passed
Alek99 pushed a commit that referenced this pull request Sep 26, 2023
@picklelo picklelo deleted the masenf/oob-state-update branch October 9, 2023 21:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants