Skip to content

Commit

Permalink
Deprecate legacy type hints such as [int] and replace with python-sta…
Browse files Browse the repository at this point in the history
…ndard type hints such as List[int] (#331)

* Deprecate legacy type hints such as [int] and replace with python-standard hints such as List[int]. Left existing cases in test_parsing.py and test_type_Checking.py to ensure they still work for now

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>

* Revert Callable[List in csp.apply for older python compat

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>

* Cleanup of typing imports in user-facing modules

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>

* Disable deprecation warnings for now until python 3.8 support is dropped. This will enable users on 3.9 and above to use proper type hints without having to import typing

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>

---------

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
Co-authored-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
  • Loading branch information
ptomecek and timkpaine authored Jul 25, 2024
1 parent dce7312 commit fac1334
Show file tree
Hide file tree
Showing 26 changed files with 566 additions and 547 deletions.
12 changes: 5 additions & 7 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing
from datetime import datetime, timedelta
from enum import IntEnum
from typing import TypeVar, Union
from uuid import uuid4

import csp
Expand All @@ -18,7 +18,7 @@
from csp.lib import _kafkaadapterimpl

_ = BytesMessageProtoMapper, DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper
T = typing.TypeVar("T")
T = TypeVar("T")


class KafkaStatusMessageType(IntEnum):
Expand All @@ -39,7 +39,7 @@ class KafkaAdapterManager:
def __init__(
self,
broker,
start_offset: typing.Union[KafkaStartOffset, timedelta, datetime] = None,
start_offset: Union[KafkaStartOffset, timedelta, datetime] = None,
group_id: str = None,
group_id_prefix: str = "",
max_threads=4,
Expand Down Expand Up @@ -132,7 +132,7 @@ def subscribe(
# Leave key None to subscribe to all messages on the topic
# Note that if you subscribe to all messages, they are always flagged as "live" and cant be replayed in engine time
key=None,
field_map: typing.Union[dict, str] = None,
field_map: Union[dict, str] = None,
meta_field_map: dict = None,
push_mode: csp.PushMode = csp.PushMode.LAST_VALUE,
adjust_out_of_order_time: bool = False,
Expand All @@ -154,9 +154,7 @@ def subscribe(

return _kafka_input_adapter_def(self, ts_type, properties, push_mode)

def publish(
self, msg_mapper: MsgMapper, topic: str, key: str, x: ts["T"], field_map: typing.Union[dict, str] = None
):
def publish(self, msg_mapper: MsgMapper, topic: str, key: str, x: ts["T"], field_map: Union[dict, str] = None):
if isinstance(field_map, str):
field_map = {"": field_map}

Expand Down
6 changes: 3 additions & 3 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import threading
import typing
from datetime import timedelta
from typing import Dict, Optional, Union

import csp
from csp import ts
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(self, name, limit, index):
self.index = index
self.columns = {}

def publish(self, value: ts[object], field_map: typing.Union[typing.Dict[str, str], str, None] = None):
def publish(self, value: ts[object], field_map: Union[Dict[str, str], str, None] = None):
"""
:param value - timeseries to publish onto this table
:param field_map: if publishing structs, a dictionary of struct field -> perspective fieldname ( if None will pass struct fields as is )
Expand All @@ -161,7 +161,7 @@ def publish(self, value: ts[object], field_map: typing.Union[typing.Dict[str, st
raise TypeError("Expected type str for field_map on single column publish, got %s" % type(field_map))
self._publish_field(value, field_map)

def _publish_struct(self, value: ts[csp.Struct], field_map: typing.Optional[typing.Dict[str, str]]):
def _publish_struct(self, value: ts[csp.Struct], field_map: Optional[Dict[str, str]]):
field_map = field_map or {k: k for k in value.tstype.typ.metadata()}
for k, v in field_map.items():
self._publish_field(getattr(value, k), v)
Expand Down
2 changes: 1 addition & 1 deletion csp/adapters/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def on_tick(self, time, value):
_slack_input_adapter = py_push_adapter_def(
name="SlackInputAdapter",
adapterimpl=SlackInputAdapterImpl,
out_type=ts[[SlackMessage]],
out_type=ts[List[SlackMessage]],
manager_type=SlackAdapterManager,
)
_slack_output_adapter = py_output_adapter_def(
Expand Down
11 changes: 5 additions & 6 deletions csp/adapters/websocket.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
import math
import threading
import typing
import urllib
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Dict, List
from typing import Dict, List, Optional, TypeVar, Union

import csp
from csp import ts
Expand All @@ -31,7 +30,7 @@
RawBytesMessageMapper,
RawTextMessageMapper,
)
T = typing.TypeVar("T")
T = TypeVar("T")


try:
Expand Down Expand Up @@ -331,7 +330,7 @@ def __init__(self, name, index):
def publish(
self,
value: ts[object],
field_map: typing.Union[typing.Dict[str, str], str, None] = None,
field_map: Union[Dict[str, str], str, None] = None,
):
"""
:param value - timeseries to publish onto this table
Expand All @@ -345,7 +344,7 @@ def publish(
raise TypeError("Expected type str for field_map on single column publish, got %s" % type(field_map))
self._publish_field(value, field_map)

def _publish_struct(self, value: ts[csp.Struct], field_map: typing.Optional[typing.Dict[str, str]]):
def _publish_struct(self, value: ts[csp.Struct], field_map: Optional[Dict[str, str]]):
field_map = field_map or {k: k for k in value.tstype.typ.metadata()}
for k, v in field_map.items():
self._publish_field(getattr(value, k), v)
Expand Down Expand Up @@ -427,7 +426,7 @@ def subscribe(
self,
ts_type: type,
msg_mapper: MsgMapper,
field_map: typing.Union[dict, str] = None,
field_map: Union[dict, str] = None,
meta_field_map: dict = None,
push_mode: csp.PushMode = csp.PushMode.NON_COLLAPSING,
):
Expand Down
68 changes: 35 additions & 33 deletions csp/baselib.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import pytz
import queue
import threading
import typing
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, TypeVar, Union

import csp
from csp.impl.__cspimpl import _cspimpl
Expand Down Expand Up @@ -63,11 +63,11 @@
"wrap_feedback",
]

T = typing.TypeVar("T")
K = typing.TypeVar("K")
V = typing.TypeVar("V")
Y = typing.TypeVar("Y")
U = typing.TypeVar("U")
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
Y = TypeVar("Y")
U = TypeVar("U")

const = input_adapter_def("csp.const", _cspimpl._const, ts["T"], value="~T", delay=(timedelta, timedelta()))
_timer = input_adapter_def(
Expand Down Expand Up @@ -156,13 +156,13 @@ def get_instance(cls):


@node
def _list_basket_to_string_ts(x: [ts["T"]]) -> ts[str]:
def _list_basket_to_string_ts(x: List[ts["T"]]) -> ts[str]:
value = ",".join([str(x[i]) if csp.ticked(x[i]) else "" for i in range(len(x))])
return f"[{value}]"


@node
def _dict_basket_to_string_ts(x: {"K": ts[object]}) -> ts[str]:
def _dict_basket_to_string_ts(x: Dict["K", ts[object]]) -> ts[str]:
return str({k: x[k] for k in x.tickedkeys()})


Expand Down Expand Up @@ -204,7 +204,7 @@ def log(
level: int,
tag: str,
x,
logger: typing.Optional[logging.Logger] = None,
logger: Optional[logging.Logger] = None,
logger_tz: object = None,
use_thread: bool = False,
):
Expand Down Expand Up @@ -243,7 +243,7 @@ def _log_ts(
level: int,
tag: str,
x: ts["T"],
logger: typing.Optional[logging.Logger] = None,
logger: Optional[logging.Logger] = None,
logger_tz: object = None,
use_thread: bool = False,
):
Expand Down Expand Up @@ -274,8 +274,8 @@ def _log_ts(


@graph
def get_basket_field(dict_basket: {"K": ts["V"]}, field_name: str) -> OutputBasket(
{"K": ts[object]}, shape_of="dict_basket"
def get_basket_field(dict_basket: Dict["K", ts["V"]], field_name: str) -> OutputBasket(
Dict["K", ts[object]], shape_of="dict_basket"
):
"""Given a dict basket of Struct objects, get a dict basket of the given field of struct for the matching key
Expand Down Expand Up @@ -343,7 +343,7 @@ def _delay_by_ticks(x: ts["T"], delay: int) -> ts["T"]:


@graph
def delay(x: ts["T"], delay: typing.Union[timedelta, int]) -> ts["T"]:
def delay(x: ts["T"], delay: Union[timedelta, int]) -> ts["T"]:
"""delay input ticks by given delay"""
if isinstance(delay, int):
return _delay_by_ticks(x, delay)
Expand All @@ -352,7 +352,7 @@ def delay(x: ts["T"], delay: typing.Union[timedelta, int]) -> ts["T"]:


@graph
def _lag(x: ts["T"], lag: typing.Union[timedelta, int]) -> ts["T"]:
def _lag(x: ts["T"], lag: Union[timedelta, int]) -> ts["T"]:
"""ticks when input ticks, but with lagged value of input"""
if isinstance(lag, int):
return _delay_by_ticks(x, lag)
Expand All @@ -361,7 +361,7 @@ def _lag(x: ts["T"], lag: typing.Union[timedelta, int]) -> ts["T"]:


@graph
def diff(x: ts["T"], lag: typing.Union[timedelta, int]) -> ts["T"]:
def diff(x: ts["T"], lag: Union[timedelta, int]) -> ts["T"]:
"""diff x against itself lag time/ticks ago"""
return x - _lag(x, lag)

Expand Down Expand Up @@ -396,7 +396,7 @@ def cast_int_to_float(x: ts[int]) -> ts[float]:


@node()
def apply(x: ts["T"], f: typing.Callable[["T"], "U"], result_type: "U") -> ts["U"]:
def apply(x: ts["T"], f: Callable[["T"], "U"], result_type: "U") -> ts["U"]:
"""
:param x: The time series on which the function should be applied
:param f: A scalar function that will be applied on each value of x
Expand Down Expand Up @@ -461,7 +461,7 @@ def drop_nans(x: ts[float]) -> ts[float]:


@node(cppimpl=_cspbaselibimpl.unroll)
def unroll(x: ts[["T"]]) -> ts["T"]:
def unroll(x: ts[List["T"]]) -> ts["T"]:
""" "unrolls" timeseries of lists of type 'T' into individual ticks of type 'T'"""
with csp.alarms():
alarm = csp.alarm("T")
Expand All @@ -484,14 +484,14 @@ def unroll(x: ts[["T"]]) -> ts["T"]:


@node(cppimpl=_cspbaselibimpl.collect)
def collect(x: [ts["T"]]) -> ts[["T"]]:
def collect(x: List[ts["T"]]) -> ts[List["T"]]:
"""convert basket of timeseries into timeseries of list of ticked values"""
if csp.ticked(x):
return list(x.tickedvalues())


@graph
def flatten(x: [ts["T"]]) -> ts["T"]:
def flatten(x: List[ts["T"]]) -> ts["T"]:
"""flatten a basket of inputs into ts[ 'T' ]"""
# Minor optimization, if we have a list with just
# a single ts, then just emit it as-is. Otherwise,
Expand All @@ -504,7 +504,7 @@ def flatten(x: [ts["T"]]) -> ts["T"]:

# TODO cppimpl
@node
def gate(x: ts["T"], release: ts[bool], release_on_tick: bool = False) -> ts[["T"]]:
def gate(x: ts["T"], release: ts[bool], release_on_tick: bool = False) -> ts[List["T"]]:
""" "gate" the input.
if release is false, input will be held until release is true.
when release ticks true, all gated inputs will tick in one shot
Expand Down Expand Up @@ -551,7 +551,9 @@ def null_ts(typ: "T") -> ts["T"]:


@node(cppimpl=_cspbaselibimpl.multiplex)
def multiplex(x: {"K": ts["T"]}, key: ts["K"], tick_on_index: bool = False, raise_on_bad_key: bool = False) -> ts["T"]:
def multiplex(
x: Dict["K", ts["T"]], key: ts["K"], tick_on_index: bool = False, raise_on_bad_key: bool = False
) -> ts["T"]:
"""
:param x: The basket of time series to multiplex
:param key: A
Expand All @@ -578,8 +580,8 @@ def multiplex(x: {"K": ts["T"]}, key: ts["K"], tick_on_index: bool = False, rais


@node(cppimpl=_cspbaselibimpl.demultiplex)
def demultiplex(x: ts["T"], key: ts["K"], keys: ["K"], raise_on_bad_key: bool = False) -> OutputBasket(
{"K": ts["T"]}, shape="keys"
def demultiplex(x: ts["T"], key: ts["K"], keys: List["K"], raise_on_bad_key: bool = False) -> OutputBasket(
Dict["K", ts["T"]], shape="keys"
):
"""whenever the timeseries input ticks, output a tick on the appropriate basket output"""
with csp.state():
Expand All @@ -595,15 +597,15 @@ def demultiplex(x: ts["T"], key: ts["K"], keys: ["K"], raise_on_bad_key: bool =
# TODO - looks like output annotations arent working for dynamic baskets, needs to be fixed
# @node(cppimpl=_cspbaselibimpl.dynamic_demultiplex)
@node
def dynamic_demultiplex(x: ts["T"], key: ts["K"]) -> {ts["K"]: ts["T"]}:
def dynamic_demultiplex(x: ts["T"], key: ts["K"]) -> Dict[ts["K"], ts["T"]]:
"""whenever the timeseries input ticks, output a tick on the appropriate dynamic basket output"""
if csp.ticked(x) and csp.valid(key):
csp.output({key: x})


# @node(cppimpl=_cspbaselibimpl.dynamic_collect)
@node
def dynamic_collect(data: {ts["K"]: ts["V"]}) -> ts[{"K": "V"}]:
def dynamic_collect(data: Dict[ts["K"], ts["V"]]) -> ts[Dict["K", "V"]]:
"""whenever any input of the dynamic basket ticks, output the key-value pairs in a dictionary"""
if csp.ticked(data):
return dict(data.tickeditems())
Expand All @@ -622,7 +624,7 @@ def accum(x: ts["T"], start: "~T" = 0) -> ts["T"]:
@node(cppimpl=_cspbaselibimpl.exprtk_impl)
def _csp_exprtk_impl(
expression_str: str,
inputs: {str: ts[object]},
inputs: Dict[str, ts[object]],
state_vars: dict,
constants: dict,
functions: dict,
Expand All @@ -637,13 +639,13 @@ def _csp_exprtk_impl(
@graph
def exprtk(
expression_str: str,
inputs: {str: ts[object]},
inputs: Dict[str, ts[object]],
state_vars: dict = {},
trigger: ts[object] = None,
functions: dict = {},
constants: dict = {},
output_ndarray: bool = False,
) -> ts[typing.Union[float, np.ndarray]]:
) -> ts[Union[float, np.ndarray]]:
"""given a mathematical expression,
and a set of timeseries corresponding to variables in that expression, tick out the result (a float) of that expression,
either every time an input ticks, or on the trigger if provided.
Expand Down Expand Up @@ -679,7 +681,7 @@ def struct_field(x: ts["T"], field: str, fieldType: "Y") -> ts["Y"]:


@node(cppimpl=_cspbaselibimpl.struct_fromts)
def _struct_fromts(cls: "T", inputs: {str: ts[object]}, trigger: ts[object], use_trigger: bool) -> ts["T"]:
def _struct_fromts(cls: "T", inputs: Dict[str, ts[object]], trigger: ts[object], use_trigger: bool) -> ts["T"]:
"""construct a ticking Struct from the given timeseries.
Note structs will be created from all valid items"""
with csp.start():
Expand All @@ -690,7 +692,7 @@ def _struct_fromts(cls: "T", inputs: {str: ts[object]}, trigger: ts[object], use


@graph
def struct_fromts(cls: "T", inputs: {str: ts[object]}, trigger: ts[object] = None) -> ts["T"]:
def struct_fromts(cls: "T", inputs: Dict[str, ts[object]], trigger: ts[object] = None) -> ts["T"]:
"""construct a ticking Struct from the given timeseries basket.
Note structs will be created from all valid items.
trigger - Optional timeseries to control when struct gets created ( defaults to any time a basket input ticks )"""
Expand All @@ -699,7 +701,7 @@ def struct_fromts(cls: "T", inputs: {str: ts[object]}, trigger: ts[object] = Non


@node(cppimpl=_cspbaselibimpl.struct_collectts)
def struct_collectts(cls: "T", inputs: {str: ts[object]}) -> ts["T"]:
def struct_collectts(cls: "T", inputs: Dict[str, ts[object]]) -> ts["T"]:
"""construct a ticking Struct from the given timeseries.
Note structs will be created from all ticked items"""
if csp.ticked(inputs):
Expand Down Expand Up @@ -820,7 +822,7 @@ def __init__(self, ts_type, default_to_null: bool = False):
"""
super().__init__()
self._inputs = []
self._output = DelayedEdge(ts[[ts_type]], default_to_null)
self._output = DelayedEdge(ts[List[ts_type]], default_to_null)

def copy(self):
res = DelayedCollect()
Expand All @@ -838,7 +840,7 @@ def add_input(self, x: ts["T"]):
self._inputs.append(x)

def output(self):
"""returns collected inputs as ts[ typing.List[ input_ts_type] ]"""
"""returns collected inputs as ts[ List[ input_ts_type] ]"""
return self._output

def _instantiate(self):
Expand Down
Loading

0 comments on commit fac1334

Please sign in to comment.