Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-359] Rename Transformation Function Output Types and Fix Timezone-related issues #829

Merged
merged 37 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2329257
remove unused import
davitbzh Jul 13, 2022
e38ca9c
Merge remote-tracking branch 'upstream/master'
davitbzh Jul 13, 2022
10ad8b9
Merge remote-tracking branch 'upstream/master'
davitbzh Jul 13, 2022
16b2e52
Merge remote-tracking branch 'upstream/master'
davitbzh Jul 15, 2022
63b812b
Merge remote-tracking branch 'upstream/master'
davitbzh Jul 18, 2022
9fdd925
Merge remote-tracking branch 'upstream/master'
davitbzh Aug 4, 2022
3be3d58
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 7, 2022
6873412
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 14, 2022
c033dad
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 15, 2022
1b3d076
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 15, 2022
738ce21
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 19, 2022
afe2459
Merge remote-tracking branch 'upstream/master'
davitbzh Sep 20, 2022
482058b
Merge remote-tracking branch 'upstream/master' into HOPSWORKS-3342-tf…
davitbzh Sep 20, 2022
f8a15ba
spark types for tf
davitbzh Sep 21, 2022
c726566
spark types for tf
davitbzh Sep 21, 2022
f625f77
tmp
davitbzh Sep 22, 2022
9b6394e
Merge remote-tracking branch 'upstream/master' into HOPSWORKS-3342-tf…
davitbzh Sep 22, 2022
025d6c0
merge rebase with upstream/master
davitbzh Sep 22, 2022
1b7df11
[HOPSWORKS-3342][Append] unit tests (#9)
tdoehmen Sep 22, 2022
14fd55a
infer_python_type
davitbzh Sep 22, 2022
083c210
fix tests
davitbzh Sep 22, 2022
2aa0a98
fix tests
davitbzh Sep 22, 2022
32a17db
Merge remote-tracking branch 'upstream/master' into HOPSWORKS-3342-tf…
davitbzh Sep 22, 2022
86c38ee
fix checks
davitbzh Sep 22, 2022
c9881e3
removed legacy types
tdoehmen Oct 12, 2022
868c8c0
removed legacy types cast from test
tdoehmen Oct 12, 2022
5b4a0c3
add convert_column method, fix style
tdoehmen Oct 12, 2022
325490d
updated builtin transformation functions
tdoehmen Oct 13, 2022
54dfdf1
style
tdoehmen Oct 13, 2022
d3f11d0
made transformation functions timezone-safe
tdoehmen Oct 13, 2022
324c112
style
tdoehmen Oct 13, 2022
ca5cddd
removed deprecated np types
tdoehmen Oct 13, 2022
953fec8
tests tf function returning none
tdoehmen Oct 14, 2022
969e8b4
style
tdoehmen Oct 14, 2022
2d54c30
merge with master
tdoehmen Oct 18, 2022
7d553c7
fixed tests
tdoehmen Oct 18, 2022
69c9715
style
tdoehmen Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 17 additions & 39 deletions python/hsfs/core/transformation_function_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,62 +177,40 @@ def populate_builtin_attached_fns(self, attached_transformation_fns, stat_conten
@staticmethod
def infer_spark_type(output_type):
    """Map a user-supplied output type to the Spark SQL type name used
    when registering transformation-function UDFs.

    Accepts Python builtin types (``int``, ``float``, ...), numpy scalar
    types (``numpy.int64``, ...), ``datetime`` types, or a type-name
    string (matched case-insensitively, e.g. ``"bigint"``).

    Returns the upper-case Spark SQL type name (e.g. ``"INT"``).
    Falls back to ``"STRING"`` when no output type is given and raises
    ``TypeError`` for anything it does not recognize.
    """
    if not output_type:
        return "STRING"  # STRING is default type for spark udfs

    # type-name strings are matched case-insensitively
    if isinstance(output_type, str):
        output_type = output_type.lower()

    if output_type in (str, "str", "string"):
        return "STRING"
    elif output_type in (bytes, "binary"):
        return "BINARY"
    elif output_type in (numpy.int8, "int8", "byte", "tinyint"):
        return "BYTE"
    elif output_type in (numpy.int16, "int16", "short", "smallint"):
        return "SHORT"
    elif output_type in (int, "int", "integer", numpy.int32):
        return "INT"
    elif output_type in (numpy.int64, "int64", "long", "bigint"):
        return "LONG"
    elif output_type in (float, "float"):
        return "FLOAT"
    elif output_type in (numpy.float64, "float64", "double"):
        return "DOUBLE"
    elif output_type in (
        datetime.datetime,
        numpy.datetime64,
        "datetime",
        "timestamp",
    ):
        return "TIMESTAMP"
    elif output_type in (datetime.date, "date"):
        return "DATE"
    # numpy.bool was deprecated in numpy 1.20 and removed in 1.24;
    # numpy.bool_ is the stable spelling of the numpy boolean scalar type.
    elif output_type in (bool, "boolean", "bool", numpy.bool_):
        return "BOOLEAN"
    else:
        raise TypeError("Not supported type %s." % output_type)

@staticmethod
def compute_transformation_fn_statistics(
Expand Down
30 changes: 15 additions & 15 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,29 +825,29 @@ def _apply_transformation_function(self, transformation_functions, dataset):

return dataset

def convert_column(self, output_type, feature_column):
    """Cast a pandas column to the dtype matching the given Spark SQL
    output type name (``"STRING"``, ``"INT"``, ``"TIMESTAMP"``, ...).

    Timestamp-like columns are normalized to UTC first and returned
    timezone-unaware; unrecognized type names leave the column as-is.
    """
    # simple casts share a single astype() dispatch table
    simple_casts = {
        "STRING": str,
        "BINARY": bytes,
        "BYTE": np.int8,
        "SHORT": np.int16,
        "INT": int,
        "LONG": np.int64,
        "FLOAT": float,
        "DOUBLE": np.float64,
        "BOOLEAN": bool,
    }
    target = simple_casts.get(output_type)
    if target is not None:
        return feature_column.astype(target)
    if output_type == "TIMESTAMP":
        # convert (if tz!=UTC) to utc, then make timezone unaware
        return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None)
    if output_type == "DATE":
        # normalize to UTC before taking the calendar date
        return pd.to_datetime(feature_column, utc=True).dt.date
    # handle gracefully, just return the column as-is
    return feature_column
Expand Down
32 changes: 27 additions & 5 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import numpy as np
import pandas as pd
import avro
from datetime import datetime
from datetime import datetime, timezone

# in case importing in %%local

try:
from pyspark import SparkFiles
from pyspark.sql import SparkSession, DataFrame, SQLContext
Expand Down Expand Up @@ -888,12 +889,33 @@ def _apply_transformation_function(self, transformation_functions, dataset):
+ "_"
+ feature_name
)

def timezone_decorator(func):
    # Wrap a transformation function so its TIMESTAMP results are
    # timezone-consistent before Spark serializes them. For any other
    # output type the function is returned unwrapped.
    # NOTE(review): reads transformation_fn from the enclosing scope —
    # presumably the loop variable of the surrounding method; confirm.
    if transformation_fn.output_type != "TIMESTAMP":
        return func

    # tzinfo of the system's local timezone, captured once at wrap time
    current_timezone = datetime.now().astimezone().tzinfo

    def decorated_func(x):
        result = func(x)
        if isinstance(result, datetime):
            if result.tzinfo is None:
                # if timestamp is timezone unaware, make sure it's localized to the system's timezone.
                # otherwise, spark will implicitly convert it to the system's timezone.
                return result.replace(tzinfo=current_timezone)
            else:
                # convert to utc, then localize to system's timezone
                # (replace() keeps the UTC wall-clock reading, so Spark's
                # implicit local-timezone conversion yields the UTC instant)
                return result.astimezone(timezone.utc).replace(
                    tzinfo=current_timezone
                )
        # non-datetime results (e.g. None) pass through untouched
        return result

    return decorated_func

self._spark_session.udf.register(
fn_registration_name,
transformation_fn.transformation_fn,
transformation_function_engine.TransformationFunctionEngine.convert_legacy_type(
transformation_fn.output_type
),
timezone_decorator(transformation_fn.transformation_fn),
transformation_fn.output_type,
)
transformation_fn_expressions.append(
"{fn_name:}({name:}) AS {name:}".format(
Expand Down
Loading