diff --git a/sdk/python/packages/flet-core/src/flet_core/event_handler.py b/sdk/python/packages/flet-core/src/flet_core/event_handler.py index 8a1c4cccc..0887ea133 100644 --- a/sdk/python/packages/flet-core/src/flet_core/event_handler.py +++ b/sdk/python/packages/flet-core/src/flet_core/event_handler.py @@ -1,4 +1,6 @@ -from flet_core.utils import is_asyncio, is_coroutine +import asyncio + +from flet_core.utils import is_asyncio class EventHandler: @@ -39,12 +41,12 @@ async def __async_handler(self, e): r.data = e.data r.control = e.control r.page = e.page - if is_coroutine(h): + if asyncio.iscoroutinefunction(h): await h(r) else: h(r) else: - if is_coroutine(h): + if asyncio.iscoroutinefunction(h): await h(e) else: h(e) diff --git a/sdk/python/packages/flet-core/src/flet_core/page.py b/sdk/python/packages/flet-core/src/flet_core/page.py index e090ae5d6..645b3f8a7 100644 --- a/sdk/python/packages/flet-core/src/flet_core/page.py +++ b/sdk/python/packages/flet-core/src/flet_core/page.py @@ -9,6 +9,7 @@ from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple, Union, cast from urllib.parse import urlparse +from warnings import warn import flet_core from flet_core.adaptive_control import AdaptiveControl @@ -49,7 +50,8 @@ ThemeMode, ThemeModeString, ) -from flet_core.utils import is_asyncio, is_coroutine +from flet_core.utils import is_asyncio +from flet_core.utils.concurrency_utils import is_pyodide from flet_core.view import View logger = logging.getLogger(flet_core.__name__) @@ -124,7 +126,7 @@ def __init__( self.__pool = pool self._index = {self._Control__uid: self} # index with all page controls - self.__lock = threading.Lock() if not is_asyncio() else NopeLock() + self.__lock = threading.Lock() if not is_pyodide() else NopeLock() self.__async_lock = asyncio.Lock() if is_asyncio() else AsyncNopeLock() self.__views = [View()] @@ -289,12 +291,8 @@ def update(self, *controls): self.__handle_mount_unmount(*r) async def update_async(self, *controls): - async with self.__async_lock: - if len(controls) == 0: - r = await self.__update_async(self) - else: - r = await self.__update_async(*controls) - await self.__handle_mount_unmount_async(*r) + warn("Obsolete. Use page.update() method instead.") + self.update(*controls) def add(self, *controls): with self.__lock: @@ -303,10 +301,8 @@ def add(self, *controls): self.__handle_mount_unmount(*r) async def add_async(self, *controls): - async with self.__async_lock: - self._controls.extend(controls) - r = await self.__update_async(self) - await self.__handle_mount_unmount_async(*r) + warn("Obsolete. Use page.add() method instead.") + self.add(*controls) def insert(self, at, *controls): with self.__lock: @@ -318,13 +314,8 @@ def insert(self, at, *controls): self.__handle_mount_unmount(*r) async def insert_async(self, at, *controls): - async with self.__async_lock: - n = at - for control in controls: - self._controls.insert(n, control) - n += 1 - r = await self.__update_async(self) - await self.__handle_mount_unmount_async(*r) + warn("Obsolete. Use page.insert() method instead.") + self.insert(at, *controls) def remove(self, *controls): with self.__lock: @@ -334,11 +325,8 @@ def remove(self, *controls): self.__handle_mount_unmount(*r) async def remove_async(self, *controls): - async with self.__async_lock: - for control in controls: - self._controls.remove(control) - r = await self.__update_async(self) - await self.__handle_mount_unmount_async(*r) + warn("Obsolete. Use page.remove() method instead.") + self.remove(*controls) def remove_at(self, index): with self.__lock: @@ -347,18 +335,16 @@ def remove_at(self, index): self.__handle_mount_unmount(*r) async def remove_at_async(self, index): - async with self.__async_lock: - self._controls.pop(index) - r = await self.__update_async(self) - await self.__handle_mount_unmount_async(*r) + warn("Obsolete. Use page.remove_at() method instead.") + self.remove_at(index) def clean(self): self._clean(self) self._controls.clear() async def clean_async(self): - await self._clean_async(self) - self._controls.clear() + warn("Obsolete. Use page.clean() method instead.") + self.clean() def _clean(self, control: Control): with self.__lock: @@ -373,19 +359,6 @@ def _clean(self, control: Control): for c in removed_controls: c.will_unmount() - async def _clean_async(self, control: Control): - async with self.__async_lock: - control._previous_children.clear() - assert control.uid is not None - removed_controls = [] - for child in control._get_children(): - removed_controls.extend( - self._remove_control_recursively(self.index, child) - ) - await self._send_command_async("clean", [control.uid]) - for c in removed_controls: - await c.will_unmount_async() - def _close(self): self.__pubsub.unsubscribe_all() removed_controls = self._remove_control_recursively(self.index, self) @@ -395,12 +368,8 @@ def _close(self): self.__close_internal() async def _close_async(self): - await self.__pubsub.unsubscribe_all_async() - removed_controls = self._remove_control_recursively(self.index, self) - for c in removed_controls: - await c.will_unmount_async() - c._dispose() - self.__close_internal() + warn("Obsolete. Use page.clean() method instead.") + self.clean() def __close_internal(self): self._controls.clear() @@ -421,17 +390,6 @@ def __update(self, *controls) -> Tuple[List[Control], List[Control]]: self.__update_control_ids(added_controls, results) return added_controls, removed_controls - async def __update_async(self, *controls) -> Tuple[List[Control], List[Control]]: - if self.__conn is None: - raise PageDisconnectedException("Page has been disconnected") - commands, added_controls, removed_controls = self.__prepare_update(*controls) - self.__validate_controls_page(added_controls) - results = ( - await self.__conn.send_commands_async(self._session_id, commands) - ).results - self.__update_control_ids(added_controls, results) - return added_controls, removed_controls - def __prepare_update(self, *controls): added_controls = [] removed_controls = [] @@ -474,20 +432,13 @@ def __handle_mount_unmount(self, added_controls, removed_controls): for ctrl in added_controls: ctrl.did_mount() - async def __handle_mount_unmount_async(self, added_controls, removed_controls): - for ctrl in removed_controls: - await ctrl.will_unmount_async() - ctrl.page = None - for ctrl in added_controls: - await ctrl.did_mount_async() - def error(self, message=""): with self.__lock: self._send_command("error", [message]) async def error_async(self, message=""): - async with self.__async_lock: - await self._send_command_async("error", [message]) + warn("Obsolete. Use page.error() method instead.") + self.error(message) async def on_event_async(self, e: Event): logger.debug(f"page.on_event_async: {e.target} {e.name} {e.data}") @@ -500,13 +451,14 @@ async def on_event_async(self, e: Event): ce = ControlEvent(e.target, e.name, e.data, self._index[e.target], self) handler = self._index[e.target].event_handlers.get(e.name) if handler: - if is_coroutine(handler): + if asyncio.iscoroutinefunction(handler): await handler(ce) + elif is_pyodide(): + handler(ce) else: - # TODO - handle pyodide mode # run in thread pool - await asyncio.get_running_loop().run_in_executor( - self.__pool, handler, ce + self.__loop.call_soon_threadsafe( + self.__loop.run_in_executor, self.__pool, handler, ce ) def __on_page_change_event(self, data): @@ -1167,7 +1119,7 @@ def auth(self): # pubsub @property - def pubsub(self): + def pubsub(self) -> PubSub: return self.__pubsub # overlay diff --git a/sdk/python/packages/flet-core/src/flet_core/utils/__init__.py b/sdk/python/packages/flet-core/src/flet_core/utils/__init__.py index 9d2094154..476ac9586 100644 --- a/sdk/python/packages/flet-core/src/flet_core/utils/__init__.py +++ b/sdk/python/packages/flet-core/src/flet_core/utils/__init__.py @@ -1,4 +1,4 @@ -from flet_core.utils.concurrency_utils import is_asyncio, is_coroutine +from flet_core.utils.concurrency_utils import is_asyncio, is_pyodide from flet_core.utils.slugify import slugify from flet_core.utils.strings import random_string from flet_core.utils.vector import Vector diff --git a/sdk/python/packages/flet-core/src/flet_core/utils/concurrency_utils.py b/sdk/python/packages/flet-core/src/flet_core/utils/concurrency_utils.py index bc12d3512..dd78bb556 100644 --- a/sdk/python/packages/flet-core/src/flet_core/utils/concurrency_utils.py +++ b/sdk/python/packages/flet-core/src/flet_core/utils/concurrency_utils.py @@ -1,5 +1,4 @@ import asyncio -import inspect import sys @@ -10,5 +9,5 @@ def is_asyncio(): return False -def is_coroutine(method): - return inspect.iscoroutinefunction(method) +def is_pyodide(): + return sys.platform == "emscripten" diff --git a/sdk/python/packages/flet-pyodide/src/flet/flet.py b/sdk/python/packages/flet-pyodide/src/flet/flet.py index 7fc40cc50..f09c291f3 100644 --- a/sdk/python/packages/flet-pyodide/src/flet/flet.py +++ b/sdk/python/packages/flet-pyodide/src/flet/flet.py @@ -7,7 +7,6 @@ from flet.pyodide_connection import PyodideConnection from flet_core.event import Event from flet_core.page import Page -from flet_core.utils import is_coroutine logger = logging.getLogger(flet.__name__) @@ -66,7 +65,7 @@ async def on_session_created(session_data): logger.info(f"Session started: {session_data.sessionID}") try: assert target is not None - if is_coroutine(target): + if asyncio.iscoroutinefunction(target): await target(page) else: target(page) diff --git a/sdk/python/packages/flet-runtime/src/flet_runtime/app.py b/sdk/python/packages/flet-runtime/src/flet_runtime/app.py index 5ff43e4fe..fe2bb49b9 100644 --- a/sdk/python/packages/flet-runtime/src/flet_runtime/app.py +++ b/sdk/python/packages/flet-runtime/src/flet_runtime/app.py @@ -16,7 +16,7 @@ from flet_core.event import Event from flet_core.page import Page from flet_core.types import AppView, WebRenderer -from flet_core.utils import is_coroutine, random_string +from flet_core.utils import random_string from flet_runtime.flet_socket_server import FletSocketServer from flet_runtime.utils import ( get_arch, @@ -233,7 +233,7 @@ async def on_session_created(session_data): logger.info(f"Session started: {session_data.sessionID}") try: assert session_handler is not None - if is_coroutine(session_handler): + if asyncio.iscoroutinefunction(session_handler): await session_handler(page) else: # run in thread pool diff --git a/sdk/python/packages/flet-runtime/src/flet_runtime/flet_socket_server.py b/sdk/python/packages/flet-runtime/src/flet_runtime/flet_socket_server.py index 2c6f588e9..c8061c845 100644 --- a/sdk/python/packages/flet-runtime/src/flet_runtime/flet_socket_server.py +++ b/sdk/python/packages/flet-runtime/src/flet_runtime/flet_socket_server.py @@ -20,6 +20,7 @@ RegisterWebClientRequestPayload, ) from flet_core.utils import random_string +from flet_runtime.pubsub import PubSubHub from flet_runtime.utils import get_free_tcp_port, is_windows logger = logging.getLogger(flet_runtime.__name__) @@ -43,6 +44,7 @@ def __init__( self.__on_session_created = on_session_created self.__blocking = blocking self.__pool = pool + self.pubsubhub = PubSubHub(loop=asyncio.get_running_loop(), pool=pool) self.__running_tasks = set() async def start(self): diff --git a/sdk/python/packages/flet-runtime/src/flet_runtime/pubsub.py b/sdk/python/packages/flet-runtime/src/flet_runtime/pubsub.py index 551b2731e..f3b0d7470 100644 --- a/sdk/python/packages/flet-runtime/src/flet_runtime/pubsub.py +++ b/sdk/python/packages/flet-runtime/src/flet_runtime/pubsub.py @@ -1,26 +1,35 @@ import asyncio import logging import threading -from typing import Any, Callable, Dict, Iterable +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Awaitable, Callable, Coroutine, Dict, Iterable, Optional, Union +from warnings import warn import flet_runtime -from flet_core.locks import AsyncNopeLock, NopeLock -from flet_core.utils import is_asyncio +from flet_core.locks import NopeLock +from flet_core.utils.concurrency_utils import is_pyodide logger = logging.getLogger(flet_runtime.__name__) class PubSubHub: - def __init__(self): + def __init__( + self, + loop: Optional[asyncio.AbstractEventLoop] = None, + pool: Optional[ThreadPoolExecutor] = None, + ): logger.debug("Creating new PubSubHub instance") - self.__lock = threading.Lock() if not is_asyncio() else NopeLock() - self.__async_lock = asyncio.Lock() if is_asyncio() else AsyncNopeLock() - self.__subscribers: Dict[str, Callable] = {} # key: session_id, value: handler + self.__loop = loop + self.__pool = pool + self.__lock = threading.Lock() if not is_pyodide() else NopeLock() + self.__subscribers: Dict[ + str, Union[Callable, Callable[..., Awaitable[Any]]] + ] = {} # key: session_id, value: handler self.__topic_subscribers: Dict[ - str, Dict[str, Callable] + str, Dict[str, Union[Callable, Callable[..., Awaitable[Any]]]] ] = {} # key: topic, value: dict[session_id, handler] self.__subscriber_topics: Dict[ - str, Dict[str, Callable] + str, Dict[str, Union[Callable, Callable[..., Awaitable[Any]]]] ] = {} # key: session_id, value: dict[topic, handler] def send_all(self, message: Any): @@ -29,12 +38,6 @@ def send_all(self, message: Any): for handler in self.__subscribers.values(): self.__send(handler, [message]) - async def send_all_async(self, message: Any): - logger.debug(f"pubsub.send_all_async({message})") - async with self.__async_lock: - for handler in self.__subscribers.values(): - await self.__send_async(handler, [message]) - def send_all_on_topic(self, topic: str, message: Any): logger.debug(f"pubsub.send_all_on_topic({topic}, {message})") with self.__lock: @@ -42,13 +45,6 @@ def send_all_on_topic(self, topic: str, message: Any): for handler in self.__topic_subscribers[topic].values(): self.__send(handler, [topic, message]) - async def send_all_on_topic_async(self, topic: str, message: Any): - logger.debug(f"pubsub.send_all_on_topic_async({topic}, {message})") - async with self.__async_lock: - if topic in self.__topic_subscribers: - for handler in self.__topic_subscribers[topic].values(): - await self.__send_async(handler, [topic, message]) - def send_others(self, except_session_id: str, message: Any): logger.debug(f"pubsub.send_others({except_session_id}, {message})") with self.__lock: @@ -56,13 +52,6 @@ def send_others(self, except_session_id: str, message: Any): if except_session_id != session_id: self.__send(handler, [message]) - async def send_others_async(self, except_session_id: str, message: Any): - logger.debug(f"pubsub.send_others_async({except_session_id}, {message})") - async with self.__async_lock: - for session_id, handler in self.__subscribers.items(): - if except_session_id != session_id: - await self.__send_async(handler, [message]) - def send_others_on_topic(self, except_session_id: str, topic: str, message: Any): logger.debug( f"pubsub.send_others_on_topic({except_session_id}, {topic}, {message})" @@ -73,39 +62,27 @@ def send_others_on_topic(self, except_session_id: str, topic: str, message: Any) if except_session_id != session_id: self.__send(handler, [topic, message]) - async def send_others_on_topic_async( - self, except_session_id: str, topic: str, message: Any - ): - logger.debug( - f"pubsub.send_others_on_topic_async({except_session_id}, {topic}, {message})" - ) - async with self.__async_lock: - if topic in self.__topic_subscribers: - for session_id, handler in self.__topic_subscribers[topic].items(): - if except_session_id != session_id: - await self.__send_async(handler, [topic, message]) - def subscribe(self, session_id: str, handler: Callable): logger.debug(f"pubsub.subscribe({session_id})") with self.__lock: self.__subscribers[session_id] = handler - async def subscribe_async(self, session_id: str, handler): - logger.debug(f"pubsub.subscribe_async({session_id})") - async with self.__async_lock: - self.__subscribers[session_id] = handler - - def subscribe_topic(self, session_id: str, topic: str, handler: Callable): + def subscribe_topic( + self, + session_id: str, + topic: str, + handler: Union[Callable, Callable[..., Awaitable[Any]]], + ): logger.debug(f"pubsub.subscribe_topic({session_id}, {topic})") with self.__lock: self.__subscribe_topic(session_id, topic, handler) - async def subscribe_topic_async(self, session_id: str, topic: str, handler): - logger.debug(f"pubsub.subscribe_topic_async({session_id}, {topic})") - async with self.__async_lock: - self.__subscribe_topic(session_id, topic, handler) - - def __subscribe_topic(self, session_id: str, topic: str, handler): + def __subscribe_topic( + self, + session_id: str, + topic: str, + handler: Union[Callable, Callable[..., Awaitable[Any]]], + ): topic_subscribers = self.__topic_subscribers.get(topic) if topic_subscribers is None: topic_subscribers = {} @@ -122,21 +99,11 @@ def unsubscribe(self, session_id: str): with self.__lock: self.__unsubscribe(session_id) - async def unsubscribe_async(self, session_id: str): - logger.debug(f"pubsub.unsubscribe_async({session_id})") - async with self.__async_lock: - self.__unsubscribe(session_id) - def unsubscribe_topic(self, session_id: str, topic: str): logger.debug(f"pubsub.unsubscribe({session_id}, {topic})") with self.__lock: self.__unsubscribe_topic(session_id, topic) - async def unsubscribe_topic_async(self, session_id: str, topic: str): - logger.debug(f"pubsub.unsubscribe_topic_async({session_id}, {topic})") - async with self.__async_lock: - self.__unsubscribe_topic(session_id, topic) - def unsubscribe_all(self, session_id: str): logger.debug(f"pubsub.unsubscribe_all({session_id})") with self.__lock: @@ -145,14 +112,6 @@ def unsubscribe_all(self, session_id: str): for topic in self.__subscriber_topics[session_id].keys(): self.__unsubscribe_topic(session_id, topic) - async def unsubscribe_all_async(self, session_id: str): - logger.debug(f"pubsub.unsubscribe_all_async({session_id})") - async with self.__async_lock: - self.__unsubscribe(session_id) - if session_id in self.__subscriber_topics: - for topic in self.__subscriber_topics[session_id].keys(): - self.__unsubscribe_topic(session_id, topic) - def __unsubscribe(self, session_id: str): logger.debug(f"pubsub.__unsubscribe({session_id})") self.__subscribers.pop(session_id, None) @@ -170,16 +129,20 @@ def __unsubscribe_topic(self, session_id: str, topic: str): if len(subscriber_topics) == 0: self.__subscriber_topics.pop(session_id, None) - def __send(self, handler: Callable, args: Iterable): - th = threading.Thread( - target=handler, - args=args, - daemon=True, - ) - th.start() + def __send( + self, handler: Union[Callable, Callable[..., Awaitable[Any]]], args: Iterable + ): + assert self.__loop, "PubSub event loop is not set" - async def __send_async(self, handler, args): - asyncio.create_task(handler(*args)) + if asyncio.iscoroutinefunction(handler): + asyncio.run_coroutine_threadsafe(handler(*args), self.__loop) + else: + if self.__pool: + self.__loop.call_soon_threadsafe( + self.__loop.run_in_executor, self.__pool, handler, *args + ) + else: + handler(*args) class PubSub: @@ -191,54 +154,61 @@ def send_all(self, message: Any): self.__pubsub.send_all(message) async def send_all_async(self, message: Any): - await self.__pubsub.send_all_async(message) + warn("Obsolete. Use pubsub.send_all() method instead.") + self.send_all(message) def send_all_on_topic(self, topic: str, message: Any): self.__pubsub.send_all_on_topic(topic, message) async def send_all_on_topic_async(self, topic: str, message: Any): - await self.__pubsub.send_all_on_topic_async(topic, message) + warn("Obsolete. Use pubsub.send_all_on_topic() method instead.") + self.send_all_on_topic(topic, message) def send_others(self, message: Any): self.__pubsub.send_others(self.__session_id, message) async def send_others_async(self, message: Any): - await self.__pubsub.send_others_async(self.__session_id, message) + warn("Obsolete. Use pubsub.send_others() method instead.") + self.send_others(message) def send_others_on_topic(self, topic: str, message: Any): self.__pubsub.send_others_on_topic(self.__session_id, topic, message) async def send_others_on_topic_async(self, topic: str, message: Any): - await self.__pubsub.send_others_on_topic_async( - self.__session_id, topic, message - ) + warn("Obsolete. Use pubsub.send_others_on_topic() method instead.") + self.send_others_on_topic(topic, message) def subscribe(self, handler: Callable): self.__pubsub.subscribe(self.__session_id, handler) async def subscribe_async(self, handler: Callable): - await self.__pubsub.subscribe_async(self.__session_id, handler) + warn("Obsolete. Use pubsub.subscribe() method instead.") + self.subscribe(handler) def subscribe_topic(self, topic: str, handler: Callable): self.__pubsub.subscribe_topic(self.__session_id, topic, handler) async def subscribe_topic_async(self, topic: str, handler: Callable): - await self.__pubsub.subscribe_topic_async(self.__session_id, topic, handler) + warn("Obsolete. Use pubsub.subscribe_topic() method instead.") + self.subscribe_topic(topic, handler) def unsubscribe(self): self.__pubsub.unsubscribe(self.__session_id) async def unsubscribe_async(self): - await self.__pubsub.unsubscribe_async(self.__session_id) + warn("Obsolete. Use pubsub.unsubscribe() method instead.") + self.unsubscribe() def unsubscribe_topic(self, topic: str): self.__pubsub.unsubscribe_topic(self.__session_id, topic) async def unsubscribe_topic_async(self, topic: str): - await self.__pubsub.unsubscribe_topic_async(self.__session_id, topic) + warn("Obsolete. Use pubsub.unsubscribe_topic() method instead.") + self.unsubscribe_topic(topic) def unsubscribe_all(self): self.__pubsub.unsubscribe_all(self.__session_id) async def unsubscribe_all_async(self): - await self.__pubsub.unsubscribe_all_async(self.__session_id) + warn("Obsolete. Use pubsub.unsubscribe_all() method instead.") + self.unsubscribe_all() diff --git a/sdk/python/packages/flet/src/flet/fastapi/flet_app.py b/sdk/python/packages/flet/src/flet/fastapi/flet_app.py index 28b238680..093e3029a 100644 --- a/sdk/python/packages/flet/src/flet/fastapi/flet_app.py +++ b/sdk/python/packages/flet/src/flet/fastapi/flet_app.py @@ -24,7 +24,7 @@ PageCommandsBatchResponsePayload, RegisterWebClientRequestPayload, ) -from flet_core.utils import is_coroutine, random_string +from flet_core.utils import random_string from flet_runtime.pubsub import PubSubHub from flet_runtime.uploads import build_upload_url @@ -90,7 +90,7 @@ async def handle(self, websocket: WebSocket): async with _pubsubhubs_lock: psh = _pubsubhubs.get(self.__session_handler, None) if psh is None: - psh = PubSubHub() + psh = PubSubHub(loop=asyncio.get_running_loop(), pool=app_manager.pool) _pubsubhubs[self.__session_handler] = psh self.pubsubhub = psh @@ -133,7 +133,7 @@ async def __on_session_created(self, session_data): session_id = session_data.sessionID try: assert self.__session_handler is not None - if is_coroutine(self.__session_handler): + if asyncio.iscoroutinefunction(self.__session_handler): await self.__session_handler(self.__page) else: # run in thread pool