Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade CSP to Perspective 3.x #370

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import threading
from datetime import timedelta
from perspective import PerspectiveTornadoHandler, Server
from typing import Dict, Optional, Union

import csp
Expand All @@ -13,21 +14,20 @@
except ImportError:
raise ImportError("perspective adapter requires tornado package")


try:
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size

MAJOR, MINOR, PATCH = map(int, __version__.split("."))
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
if (MAJOR, MINOR, PATCH) < (3, 1, 0):
raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")
except ImportError:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")


# Run perspective update in a separate tornado loop
def perspective_thread(client):
    """Run the perspective client's callback loop on a dedicated tornado IOLoop.

    Intended as a daemon-thread target so perspective table updates are
    processed off the main thread.

    Args:
        client: a perspective 3.x local client (``server.new_local_client()``).
    """
    loop = tornado.ioloop.IOLoop()
    # Route all perspective callbacks through this thread's loop.
    client.set_loop_callback(loop.add_callback)
    # Blocks until the loop is stopped; the thread is daemonized by the caller.
    loop.start()


Expand All @@ -54,19 +54,17 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):


@csp.node
def _launch_application(port: int, manager: object, stub: ts[object]):
def _launch_application(port: int, server: Server, stub: ts[object]):
with csp.state():
s_app = None
s_ioloop = None
s_iothread = None

with csp.start():
from perspective import PerspectiveTornadoHandler

s_app = tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
(r"/websocket", PerspectiveTornadoHandler, {"perspective_server": server, "check_origin": True})
],
websocket_ping_interval=15,
)
Expand Down Expand Up @@ -197,20 +195,19 @@ def create_table(self, name, limit=None, index=None):

def _instantiate(self):
    """Create the perspective server/client, host one perspective table per
    registered csp table, wire updates, and launch the tornado application.
    """
    set_threadpool_size(self._threadpool_size)

    # Perspective 3.x model: a Server owns the tables; a local (in-process)
    # client is used to create and update them.
    server = Server()
    client = server.new_local_client()

    # Process perspective updates on a dedicated daemon thread so the
    # csp engine thread is never blocked.
    thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
    thread.daemon = True
    thread.start()

    for table_name, table in self._tables.items():
        # csp Enum columns are hosted as strings; perspective has no enum type.
        schema = {
            k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
        }
        ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)

        _apply_updates(ptable, table.columns, self._throttle)

    # The tornado websocket handler needs the Server (not the client).
    _launch_application(self._port, server, csp.const("stub"))
22 changes: 15 additions & 7 deletions csp/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import Dict, Optional

import csp.baselib
Expand Down Expand Up @@ -198,7 +198,7 @@ def to_pandas_ts(self, trigger, window, tindex=None, wait_all_valid=True):

return make_pandas(trigger, self._data, window, tindex, wait_all_valid)

def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime: bool = False):
def to_perspective(self, client, starttime: datetime, endtime: datetime = None, realtime: bool = False):
import csp

try:
Expand Down Expand Up @@ -229,7 +229,7 @@ def join(self):
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")

@csp.node
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
with csp.alarms():
alarm = csp.alarm(bool)
with csp.state():
Expand All @@ -240,7 +240,7 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
s_buffer[-1][timecol] = csp.now()
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -250,9 +250,17 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
csp.schedule_alarm(alarm, throttle, True)

timecol = "time"
schema = {k: v.tstype.typ for k, v in self._data.items()}
schema[timecol] = datetime
table = perspective.Table(schema)
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}
schema = {k: perspective_type_map[v.tstype.typ] for k, v in self._data.items()}
schema[timecol] = "datetime"
table = client.table(schema)
runner = csp.run_on_thread(
apply_updates,
table,
Expand Down
71 changes: 55 additions & 16 deletions csp/impl/pandas_perspective.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pandas as pd
import pyarrow as pa
import pytz
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from pandas.compat import set_function_name
from typing import Optional

Expand Down Expand Up @@ -40,7 +41,8 @@ def _apply_updates(
if throttle > timedelta(0):
csp.schedule_alarm(alarm, throttle, True)
s_has_time_col = time_col and time_col not in data.keys()
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

with csp.stop():
try:
Expand Down Expand Up @@ -81,14 +83,22 @@ def _apply_updates(
row[index_col] = idx
if s_has_time_col:
if localize:
row[time_col] = pytz.utc.localize(csp.now())
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
else:
row[time_col] = csp.now()
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
else:
row = new_rows[idx]

if localize and col in s_datetime_cols and value.tzinfo is None:
row[col] = pytz.utc.localize(value)
if col in s_date_cols:
row[col] = int(
datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
)

elif localize and col in s_datetime_cols:
if value.tzinfo is None:
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
else:
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
else:
row[col] = value

Expand Down Expand Up @@ -160,28 +170,41 @@ def __init__(
self._limit = limit
self._localize = localize

# TODO: we do not want 1 server per table, make a Client param?
self._psp_server = perspective.Server()
self._psp_client = self._psp_server.new_local_client()

self._basket = _frame_to_basket(data)
self._static_frame = data.csp.static_frame()
self._static_table = perspective.Table(self._static_frame)
self._static_table = self._psp_client.table(self._static_frame)
static_schema = self._static_table.schema()
# Since the index will be accounted for separately, remove the index from the static table schema,
# and re-enter it under index_col
raw_index_name = self._static_frame.index.name or "index"
index_type = static_schema.pop(raw_index_name)
schema = {index_col: index_type}
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}

if time_col:
schema[time_col] = datetime
schema[time_col] = "datetime"
for col, series in data.items():
if is_csp_type(series):
schema[col] = series.dtype.subtype
schema[col] = perspective_type_map[series.dtype.subtype]
else:
schema[col] = static_schema[col]

if self._keep_history:
self._table = perspective.Table(schema, index=None, limit=limit)
self._table = self._psp_client.table(schema, index=None, limit=limit)
self._static_records = self._static_frame.to_dict(orient="index")
else:
self._table = perspective.Table(schema, index=self._index_col)
self._table = self._psp_client.table(schema, index=self._index_col)
self._static_frame.index = self._static_frame.index.rename(self._index_col)
self._table.update(self._static_frame)
self._static_records = None # No need to update dynamically
Expand Down Expand Up @@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
index = self._index_col
if self._limit:
df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
return perspective.Table(df.to_dict("series"), index=index)
return self._psp_client.table(df, index=index)

def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
"""Run a graph that sends data to the table on the current thread.
Expand Down Expand Up @@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
"sort": [[self._time_col, "desc"]],
}
else:
kwargs = {"columns": list(self._table.schema())}
kwargs = {"columns": list(self._table.columns())}
kwargs.update(override_kwargs)
return perspective.PerspectiveWidget(self._table, **kwargs)

Expand All @@ -294,14 +317,30 @@ def _method(self, **options):

@classmethod
def _add_view_methods(cls):
cls.to_df = cls._create_view_method(perspective.View.to_df)
cls.to_dict = cls._create_view_method(perspective.View.to_dict)
cls.to_json = cls._create_view_method(perspective.View.to_json)
cls.to_csv = cls._create_view_method(perspective.View.to_csv)
cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
cls.to_columns = cls._create_view_method(perspective.View.to_columns)
cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)

def to_df(self, **kwargs):
    """Read the table contents into a pandas DataFrame via an Arrow IPC stream.

    ``kwargs`` are forwarded to ``pyarrow.Table.to_pandas``.
    """
    ipc_bytes = self.to_arrow()
    table = pa.ipc.open_stream(ipc_bytes).read_all()
    df = pd.DataFrame(table.to_pandas(**kwargs))

    # pyarrow does not force alphabetical order on categories, so we reorder
    # them here to keep downstream assertions stable; ms-precision timestamps
    # are widened to the pandas-default ns precision.
    for column in df:
        if df[column].dtype == "datetime64[ms]":
            df[column] = df[column].astype("datetime64[ns]")
        elif df[column].dtype == "category":
            df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())

    # NOTE(review): this relies on a ``.cat`` accessor being available on the
    # (categorical) index — verify against the pandas version in use.
    if df.index.dtype == "category":
        df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())

    return df


CspPerspectiveTable._add_view_methods()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ def test_perspective(self):
starttime = datetime(2021, 4, 26)
endtime = starttime + timedelta(seconds=10)

_ = df.to_perspective(starttime, endtime)
server = perspective.Server()
client = server.new_local_client()

_ = df.to_perspective(client, starttime, endtime)

# realtime
widget = df.to_perspective(datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
widget = df.to_perspective(client, datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
import time

time.sleep(1)
Expand Down
Loading