Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
achoum committed Jun 6, 2023
1 parent 64cad58 commit 603bca8
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 108 deletions.
2 changes: 1 addition & 1 deletion temporian/core/data/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ def source_node(
assumed not indexed.
is_unix_timestamp: If true, the timestamps are interpreted as unix
timestamps in seconds.
same_sampling_as: If set, the created node is guarentied to have the
same_sampling_as: If set, the created node is guaranteed to have the
same sampling as same_sampling_as`. In this case, `indexes` and
`is_unix_timestamp` should not be provided. Some operators require
for input nodes to have the same sampling.
Expand Down
108 changes: 62 additions & 46 deletions temporian/implementation/numpy/data/event_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import math
import datetime

import numpy as np

Expand Down Expand Up @@ -95,7 +96,7 @@ def tp_dtype_to_np_dtype(dtype: DType) -> Any:
def normalize_features(
feature_values: Any,
) -> np.ndarray:
"""Normalies a list of feature values to temporian format.
"""Normalizes a list of feature values to temporian format.
Keep this function in sync with the documentation of "io.event_set".
Expand Down Expand Up @@ -146,8 +147,8 @@ def normalize_features(
return feature_values


def normalize_timestamp(
raw_timestamps: Any,
def normalize_timestamps(
values: Any,
) -> Tuple[np.ndarray, bool]:
"""Normalizes timestamps to temporian format.
Expand All @@ -159,61 +160,54 @@ def normalize_timestamp(
"""

# Convert to numpy array
if not isinstance(raw_timestamps, np.ndarray):
raw_timestamps = np.array(raw_timestamps)
if not isinstance(values, np.ndarray):
values = np.array(values)

# raw_timestamps is represented as a number. Cast to float64.
if raw_timestamps.dtype.type in [np.float32, np.int64, np.int32]:
raw_timestamps = raw_timestamps.astype(np.float64)
# values is represented as a number. Cast to float64.
if values.dtype.type in [np.float32, np.int64, np.int32]:
values = values.astype(np.float64)

if raw_timestamps.dtype.type == np.float64:
if values.dtype.type == np.float64:
# Check NaN
if np.isnan(raw_timestamps).any():
if np.isnan(values).any():
raise ValueError("Timestamps contains NaN values.")

return raw_timestamps, False
return values, False

if raw_timestamps.dtype.type == np.object_:
if all(isinstance(x, str) for x in raw_timestamps):
# raw_timestamps is a date. Cast to unix epoch in float64 seconds.
raw_timestamps = (
raw_timestamps.astype("datetime64[ns]").astype(np.float64) / 1e9
)
return raw_timestamps, True
if values.dtype.type in [np.str_, np.string_]:
values = values.astype("datetime64[ns]")

if values.dtype.type == np.object_:
if all(isinstance(x, str) for x in values) or all(
isinstance(x, datetime.datetime) for x in values
):
# values is a date. Cast to unix epoch in float64 seconds.
values = values.astype("datetime64[ns]")

if all(
elif all(
str(type(x)) == "<class 'pandas._libs.tslibs.timestamps.Timestamp'>"
for x in raw_timestamps
for x in values
):
raw_timestamps = np.array([x.to_numpy() for x in raw_timestamps])
values = np.array([x.to_numpy() for x in values])

if raw_timestamps.dtype.type == np.datetime64:
# raw_timestamps is a date. Cast to unix epoch in float64 seconds.
raw_timestamps = (
raw_timestamps.astype("datetime64[ns]").astype(np.float64) / 1e9
)
return raw_timestamps, True
if values.dtype.type == np.datetime64:
# values is a date. Cast to unix epoch in float64 seconds.
values = values.astype("datetime64[ns]").astype(np.float64) / 1e9
return values, True

object_description = (
f"{raw_timestamps!r}.\nDetails: type={type(raw_timestamps)}"
)
if isinstance(raw_timestamps, np.ndarray):
object_description = f"{values!r}.\nDetails: type={type(values)}"
if isinstance(values, np.ndarray):
object_description += (
f" np.dtype={raw_timestamps.dtype}"
f" np.dtype.type={raw_timestamps.dtype.type}"
f" np.dtype={values.dtype} np.dtype.type={values.dtype.type}"
)
if raw_timestamps.dtype.type == np.object_:
object_description += f" type(value[0])={type(raw_timestamps[0])}"
if values.dtype.type == np.object_:
object_description += f" type(value[0])={type(values[0])}"

# Keep this function in sync with the documentation of "io.event_set".
raise ValueError(
f"""Invalid timestamps value.
Timestamps can be:
- Python list of int, float, str, bytes and datetime.
- Numpy arrays of int{{32, 64}}, float{{32, 64}}, str_, string_ / bytes_,
datetime64, object (containing "str"),
- Pandas series of int{{32, 64}}, float{{32, 64}}, Timestamp.
Instead, got {object_description}."""
"Invalid timestamps value. Check `tp.event_set` documentation for the"
" list of supported timestamp types. Instead, got"
f" {object_description}."
)


Expand Down Expand Up @@ -391,10 +385,30 @@ def node(self, force_new_node: bool = False) -> Node:
def source_node(self, force_new_node: bool = False) -> Node:
"""Creates a node able to consume the the event set.
If called multiple times, always return the same node.
If called multiple times with force_new_node=False (default), the same
node is returned.
Usage example:
```
my_evset = tp.event_set(
timestamps=[1, 2, 3, 4],
features={
"feature_1": [0.5, 0.6, math.nan, 0.9],
"feature_2": ["red", "blue", "red", "blue"],
},
)
my_node = my_evset.source_node()
```
Args:
force_new_node: If false (default), return the same node if `node`
is called multiple times. If true, return a new node each time.
force_new_node: If false (default), return the same node each time
`source_node` is called. If true, a new node is created each
time.
Returns:
A node able to consume the content of the event set.
"""

if self._internal_node is not None and not force_new_node:
Expand Down Expand Up @@ -424,7 +438,9 @@ def from_dataframe(
)

def to_dataframe(self) -> "pd.DataFrame":
"""Convert a EventSet to a pandas DataFrame.
"""Convert an EventSet to a pandas DataFrame.
See `tp.pd_dataframe_to_event_set` for details.
Returns:
DataFrame created from EventSet.
Expand Down
Loading

0 comments on commit 603bca8

Please sign in to comment.