Skip to content

Commit

Permalink
Add UpdateGraph wrapper and fix typehints (#3947)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver authored and devinrsmith committed Jun 14, 2023
1 parent f9b92ba commit 8e01ac8
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 82 deletions.
7 changes: 3 additions & 4 deletions py/server/deephaven/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.jcompat import to_sequence
from deephaven.update_graph import UpdateGraph

_JExecutionContext = jpy.get_type("io.deephaven.engine.context.ExecutionContext")
_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraph")


class ExecutionContext(JObjectWrapper, ContextDecorator):
Expand All @@ -38,8 +38,8 @@ def j_object(self) -> jpy.JType:
return self.j_exec_ctx

@property
def update_graph(self) -> _JUpdateGraph:
return self.j_exec_ctx.getUpdateGraph()
def update_graph(self) -> UpdateGraph:
return UpdateGraph(j_update_graph=self.j_exec_ctx.getUpdateGraph())

def __init__(self, j_exec_ctx):
self.j_exec_ctx = j_exec_ctx
Expand Down Expand Up @@ -92,4 +92,3 @@ def get_exec_ctx() -> ExecutionContext:
a ExecutionContext
"""
return ExecutionContext(j_exec_ctx=_JExecutionContext.getContext())

19 changes: 8 additions & 11 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from deephaven.filters import Filter, and_, or_
from deephaven.jcompat import j_unary_operator, j_binary_operator, j_map_to_dict, j_hashmap
from deephaven.jcompat import to_sequence, j_array_list
from deephaven.update_graph import auto_locking_ctx
from deephaven.update_graph import auto_locking_ctx, UpdateGraph
from deephaven.updateby import UpdateByOperation

# Table
Expand All @@ -42,7 +42,6 @@
_JLayoutHintBuilder = jpy.get_type("io.deephaven.engine.util.LayoutHintBuilder")
_JSearchDisplayMode = jpy.get_type("io.deephaven.engine.util.LayoutHintBuilder$SearchDisplayModes")
_JSnapshotWhenOptions = jpy.get_type("io.deephaven.api.snapshot.SnapshotWhenOptions")
_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraph")

# PartitionedTable
_JPartitionedTable = jpy.get_type("io.deephaven.engine.table.PartitionedTable")
Expand Down Expand Up @@ -540,13 +539,11 @@ def is_refreshing(self) -> bool:
return self._is_refreshing

@property
def update_graph(self) -> _JUpdateGraph:
"""None if not refreshing otherwise is this table's update graph."""
if self.is_refreshing:
if self._update_graph is None:
self._update_graph = self.j_table.getUpdateGraph()
return self._update_graph
return None
def update_graph(self) -> UpdateGraph:
"""The update graph of the table."""
if self._update_graph is None:
self._update_graph = UpdateGraph(self.j_table.getUpdateGraph())
return self._update_graph

@property
def is_flat(self) -> bool:
Expand Down Expand Up @@ -2310,7 +2307,7 @@ def table(self) -> Table:
return self._table

@property
def update_graph(self) -> _JUpdateGraph:
def update_graph(self) -> UpdateGraph:
"""The underlying partitioned table's update graph."""
return self.table.update_graph

Expand Down Expand Up @@ -2571,7 +2568,7 @@ def is_refreshing(self) -> bool:
return self.target.is_refreshing

@property
def update_graph(self) -> _JUpdateGraph:
def update_graph(self) -> UpdateGraph:
"""The underlying partitioned table proxy's update graph."""
return self.target.update_graph

Expand Down
6 changes: 3 additions & 3 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
from deephaven.jcompat import to_sequence
from deephaven.numpy import column_to_numpy_array
from deephaven.table import Table
from deephaven.update_graph import UpdateGraph

_JPythonListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonListenerAdapter")
_JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter")
_JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate")
_JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonListenerTableUpdateDataReader")
_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraph")


def _col_defs(table: Table, cols: Union[str, List[str]]) -> List[Column]:
Expand Down Expand Up @@ -238,14 +238,14 @@ def modified_columns(self) -> List[str]:
return list(cols) if cols else []


def _do_locked(ug: Union[_JUpdateGraph, Table], f: Callable, lock_type="shared") -> None:
def _do_locked(ug: Union[UpdateGraph, Table], f: Callable, lock_type="shared") -> None:
"""Executes a function while holding the UpdateGraph (UG) lock. Holding the UG lock
ensures that the contents of a table will not change during a computation, but holding
the lock also prevents table updates from happening. The lock should be held for as little
time as possible.
Args:
ug (Union[_JUpdateGraph, Table]): The Update Graph (UG) or a table-like object.
ug (Union[UpdateGraph, Table]): The Update Graph (UG) or a table-like object.
f (Callable): callable to execute while holding the UG lock, could be function or an object with an 'apply'
attribute which is callable
lock_type (str): UG lock type, valid values are "exclusive" and "shared". "exclusive" allows only a single
Expand Down
103 changes: 46 additions & 57 deletions py/server/deephaven/update_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@

import contextlib
from functools import wraps
from typing import Callable, Union
from typing import Callable, Union, Optional

import jpy

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper

_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.UpdateGraph")

Expand All @@ -22,62 +23,61 @@
be released after the table operation finishes. Auto locking is turned on by default."""


def has_exclusive_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
class UpdateGraph(JObjectWrapper):
j_object_type = _JUpdateGraph

@property
def j_object(self) -> jpy.JType:
return self.j_update_graph

def __init__(self, j_update_graph: jpy.JType):
self.j_update_graph = j_update_graph


def has_exclusive_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread is holding the provided Update Graph's (UG) exclusive lock.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread is holding the Update Graph (UG) exclusive lock, False otherwise.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

return ug.exclusiveLock().isHeldByCurrentThread()
return ug.j_update_graph.exclusiveLock().isHeldByCurrentThread()


def has_shared_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
def has_shared_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> bool:
"""Checks if the current thread is holding the provided Update Graph's (UG) shared lock.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
Returns:
True if the current thread is holding the Update Graph (UG) shared lock, False otherwise.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

return ug.sharedLock().isHeldByCurrentThread()
return ug.j_update_graph.sharedLock().isHeldByCurrentThread()


@contextlib.contextmanager
def exclusive_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
def exclusive_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
"""Context manager for running a block of code under an Update Graph (UG) exclusive lock.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
Args:
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

lock = ug.exclusiveLock()
lock = ug.j_update_graph.exclusiveLock()
lock.lock()
try:
yield
Expand All @@ -88,21 +88,18 @@ def exclusive_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "Partit


@contextlib.contextmanager
def shared_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
def shared_lock(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]):
"""Context manager for running a block of code under an Update Graph (UG) shared lock.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

lock = ug.sharedLock()
lock = ug.j_update_graph.sharedLock()

lock.lock()
try:
yield
Expand All @@ -112,19 +109,15 @@ def shared_lock(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "Partition
lock.unlock()


def exclusive_locked(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
def exclusive_locked(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
"""A decorator that ensures the decorated function be called under the Update Graph (UG) exclusive
lock. The lock is released after the function returns regardless of what happens inside the function.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

def inner_wrapper(f: Callable) -> Callable:
Expand All @@ -138,19 +131,15 @@ def do_locked(*arg, **kwargs):
return inner_wrapper


def shared_locked(ug: Union[_JUpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
def shared_locked(ug: Union[UpdateGraph, "Table", "PartitionedTable", "PartitionTableProxy"]) -> Callable:
"""A decorator that ensures the decorated function be called under the Update Graph (UG) shared lock.
The lock is released after the function returns regardless of what happens inside the function.
Args:
ug (Union[_JUpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a table-like object.
ug (Union[UpdateGraph, Table, PartitionedTable, PartitionTableProxy]): The Update Graph (UG) or a
table-like object.
"""
from deephaven.table import Table, PartitionedTable, PartitionedTableProxy
if isinstance(ug, Table):
ug = ug.update_graph
if isinstance(ug, PartitionedTable):
ug = ug.update_graph
if isinstance(ug, PartitionedTableProxy):
if not isinstance(ug, UpdateGraph):
ug = ug.update_graph

def inner_wrapper(f: Callable) -> Callable:
Expand All @@ -164,7 +153,7 @@ def do_locked(*arg, **kwargs):
return inner_wrapper


def _is_arg_refreshing(arg):
def _is_arg_refreshing(arg) -> bool:
if isinstance(arg, list) or isinstance(arg, tuple):
for e in arg:
if _is_arg_refreshing(e):
Expand All @@ -175,7 +164,7 @@ def _is_arg_refreshing(arg):
return False


def _first_refreshing_table(*args, **kwargs):
def _first_refreshing_table(*args, **kwargs) -> Optional["Table"]:
for arg in args:
if _is_arg_refreshing(arg):
return arg
Expand Down
14 changes: 7 additions & 7 deletions py/server/test_helper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ def start_jvm(jvm_props: Dict[str, str] = None):
# Start up the JVM
jpy.VerboseExceptions.enabled = True
jvm.init_jvm(
jvm_classpath=_expandWildcardsInList(jvm_classpath.split(os.path.pathsep)),
jvm_classpath=_expand_wildcards_in_list(jvm_classpath.split(os.path.pathsep)),
jvm_properties=jvm_properties,
jvm_options=jvm_options
)

# Set up a Deephaven Python session
py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals()
global py_dh_session
_JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph")
test_update_graph = _JUpdateGraph.newBuilder("PYTHON_TEST").existingOrBuild()
_JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph")
_j_test_update_graph = _JPeriodicUpdateGraph.newBuilder("PYTHON_TEST").existingOrBuild()
_JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession")
py_dh_session = _JPythonScriptSession(test_update_graph, py_scope_jpy)
py_dh_session = _JPythonScriptSession(_j_test_update_graph, py_scope_jpy)


def _expandWildcardsInList(elements):
def _expand_wildcards_in_list(elements):
"""
Takes list of strings, possibly containing wildcard characters, and returns the corresponding full list. This is
intended for appropriately expanding classpath entries.
Expand All @@ -84,11 +84,11 @@ def _expandWildcardsInList(elements):

new_list = []
for element in elements:
new_list.extend(_expandWildcardsInItem(element))
new_list.extend(_expand_wildcards_in_item(element))
return _flatten(new_list)


def _expandWildcardsInItem(element):
def _expand_wildcards_in_item(element):
"""
Java classpaths can include wildcards (``<path>/*`` or ``<path>/*.jar``), but the way we are invoking the jvm
directly bypasses this expansion. This will expand a classpath element into an array of elements.
Expand Down

0 comments on commit 8e01ac8

Please sign in to comment.