Test code and version support update #378

Merged: 17 commits, Feb 28, 2024
Changes from all commits
9 changes: 3 additions & 6 deletions CONTRIBUTING.md
@@ -43,12 +43,9 @@ Run the following command in your terminal to create a virtual environment in th
tox --devenv .venv -e {environment-name}
```
The `--devenv` flag tells `tox` to create a development environment, and `.venv` is the folder where the virtual environment will be created.
Pre-defined environments can be found within the `tox.ini` file for different Python versions and their corresponding PySpark version. They include:
- py37-pyspark300
- py38-pyspark312
- py38-pyspark321
- py39-pyspark330
- py39-pyspark332

## Environments we test
The environments we test against are defined within the `tox.ini` file, and the requirements for those environments are stored in `python/tests/requirements`. The makeup of these environments is inspired by the [Databricks Runtime](https://docs.databricks.com/en/release-notes/runtime/index.html#) (hence the naming convention), but it's important to note that developing on Databricks is **not** a requirement. We're simply mimicking some of the different runtime versions because (a) we recognize that much of the user base uses `tempo` on Databricks and (b) it saves development time that would otherwise be spent building out test environments with different versions of Python and PySpark from scratch.

## Run tests locally for one or more environments
You can run tests locally for one or more of the defined environments without setting up a development environment first.
6 changes: 6 additions & 0 deletions docs/requirements.txt
@@ -0,0 +1,6 @@
sphinx-autobuild==2021.3.14
sphinx-copybutton==0.5.1
Sphinx==4.5.0
sphinx-design==0.2.0
sphinx-panels==0.6.0
furo==2022.9.29
19 changes: 0 additions & 19 deletions python/requirements.txt

This file was deleted.

20 changes: 4 additions & 16 deletions python/tempo/io.py
@@ -1,16 +1,14 @@
from __future__ import annotations

import logging
import os
from collections import deque
from typing import Optional

import pyspark.sql.functions as sfn
import tempo.tsdf as t_tsdf
from pyspark.sql import SparkSession
from pyspark.sql.utils import ParseException

import tempo.tsdf as t_tsdf

logger = logging.getLogger(__name__)


@@ -31,12 +29,6 @@ def write(
df = tsdf.df
ts_col = tsdf.ts_col
partitionCols = tsdf.partitionCols
if optimizationCols:
optimizationCols = optimizationCols + ["event_time"]
else:
optimizationCols = ["event_time"]

useDeltaOpt = os.getenv("DATABRICKS_RUNTIME_VERSION") is not None

view_df = df.withColumn("event_dt", sfn.to_date(sfn.col(ts_col))).withColumn(
"event_time",
@@ -52,11 +44,12 @@
tabName
)

if useDeltaOpt:
if optimizationCols:
try:
spark.sql(
"optimize {} zorder by {}".format(
tabName, "(" + ",".join(partitionCols + optimizationCols) + ")"
tabName,
"(" + ",".join(partitionCols + optimizationCols + [ts_col]) + ")",
)
)
except ParseException as e:
@@ -65,8 +58,3 @@
e
)
)
else:
logger.warning(
"Delta optimizations attempted on a non-Databricks platform. "
"Switch to use Databricks Runtime to get optimization advantages."
)
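Net effect of the `io.py` changes: the Databricks-runtime check and the unconditional `event_time` z-order column are removed, and the `OPTIMIZE` is now attempted whenever `optimizationCols` is supplied, z-ordering on the partition columns, the optimization columns, and the TSDF's timestamp column. A minimal sketch of the statement the new branch builds (the table and column names below are hypothetical, not taken from the PR):

```python
# Sketch only: mirrors the string-building in the updated write path.
tabName = "my_table"              # assumed table name
partitionCols = ["symbol"]        # assumed TSDF partition columns
optimizationCols = ["date"]       # assumed caller-supplied optimization columns
ts_col = "event_ts"               # assumed TSDF timestamp column

if optimizationCols:
    zorder_cols = "(" + ",".join(partitionCols + optimizationCols + [ts_col]) + ")"
    optimize_sql = "optimize {} zorder by {}".format(tabName, zorder_cols)
    print(optimize_sql)  # optimize my_table zorder by (symbol,date,event_ts)
```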
3 changes: 2 additions & 1 deletion python/tempo/tsdf.py
@@ -13,6 +13,7 @@
from pyspark.sql import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window, WindowSpec
from scipy.fft import fft, fftfreq # type: ignore

@@ -1102,7 +1103,7 @@ def withRangeStats(
]

# build window
if str(self.df.schema[self.ts_col].dataType) == "TimestampType":
if isinstance(self.df.schema[self.ts_col].dataType, TimestampType):
self.df = self.__add_double_ts()
prohibited_cols.extend(["double_ts"])
w = self.__rangeBetweenWindow(
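The `withRangeStats` guard now uses an `isinstance` check instead of comparing the string form of the column's data type, presumably because that string rendering varies across PySpark versions. A small illustrative sketch (the schema below is made up for the example):

```python
from pyspark.sql.types import StructField, StructType, TimestampType

# Hypothetical schema standing in for self.df.schema on the TSDF.
schema = StructType([StructField("event_ts", TimestampType())])
dt = schema["event_ts"].dataType

# Depending on the PySpark version, str(dt) may or may not equal the literal
# "TimestampType", so the old string comparison was fragile;
# isinstance(dt, TimestampType) is stable across versions.
print(str(dt), isinstance(dt, TimestampType))
```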
8 changes: 3 additions & 5 deletions python/tempo/utils.py
@@ -5,17 +5,15 @@
import warnings
from typing import List, Optional, Union, overload

import pyspark.sql.functions as sfn
import tempo.resample as t_resample
import tempo.tsdf as t_tsdf
from IPython import get_ipython
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pandas.core.frame import DataFrame as pandasDataFrame

import pyspark.sql.functions as sfn
from pyspark.sql.dataframe import DataFrame

import tempo.resample as t_resample
import tempo.tsdf as t_tsdf

logger = logging.getLogger(__name__)
IS_DATABRICKS = "DB_HOME" in os.environ.keys()

16 changes: 9 additions & 7 deletions python/tests/base.py
@@ -5,12 +5,11 @@
from typing import Union

import jsonref
from chispa import assert_df_equality

import pyspark.sql.functions as sfn
from chispa import assert_df_equality
from delta.pip_utils import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

from tempo.intervals import IntervalsDF
from tempo.tsdf import TSDF

@@ -28,9 +27,11 @@ class SparkTest(unittest.TestCase):
def setUpClass(cls) -> None:
# create and configure PySpark Session
cls.spark = (
SparkSession.builder.appName("unit-tests")
.config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
configure_spark_with_delta_pip(SparkSession.builder.appName("unit-tests"))
.config(
"spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension",
)
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
@@ -124,7 +125,7 @@ def __loadTestData(self, test_case_path: str) -> dict:
:param test_case_path: string representation of the data path e.g. : "tsdf_tests.BasicTests.test_describe"
:type test_case_path: str
"""
file_name, class_name, func_name = test_case_path.split(".")
file_name, class_name, func_name = test_case_path.split(".")[-3:]

# find our test data file
test_data_file = self.__getTestDataFilePath(file_name)
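Taking the last three components of the split keeps the test-data lookup working for longer dotted test ids (for example, ids carrying a package prefix); only the trailing file, class, and function names are used. A tiny sketch with hypothetical paths:

```python
# Both forms resolve to the same (file, class, function) triple.
for path in (
    "tsdf_tests.BasicTests.test_describe",
    "tests.tsdf_tests.BasicTests.test_describe",  # hypothetical prefixed id
):
    file_name, class_name, func_name = path.split(".")[-3:]
    print(file_name, class_name, func_name)
```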
@@ -225,4 +226,5 @@ def assertDataFrameEquality(
ignore_row_order=ignore_row_order,
ignore_column_order=ignore_column_order,
ignore_nullable=ignore_nullable,
ignore_metadata=True,
)
71 changes: 27 additions & 44 deletions python/tests/io_tests.py
@@ -1,10 +1,12 @@
import logging
import os
import unittest
from unittest import mock
from importlib.metadata import version

from packaging import version as pkg_version
from tests.base import SparkTest

DELTA_VERSION = version("delta-spark")


class DeltaWriteTest(SparkTest):
def test_write_to_delta_without_optimization_cols(self):
@@ -37,29 +39,6 @@ def test_write_to_delta_with_optimization_cols(self):
# should be equal to the expected dataframe
self.assertEqual(self.spark.table(table_name).count(), 7)

def test_write_to_delta_non_dbr_environment_logging(self):
"""Test logging when writing"""

table_name = "my_table_optimization_col"

# load test data
input_tsdf = self.get_data_as_tsdf("input_data")

with self.assertLogs(level="WARNING") as warning_captured:
# test write to delta
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(warning_captured.records), 1)
self.assertEqual(
warning_captured.output,
[
"WARNING:tempo.io:"
"Delta optimizations attempted on a non-Databricks platform. "
"Switch to use Databricks Runtime to get optimization advantages."
],
)

@mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "10.4"})
def test_write_to_delta_bad_dbr_environment_logging(self):
"""Test useDeltaOpt Exception"""

@@ -68,25 +47,29 @@ def test_write_to_delta_bad_dbr_environment_logging(self):
# load test data
input_tsdf = self.get_data_as_tsdf("input_data")

with self.assertLogs(level="ERROR") as error_captured:
# test write to delta
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(error_captured.records), 1)
print(error_captured.output)
self.assertEqual(
error_captured.output,
[
"ERROR:tempo.io:"
"Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
"'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
"'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
"'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
"'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
"'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
"my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
],
)
if pkg_version.parse(DELTA_VERSION) < pkg_version.parse("2.0.0"):

with self.assertLogs(level="ERROR") as error_captured:
# should fail to run optimize
input_tsdf.write(self.spark, table_name, ["date"])

self.assertEqual(len(error_captured.records), 1)
print(error_captured.output)
self.assertEqual(
error_captured.output,
[
"ERROR:tempo.io:"
"Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
"'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
"'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
"'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
"'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
"'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
"my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
],
)
else:
pass


# MAIN
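The rewritten test gates its assertions on the installed `delta-spark` version: on releases where the plain-SQL `optimize` is expected to fail to parse (here, anything below 2.0.0), it asserts the logged `ParseException` message; on newer releases it simply lets the write run. A standalone sketch of that gating pattern (the 2.0.0 threshold is the one used by the test, not a general claim about Delta):

```python
from importlib.metadata import version
from packaging import version as pkg_version

DELTA_VERSION = version("delta-spark")  # e.g. "1.1.0" on the dbr104 environment

# Mirror the test's branch: expect the optimize call to fail and log an error
# only on Delta releases older than 2.0.0.
expect_optimize_error = pkg_version.parse(DELTA_VERSION) < pkg_version.parse("2.0.0")
print(DELTA_VERSION, expect_optimize_error)
```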
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr104.txt
@@ -0,0 +1,7 @@
delta-spark==1.1.0
ipython==7.22.0
numpy==1.20.1
pandas==1.2.4
pyarrow==4.0.0
pyspark==3.2.1
scipy==1.6.2
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr113.txt
@@ -0,0 +1,7 @@
delta-spark==2.1.0
ipython==7.32.0
numpy==1.20.3
pandas==1.3.4
pyarrow==7.0.0
pyspark==3.3.0
scipy==1.7.1
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr122.txt
@@ -0,0 +1,7 @@
delta-spark==2.2.0
ipython==8.5.0
numpy==1.21.5
pandas==1.4.2
pyarrow==7.0.0
pyspark==3.3.2
scipy==1.7.3
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr133.txt
@@ -0,0 +1,7 @@
delta-spark==2.4.0
ipython==8.10.0
numpy==1.21.5
pandas==1.4.4
pyarrow==8.0.0
pyspark==3.4.1
scipy==1.9.1
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr142.txt
@@ -0,0 +1,7 @@
delta-spark==3.0.0
ipython==8.14.0
numpy==1.23.5
pandas==1.5.3
pyarrow==8.0.0
pyspark==3.5.0
scipy==1.10.0
7 changes: 7 additions & 0 deletions python/tests/requirements/dbr91.txt
@@ -0,0 +1,7 @@
delta-spark==1.0.0
ipython==7.22.0
numpy==1.19.2
pandas==1.2.4
pyarrow==4.0.0
pyspark==3.1.2
scipy==1.6.2
4 changes: 4 additions & 0 deletions python/tests/requirements/dev.txt
@@ -0,0 +1,4 @@
chispa
jsonref
packaging
python-dateutil
3 changes: 1 addition & 2 deletions python/tests/tsdf_tests.py
@@ -876,8 +876,7 @@ def test_withPartitionCols(self):
self.assertEqual(init_tsdf.partitionCols, [])
self.assertEqual(actual_tsdf.partitionCols, ["symbol"])

def test_tsdf_interpolate(self):
...
def test_tsdf_interpolate(self): ...


class FourierTransformTest(SparkTest):
23 changes: 13 additions & 10 deletions python/tox.ini
@@ -11,9 +11,7 @@ envlist =
build-dist
; Mirror Supported LTS DBR versions here: https://docs.databricks.com/release-notes/runtime/
; Use correct PySpark version based on Python version present in env name
py37-pyspark300,
py38-pyspark{312,321},
py39-pyspark{330,332}
dbr{91,104,113,122,133,142}
skip_missing_interpreters = true


@@ -23,14 +21,19 @@ package = wheel
wheel_build_env = .pkg
setenv =
COVERAGE_FILE = .coverage.{envname}
basepython =
dbr142: py310
dbr133: py310
dbr122: py39
dbr113: py39
dbr104: py38
dbr91: py38
dbr73: py37
deps =
pyspark300: pyspark==3.0.0
pyspark312: pyspark==3.1.2
pyspark321: pyspark==3.2.1
pyspark330: pyspark==3.3.0
pyspark332: pyspark==3.3.2
-rtests/requirements/{envname}.txt
-rtests/requirements/dev.txt
coverage>=7,<8
-rrequirements.txt

commands =
coverage --version
coverage run -m unittest discover -s tests -p '*_tests.py'
@@ -63,7 +66,7 @@ deps =
mypy>=1,<2
pandas-stubs>=2,<3
types-pytz>=2023,<2024
-rrequirements.txt
-rtests/requirements/dbr133.txt
commands =
mypy {toxinidir}/tempo
