Skip to content

Commit

Permalink
chore: Strip down unused stuff. #1796
Browse files Browse the repository at this point in the history
  • Loading branch information
mturoci committed Feb 1, 2023
1 parent c98d704 commit 2b246a6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 129 deletions.
2 changes: 1 addition & 1 deletion py/h2o_wavelite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
.. include:: ../docs/index.md
"""
from .core import AsyncSite, Ref, data, pack, Expando, expando_to_dict, clone_expando, copy_expando
from .core import Ref, data, pack, Expando, expando_to_dict, clone_expando, copy_expando
from .server import Q, wave_serve
from .routing import on, handle_on
from .types import *
Expand Down
41 changes: 7 additions & 34 deletions py/h2o_wavelite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,7 @@ class PageBase:
url: The URL of the remote page.
"""

def __init__(self, url: str):
self.url = url
def __init__(self):
self._changes = []

def add(self, key: str, card: Any) -> Ref:
Expand Down Expand Up @@ -412,46 +411,20 @@ def __delitem__(self, key: str):

class AsyncPage(PageBase):
"""
Represents a reference to a remote Wave page. Similar to `h2o_wave.core.Page` except that this class exposes ``async`` methods.
Args:
site: The parent site.
url: The URL of this page.
Represents a reference to a Wave page.
"""

def __init__(self, site: 'AsyncSite', url: str):
self.site = site
super().__init__(url)
def __init__(self, send: Optional[Callable] = None):
self.send = send
super().__init__()

async def save(self):
"""
Save the page. Sends all local changes made to this page to the remote site.
Save the page. Sends all local changes made to this page to the browser.
"""
p = self._diff()
if p:
logger.debug(p)
await self.site._save(p)


class AsyncSite:
"""
Represents a reference to the remote Wave site. Similar to `h2o_wave.core.Site` except that this class exposes `async` methods.
"""

def __init__(self, send: Optional[Callable] = None):
self.send = send

def __getitem__(self, url) -> AsyncPage:
return AsyncPage(self, url)

def __delitem__(self, key: str):
page = self[key]
page.drop()

async def _save(self, patch: str):
await self.send(patch)

await self.send(p)

def marshal(d: Any) -> str:
"""
Expand Down
99 changes: 5 additions & 94 deletions py/h2o_wavelite/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import contextvars
import functools
import json
import logging
import os
import pickle
import traceback
from concurrent.futures import Executor
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

from .core import AsyncSite, Expando, expando_to_dict, marshal
from .core import AsyncPage, Expando
from .ui import markdown_card

logger = logging.getLogger(__name__)


def _noop(): pass


def _session_for(sessions: dict, session_id: str):
session = sessions.get(session_id, None)
if session is None:
Expand All @@ -51,17 +44,14 @@ class Query:

def __init__(
self,
site: AsyncSite,
client_id: str,
page: AsyncPage,
app_state: Expando,
user_state: Expando,
client_state: Expando,
args: Expando,
events: Expando,
):
self.site = site
"""A reference to the current site."""
self.page = site[f'/{client_id}']
self.page = page
"""A reference to the current page."""
self.app = app_state
"""A `h2o_wave.core.Expando` instance to hold application-specific state."""
Expand All @@ -74,67 +64,6 @@ def __init__(
self.events = events
"""A `h2o_wave.core.Expando` instance containing events from the active request."""

async def sleep(self, delay: float, result=None) -> Any:
"""
Suspend execution for the specified number of seconds.
Always use `q.sleep()` instead of `time.sleep()` in Wave apps.
Args:
delay: Number of seconds to sleep.
result: Result to return after delay, if any.
Returns:
The `result` argument, if any, as is.
"""
return await asyncio.sleep(delay, result)

async def exec(self, executor: Optional[Executor], func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Execute a function in the background using the specified executor.
To execute a function in-process, use `q.run()`.
Args:
executor: The executor to be used. If None, executes the function in-process.
func: The function to to be called.
args: Arguments to be passed to the function.
kwargs: Keywords arguments to be passed to the function.
Returns:
The result of the function call.
"""
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)

loop = asyncio.get_event_loop()

if contextvars is not None: # Python 3.7+ only.
return await loop.run_in_executor(
executor,
contextvars.copy_context().run,
functools.partial(func, *args, **kwargs)
)

if kwargs:
return await loop.run_in_executor(executor, functools.partial(func, *args, **kwargs))

return await loop.run_in_executor(executor, func, *args)

async def run(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Execute a function in the background, in-process.
Equivalent to calling `q.exec()` without an executor.
Args:
func: The function to to be called.
args: Arguments to be passed to the function.
kwargs: Keywords arguments to be passed to the function.
Returns:
The result of the function call.
"""
return await self.exec(None, func, *args, **kwargs)


Q = Query
"""Alias for Query context."""
Expand All @@ -152,7 +81,7 @@ def __init__(self, handle: HandleAsync, send: Optional[Callable] = None, recv: O
self._recv = recv
self._handle = handle
self._state: WebAppState = _load_state()
self._site: AsyncSite = AsyncSite(send)
self._page: AsyncPage = AsyncPage(send)

async def _run(self):
# Handshake.
Expand All @@ -178,8 +107,7 @@ async def _process(self, args: dict):
events_state = {k: Expando(v) for k, v in events_state.items()}
del args['']
q = Q(
site=self._site,
client_id='',
page=self._page,
app_state=app_state,
user_state=_session_for(user_state, ''),
client_state=_session_for(client_state, ''),
Expand All @@ -204,9 +132,6 @@ async def _process(self, args: dict):
except:
logger.exception('Failed transmitting unhandled exception')

def _shutdown(self):
_save_state(self._state)


_CHECKPOINT_DIR_ENV_VAR = 'H2O_WAVE_CHECKPOINT_DIR'

Expand Down Expand Up @@ -256,20 +181,6 @@ def _load_state() -> WebAppState:
return _empty_state()


def _save_state(state: WebAppState):
f = _get_checkpoint_file_path()
if not f:
return

app_state, sessions, _ = state
checkpoint = (
expando_to_dict(app_state),
{k: expando_to_dict(v) for k, v in sessions.items()},
)
logger.info(f'Creating checkpoint at {f} ...')
with open(f, 'wb') as p:
pickle.dump(checkpoint, p)

async def _parse_msg(msg: str) -> Optional[dict]:
# protocol: t addr data
parts = msg.split(' ', 3)
Expand Down

0 comments on commit 2b246a6

Please sign in to comment.