Skip to content

Commit

Permalink
[FSTORE-311] Improved timestamp with timezone handling (#821)
Browse files Browse the repository at this point in the history
* add timezone normalization to python engine

* set spark session timezone to utc

* stylecheck

* set spark timezone in java client

* style

* remove spark proerty validation because its internally set as session property
  • Loading branch information
tdoehmen authored Oct 6, 2022
1 parent de28384 commit a2f7d30
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private SparkEngine() {
sparkSession.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
// force Spark to fallback to using the Hive Serde to read Hudi COPY_ON_WRITE tables
sparkSession.conf().set("spark.sql.hive.convertMetastoreParquet", "false");
sparkSession.conf().set("spark.sql.session.timeZone", "UTC");
}

public void validateSparkConfiguration() throws FeatureStoreException {
Expand Down
7 changes: 7 additions & 0 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,13 @@ def convert_to_default_dataframe(self, dataframe):
util.FeatureGroupWarning,
)

# convert timestamps with timezone to UTC
for col in dataframe.columns:
if isinstance(
dataframe[col].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
dataframe[col] = dataframe[col].dt.tz_convert(None)

# making a shallow copy of the dataframe so that column names are unchanged
dataframe_copy = dataframe.copy(deep=False)
dataframe_copy.columns = [x.lower() for x in dataframe_copy.columns]
Expand Down
1 change: 1 addition & 0 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(self):
self._spark_session.conf.set("hive.exec.dynamic.partition", "true")
self._spark_session.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
self._spark_session.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
self._spark_session.conf.set("spark.sql.session.timeZone", "UTC")

if importlib.util.find_spec("pydoop"):
# If we are on Databricks don't setup Pydoop as it's not available and cannot be easily installed.
Expand Down
57 changes: 57 additions & 0 deletions python/tests/engine/test_python_spark_convert_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Copyright 2022 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from hsfs.engine import spark
from hsfs.engine import python


class TestPythonSparkConvertDataframe:
def test_convert_to_default_dataframe_w_timezone(
self, mocker, dataframe_fixture_times
):
mocker.patch("hsfs.client.get_instance")
python_engine = python.Engine()

default_df_python = python_engine.convert_to_default_dataframe(
dataframe_fixture_times
)

spark_engine = spark.Engine()

default_df_spark_from_pd = spark_engine.convert_to_default_dataframe(
dataframe_fixture_times
)

assert (
default_df_spark_from_pd.head()[2]
== default_df_python["event_datetime_notz"][0].to_pydatetime()
)
assert (
default_df_spark_from_pd.head()[3]
== default_df_python["event_datetime_utc"][0].to_pydatetime()
)
assert (
default_df_spark_from_pd.head()[4]
== default_df_python["event_datetime_utc_3"][0].to_pydatetime()
)
assert (
default_df_spark_from_pd.head()[5]
== default_df_python["event_timestamp"][0].to_pydatetime()
)
assert (
default_df_spark_from_pd.head()[6]
== default_df_python["event_timestamp_pacific"][0].to_pydatetime()
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from hsfs.core.transformation_function_engine import TransformationFunctionEngine


class TestPythonSparkTransformationFuctions:
class TestPythonSparkTransformationFunctions:
def _create_training_dataset(
self, tf_fun, output_type=None, name=None, col="col_0"
):
Expand Down

0 comments on commit a2f7d30

Please sign in to comment.