Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap ring table #3765

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions py/server/deephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
from .csv import write as write_csv
from .stream.kafka import consumer as kafka_consumer
from .stream.kafka import producer as kafka_producer
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table, \
ring_table
from .replay import TableReplayer
from ._gc import garbage_collect
from .dbc import read_sql

__all__ = ["read_csv", "write_csv", "kafka_consumer", "kafka_producer", "empty_table", "time_table", "merge",
"merge_sorted", "new_table", "input_table", "DynamicTableWriter", "TableReplayer", "garbage_collect",
"read_sql", "DHError", "SortDirection"]
"merge_sorted", "new_table", "input_table", "ring_table", "DynamicTableWriter", "TableReplayer",
"garbage_collect", "read_sql", "DHError", "SortDirection"]
2 changes: 2 additions & 0 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class NodeType(Enum):
include_constituent=True. The constituent level is the lowest in a rollup table. These nodes have column names
and types from the source table of the RollupTable. """


class SearchDisplayMode(Enum):
"""An enum of search display modes for layout hints"""
DEFAULT = _JSearchDisplayMode.Default
Expand All @@ -97,6 +98,7 @@ class SearchDisplayMode(Enum):
HIDE = _JSearchDisplayMode.Hide
"""Hide the search bar, regardless of user or system settings."""


class _FormatOperationsRecorder(Protocol):
"""A mixin for creating format operations to be applied to individual nodes of either RollupTable or TreeTable."""

Expand Down
25 changes: 25 additions & 0 deletions py/server/deephaven/table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
_JTable = jpy.get_type("io.deephaven.engine.table.Table")
_J_INPUT_TABLE_ATTRIBUTE = _JTable.INPUT_TABLE_ATTRIBUTE
_JRingTableTools = jpy.get_type("io.deephaven.engine.table.impl.sources.ring.RingTableTools")


def empty_table(size: int) -> Table:
Expand Down Expand Up @@ -291,3 +292,27 @@ def input_table(col_defs: Dict[str, DType] = None, init_table: Table = None,
DHError
"""
return InputTable(col_defs=col_defs, init_table=init_table, key_cols=key_cols)


def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table:
"""Creates a ring table that retains the latest 'capacity' number of rows from the parent table.
Latest rows are determined solely by the new rows added to the parent table, deleted rows are ignored,
and updated rows are not expected and will raise an exception.

Ring table is mostly used with stream tables which do not retain their own data for more than an update cycle.

Args:
parent (Table): the parent table
capacity (int): the capacity of the ring table
initialize (bool): whether to initialize the ring table with a snapshot of the parent table, default is True

Returns:
a Table

Raises:
DHError
"""
try:
return Table(j_table=_JRingTableTools.of(parent.j_table, capacity, initialize))
except Exception as e:
raise DHError(e, "failed to create a ring table.") from e
26 changes: 25 additions & 1 deletion py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \
string_col, datetime_col, pyobj_col, jobj_col
from deephaven.constants import NULL_DOUBLE, NULL_FLOAT, NULL_LONG, NULL_INT, NULL_SHORT, NULL_BYTE
from deephaven.table_factory import DynamicTableWriter
from deephaven.table_factory import DynamicTableWriter, ring_table
from tests.testbase import BaseTestCase

JArrayList = jpy.get_type("java.util.ArrayList")
Expand Down Expand Up @@ -274,6 +274,30 @@ def test_input_table(self):
keyed_input_table.delete(t.select(["String", "Double"]))
self.assertEqual(keyed_input_table.size, 0)

def test_ring_table(self):
cols = [
bool_col(name="Boolean", data=[True, False]),
byte_col(name="Byte", data=(1, -1)),
char_col(name="Char", data='-1'),
short_col(name="Short", data=[1, -1]),
int_col(name="Int", data=[1, -1]),
long_col(name="Long", data=[1, -1]),
long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
float_col(name="Float", data=[1.01, -1.01]),
double_col(name="Double", data=[1.01, -1.01]),
string_col(name="String", data=["foo", "bar"]),
]
t = new_table(cols=cols)
keyed_input_table = input_table(init_table=t, key_cols=["String"])
ring_t = ring_table(parent=keyed_input_table, capacity=6, initialize=False)
for i in range(5):
keyed_input_table.delete(t.select(["String"]))
keyed_input_table.add(t)
self.assertTrue(keyed_input_table.is_refreshing)
self.assertEqual(keyed_input_table.size, 2)
self.assertTrue(ring_t.is_refreshing)
self.wait_ticking_table_update(ring_t, 6, 5)


if __name__ == '__main__':
unittest.main()