Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: event handler methods are not thread-safe #329

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions openfeature/_event_support.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import threading
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List

Expand All @@ -15,7 +16,10 @@
from openfeature.client import OpenFeatureClient


_global_lock = threading.RLock()
_global_handlers: Dict[ProviderEvent, List[EventHandler]] = defaultdict(list)

_client_lock = threading.RLock()
_client_handlers: Dict[OpenFeatureClient, Dict[ProviderEvent, List[EventHandler]]] = (
defaultdict(lambda: defaultdict(list))
)
Expand All @@ -24,41 +28,47 @@
def run_client_handlers(
client: OpenFeatureClient, event: ProviderEvent, details: EventDetails
) -> None:
for handler in _client_handlers[client][event]:
handler(details)
with _client_lock:
for handler in _client_handlers[client][event]:
handler(details)


def run_global_handlers(event: ProviderEvent, details: EventDetails) -> None:
for handler in _global_handlers[event]:
handler(details)
with _global_lock:
for handler in _global_handlers[event]:
handler(details)


def add_client_handler(
client: OpenFeatureClient, event: ProviderEvent, handler: EventHandler
) -> None:
handlers = _client_handlers[client][event]
handlers.append(handler)
with _client_lock:
handlers = _client_handlers[client][event]
handlers.append(handler)

_run_immediate_handler(client, event, handler)


def remove_client_handler(
client: OpenFeatureClient, event: ProviderEvent, handler: EventHandler
) -> None:
handlers = _client_handlers[client][event]
handlers.remove(handler)
with _client_lock:
handlers = _client_handlers[client][event]
handlers.remove(handler)


def add_global_handler(event: ProviderEvent, handler: EventHandler) -> None:
_global_handlers[event].append(handler)
with _global_lock:
_global_handlers[event].append(handler)

from openfeature.api import get_client

_run_immediate_handler(get_client(), event, handler)


def remove_global_handler(event: ProviderEvent, handler: EventHandler) -> None:
_global_handlers[event].remove(handler)
with _global_lock:
_global_handlers[event].remove(handler)


def run_handlers_for_provider(
Expand All @@ -72,9 +82,10 @@ def run_handlers_for_provider(
# run the global handlers
run_global_handlers(event, details)
# run the handlers for clients associated to this provider
for client in _client_handlers:
if client.provider == provider:
run_client_handlers(client, event, details)
with _client_lock:
for client in _client_handlers:
if client.provider == provider:
run_client_handlers(client, event, details)


def _run_immediate_handler(
Expand All @@ -91,5 +102,7 @@ def _run_immediate_handler(


def clear() -> None:
_global_handlers.clear()
_client_handlers.clear()
with _global_lock:
_global_handlers.clear()
with _client_lock:
_client_handlers.clear()
28 changes: 28 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -356,3 +359,28 @@ def test_provider_event_late_binding():

# Then
spy.provider_configuration_changed.assert_called_once_with(details)


def test_client_handlers_thread_safety():
provider = NoOpProvider()
set_provider(provider)

def add_handlers_task():
def handler(*args, **kwargs):
time.sleep(0.005)

for _ in range(10):
time.sleep(0.01)
client = get_client(str(uuid.uuid4()))
client.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, handler)

def emit_events_task():
for _ in range(10):
time.sleep(0.01)
provider.emit_provider_configuration_changed(ProviderEventDetails())

with ThreadPoolExecutor(max_workers=2) as executor:
f1 = executor.submit(add_handlers_task)
f2 = executor.submit(emit_events_task)
f1.result()
f2.result()