diff --git a/py/server/deephaven/__init__.py b/py/server/deephaven/__init__.py index a7a0688ddfb..55f3027e12c 100644 --- a/py/server/deephaven/__init__.py +++ b/py/server/deephaven/__init__.py @@ -22,11 +22,12 @@ from .csv import write as write_csv from .stream.kafka import consumer as kafka_consumer from .stream.kafka import producer as kafka_producer -from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table +from .table_factory import empty_table, time_table, merge, merge_sorted, new_table, DynamicTableWriter, input_table, \ + ring_table from .replay import TableReplayer from ._gc import garbage_collect from .dbc import read_sql __all__ = ["read_csv", "write_csv", "kafka_consumer", "kafka_producer", "empty_table", "time_table", "merge", - "merge_sorted", "new_table", "input_table", "DynamicTableWriter", "TableReplayer", "garbage_collect", - "read_sql", "DHError", "SortDirection"] + "merge_sorted", "new_table", "input_table", "ring_table", "DynamicTableWriter", "TableReplayer", + "garbage_collect", "read_sql", "DHError", "SortDirection"] diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index e0c815b4192..256488eeb01 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -88,6 +88,7 @@ class NodeType(Enum): include_constituent=True. The constituent level is the lowest in a rollup table. These nodes have column names and types from the source table of the RollupTable. """ + class SearchDisplayMode(Enum): """An enum of search display modes for layout hints""" DEFAULT = _JSearchDisplayMode.Default @@ -97,6 +98,7 @@ class SearchDisplayMode(Enum): HIDE = _JSearchDisplayMode.Hide """Hide the search bar, regardless of user or system settings.""" + class _FormatOperationsRecorder(Protocol): """A mixin for creating format operations to be applied to individual nodes of either RollupTable or TreeTable.""" diff --git a/py/server/deephaven/table_factory.py b/py/server/deephaven/table_factory.py index 9f83877c816..0afc0918543 100644 --- a/py/server/deephaven/table_factory.py +++ b/py/server/deephaven/table_factory.py @@ -26,6 +26,7 @@ _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") _JTable = jpy.get_type("io.deephaven.engine.table.Table") _J_INPUT_TABLE_ATTRIBUTE = _JTable.INPUT_TABLE_ATTRIBUTE +_JRingTableTools = jpy.get_type("io.deephaven.engine.table.impl.sources.ring.RingTableTools") def empty_table(size: int) -> Table: @@ -291,3 +292,27 @@ def input_table(col_defs: Dict[str, DType] = None, init_table: Table = None, DHError """ return InputTable(col_defs=col_defs, init_table=init_table, key_cols=key_cols) + + +def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table: + """Creates a ring table that retains the latest 'capacity' number of rows from the parent table. + Latest rows are determined solely by the new rows added to the parent table, deleted rows are ignored, + and updated rows are not expected and will raise an exception. + + Ring table is mostly used with stream tables which do not retain their own data for more than an update cycle. + + Args: + parent (Table): the parent table + capacity (int): the capacity of the ring table + initialize (bool): whether to initialize the ring table with a snapshot of the parent table, default is True + + Returns: + a Table + + Raises: + DHError + """ + try: + return Table(j_table=_JRingTableTools.of(parent.j_table, capacity, initialize)) + except Exception as e: + raise DHError(e, "failed to create a ring table.") from e diff --git a/py/server/tests/test_table_factory.py b/py/server/tests/test_table_factory.py index 17eaabe0564..483ae31d2e6 100644 --- a/py/server/tests/test_table_factory.py +++ b/py/server/tests/test_table_factory.py @@ -12,7 +12,7 @@ from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \ string_col, datetime_col, pyobj_col, jobj_col from deephaven.constants import NULL_DOUBLE, NULL_FLOAT, NULL_LONG, NULL_INT, NULL_SHORT, NULL_BYTE -from deephaven.table_factory import DynamicTableWriter +from deephaven.table_factory import DynamicTableWriter, ring_table from tests.testbase import BaseTestCase JArrayList = jpy.get_type("java.util.ArrayList") @@ -274,6 +274,30 @@ def test_input_table(self): keyed_input_table.delete(t.select(["String", "Double"])) self.assertEqual(keyed_input_table.size, 0) + def test_ring_table(self): + cols = [ + bool_col(name="Boolean", data=[True, False]), + byte_col(name="Byte", data=(1, -1)), + char_col(name="Char", data='-1'), + short_col(name="Short", data=[1, -1]), + int_col(name="Int", data=[1, -1]), + long_col(name="Long", data=[1, -1]), + long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)), + float_col(name="Float", data=[1.01, -1.01]), + double_col(name="Double", data=[1.01, -1.01]), + string_col(name="String", data=["foo", "bar"]), + ] + t = new_table(cols=cols) + keyed_input_table = input_table(init_table=t, key_cols=["String"]) + ring_t = ring_table(parent=keyed_input_table, capacity=6, initialize=False) + for i in range(5): + keyed_input_table.delete(t.select(["String"])) + keyed_input_table.add(t) + self.assertTrue(keyed_input_table.is_refreshing) + self.assertEqual(keyed_input_table.size, 2) + self.assertTrue(ring_t.is_refreshing) + self.wait_ticking_table_update(ring_t, 6, 5) + if __name__ == '__main__': unittest.main()