ENG-8212: Redis Oplock implementation #5932
Conversation
When an update is emitted for a token but the websocket for that token is connected to another instance of the app, post the update to the lost+found channel, where other instances listen for updates to forward to their clients.
This also lays the groundwork for broadcasting updates to all connected states.
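A minimal sketch of this cross-instance hand-off, assuming redis.asyncio; the channel name and payload format here are illustrative, not necessarily what the PR uses:

```python
import json

import redis.asyncio as aioredis

# Illustrative channel name; the PR refers to it as the "lost+found" channel.
LOST_AND_FOUND_CHANNEL = "reflex:lost+found"


async def post_update(redis: aioredis.Redis, token: str, delta: dict) -> None:
    """Publish an update for a token whose websocket lives on another instance."""
    await redis.publish(
        LOST_AND_FOUND_CHANNEL,
        json.dumps({"token": token, "delta": delta}),
    )


async def relay_updates(redis: aioredis.Redis, local_tokens: set[str]) -> None:
    """Each instance listens on the shared channel and forwards updates
    for tokens whose websocket is connected to this instance."""
    async with redis.pubsub() as pubsub:
        await pubsub.subscribe(LOST_AND_FOUND_CHANNEL)
        async for message in pubsub.listen():
            if message["type"] != "message":
                continue
            payload = json.loads(message["data"])
            if payload["token"] in local_tokens:
                ...  # send payload["delta"] to the locally connected websocket
```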
For more efficient and fair lock queueing, each StateManagerRedis uses a single task to monitor the keyspace for lock release/expire events and then wakes up the next caller waiting in the queue (there is no fairness between separate processes, though). Lockers now wait on an `asyncio.Event` that is set by the redis pubsub waiter. If any locker waits longer than the lock_expiration, it simply tries to take the lock anyway, so a missed pub/sub notification can never block a locker forever.
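Roughly, the waiter pattern looks like the following sketch; the class and attribute names are made up for illustration and are not the PR's actual code:

```python
import asyncio
from collections import defaultdict


class LockWaiters:
    """Illustrative per-process queue of lock waiters.

    A single monitoring task (not shown) watches keyspace notifications and
    calls wake_next() whenever a lock key is deleted or expires.
    """

    def __init__(self, lock_expiration: float) -> None:
        self.lock_expiration = lock_expiration
        self._queues: defaultdict[str, list[asyncio.Event]] = defaultdict(list)

    async def wait_for_lock(self, lock_key: str, try_acquire) -> None:
        """Retry try_acquire() until it succeeds, sleeping on an Event between
        attempts, but never longer than lock_expiration per attempt."""
        while not await try_acquire():
            event = asyncio.Event()
            self._queues[lock_key].append(event)
            try:
                # If the pub/sub wake-up is missed, fall through after
                # lock_expiration and retry the acquire anyway.
                await asyncio.wait_for(event.wait(), timeout=self.lock_expiration)
            except asyncio.TimeoutError:
                pass
            finally:
                if event in self._queues[lock_key]:
                    self._queues[lock_key].remove(event)

    def wake_next(self, lock_key: str) -> None:
        """Called by the pubsub monitoring task on lock release/expire."""
        if self._queues[lock_key]:
            self._queues[lock_key].pop(0).set()
```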
* When taking a lock from redis, hold it for 80% of the lock expiration timeout.
* While the lock is held, other events processed against the instance use the cached in-memory copy of the state.
* When the timeout expires or another process signals intent to access a locked state, flush the modified states to redis and release the lock.

Set REFLEX_OPLOCK_ENABLED=1 to use this feature (sketched below).
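A rough sketch of the lease lifecycle under these assumptions; the names are illustrative, the real implementation lives in reflex/istate/manager/redis.py:

```python
import asyncio


class OplockLease:
    """Illustrative lease lifecycle; names are guesses, not the PR's actual code."""

    def __init__(self, token: str, lock_expiration: float) -> None:
        self.token = token
        # Hold the lock for 80% of the expiration so it never silently expires
        # while a modified state is still only in memory.
        self.lease_duration = 0.8 * lock_expiration
        self._cached_states: dict[str, object] = {}
        self._lease_breaker: asyncio.Task | None = None

    def start(self, state: object) -> None:
        """Cache the state and schedule the flush for when the lease ends."""
        self._cached_states[self.token] = state
        self._lease_breaker = asyncio.create_task(self._break_lease_later())

    async def _break_lease_later(self) -> None:
        # Uncontended path: flush once 80% of the lock expiration has passed.
        await asyncio.sleep(self.lease_duration)
        # Shield the flush so a cancellation cannot drop modified state.
        await asyncio.shield(self._flush_and_release())

    def on_contention(self) -> None:
        # Another process signalled intent to access this state: flush early.
        if self._lease_breaker is not None:
            self._lease_breaker.cancel()
        asyncio.create_task(self._flush_and_release())

    async def _flush_and_release(self) -> None:
        state = self._cached_states.pop(self.token, None)
        if state is not None:
            ...  # write the modified state back to redis, then DEL the lock key
```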
CodSpeed Performance Report
Merging #5932 will not alter performance.
Comparing …senf/redis_oplock
Always check redis for contended leases before granting a lease. It's a bit slower, but much more reliable, and it avoids racy lock_expiration timeouts when contention occurs before the lease is created or when the pubsub hasn't caught up to reality. Always start `_lock_update_task` in `__post_init__` to avoid a race where the lease is granted and then contended before the pubsub task has started and can catch the contention.
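In other words, before handing out an in-memory lease the manager consults Redis for waiters rather than trusting the pubsub alone; a hedged sketch with a made-up waiters key name:

```python
import redis.asyncio as aioredis


async def grant_lease(redis: aioredis.Redis, token: str) -> bool:
    """Refuse an in-memory lease if another process already signalled
    contention; the waiters key name is illustrative."""
    waiters = await redis.scard(f"{token}_lock_waiters")
    if waiters:
        # The state is contended: flush and release instead of re-using
        # the cached copy.
        return False
    return True
```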
Greptile Overview
Greptile Summary
This PR implements an opportunistic locking (oplock) mechanism for the Redis state manager to improve performance when there's no lock contention. The key change is that when REFLEX_OPLOCK_ENABLED=1 is set, the state manager holds Redis locks for 80% of the expiration timeout and caches states in memory, allowing subsequent operations to avoid Redis round-trips entirely.
Key Changes:
- Holds locks for 80% of `lock_expiration` time to enable fast in-memory state access during uncontended periods
- Uses Redis pubsub keyspace notifications to detect lock contention and flush cached states when other processes need access
- Adds comprehensive test coverage including mock Redis implementation and integration tests
- Gracefully handles cancellation and ensures state consistency by shielding flush operations from cancellation
- Maintains backward compatibility - feature is opt-in via environment variable
Implementation Quality:
- Well-structured with extensive test coverage (8 new test cases covering basic ops, contention, cancellation, and edge cases)
- Proper error handling and cleanup logic with `asyncio.shield` to prevent data loss
- CI updated to test both with and without oplock enabled
- Good separation of concerns with `_try_modify_state` handling retry logic
Confidence Score: 4/5
- This PR is generally safe to merge with some considerations for production deployment
- Score of 4 reflects solid implementation with comprehensive testing, but this is a complex concurrency feature that introduces new failure modes. The oplock mechanism is well-tested and properly handles contention, cancellation, and edge cases. However, it's a significant architectural change to the state management system that could have unexpected interactions in production under high load or network issues. The feature is appropriately gated behind an environment variable and thoroughly tested in CI. One minor style suggestion was provided for code clarity.
- Pay close attention to `reflex/istate/manager/redis.py` - this is the core implementation with complex async logic and lock management
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| reflex/istate/manager/redis.py | 4/5 | Implements opportunistic locking (oplock) for Redis state manager. Holds locks for 80% of expiration time to enable fast in-memory access when uncontended. Adds pubsub-based lock contention detection and local state caching. |
| reflex/environment.py | 5/5 | Adds two environment variables: REFLEX_REDIS_STATE_MANAGER_DEBUG for debug logging and REFLEX_OPLOCK_ENABLED to enable opportunistic locking feature. |
| tests/units/mock_redis.py | 5/5 | New mock Redis implementation supporting pubsub, keyspace notifications, and set operations. Also provides real_redis helper for integration testing. |
| tests/units/istate/manager/test_redis.py | 5/5 | Comprehensive test coverage for Redis state manager oplock feature. Tests basic operations, lock contention, lease management, cancellation handling, and substate fetching. |
| .github/workflows/unit_tests.yml | 5/5 | Adds Redis service container and additional test run with REFLEX_OPLOCK_ENABLED=true to verify oplock functionality in CI. |
Sequence Diagram
```mermaid
sequenceDiagram
participant Client1 as Client 1 (Process A)
participant SM1 as StateManager 1
participant Redis
participant PubSub as Redis PubSub
participant SM2 as StateManager 2
participant Client2 as Client 2 (Process B)
Note over Client1,Client2: Scenario: Oplock enabled, uncontended access
Client1->>SM1: modify_state(token)
SM1->>Redis: SET token_lock (NX, PX=lock_expiration)
Redis-->>SM1: OK (lock acquired)
SM1->>Redis: GET state data
Redis-->>SM1: state
SM1->>SM1: Cache state in _cached_states
SM1->>SM1: Create lease_breaker task (sleep 80% of lock_expiration)
SM1-->>Client1: yield cached_state
Client1->>Client1: Modify state in memory
Client1->>SM1: Exit context (fast, no Redis write)
Note over SM1: Lock held, state cached for subsequent calls
Client1->>SM1: modify_state(token) [2nd call]
SM1->>SM1: Check _cached_states
SM1-->>Client1: yield cached_state (no Redis lock!)
Client1->>Client1: Modify state in memory
Client1->>SM1: Exit context (fast, no Redis write)
Note over Client2,SM2: Contention scenario begins
Client2->>SM2: modify_state(token)
SM2->>Redis: SADD token_lock_waiters (signal contention)
Redis->>PubSub: keyspace event: sadd
PubSub->>SM1: Lock contention detected
SM1->>SM1: Cancel lease_breaker task
SM1->>Redis: SET state data (flush cached state)
SM1->>Redis: DEL token_lock (release)
Redis->>PubSub: keyspace event: del
PubSub->>SM2: Lock released notification
SM2->>Redis: SET token_lock (acquire)
Redis-->>SM2: OK
SM2->>Redis: GET state data
Redis-->>SM2: state (with Client1's changes)
SM2->>SM2: Cache state, create new lease_breaker
SM2-->>Client2: yield state
Client2->>Client2: Modify state
Client2->>SM2: Exit context
Note over SM1,SM2: Lease timeout scenario (no contention)
SM1->>SM1: lease_breaker wakes after 80% timeout
SM1->>Redis: SET state data (flush)
SM1->>Redis: DEL token_lock (release)
SM1->>SM1: Clear _cached_states[token]
```
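The contention and release signals in the diagram rely on Redis keyspace notifications. A minimal sketch of watching them with redis.asyncio, assuming keyspace events are enabled on the server and using illustrative key names:

```python
import redis.asyncio as aioredis


async def watch_lock_events(redis: aioredis.Redis, token: str) -> None:
    """Watch the lock and waiters keys for keyspace events.

    Requires keyspace notifications on the server, e.g.
    CONFIG SET notify-keyspace-events Kgxs (keyspace, generic, expired, set).
    Key names and the database index (0) are illustrative.
    """
    async with redis.pubsub() as pubsub:
        await pubsub.psubscribe(f"__keyspace@0__:{token}_lock*")
        async for message in pubsub.listen():
            if message["type"] != "pmessage":
                continue
            event = message["data"]  # e.g. b"del", b"expired", b"sadd"
            if event in (b"del", b"expired"):
                ...  # lock released: wake the next local waiter
            elif event == b"sadd":
                ...  # another process wants the state: flush and release early
```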
10 files reviewed, 1 comment
No point in continually spamming "no running event loop" to the console.
Increases overall test coverage by ~1%, even without REFLEX_REDIS_URL set.