Add Python 3.13 support #298

Merged: 1 commit, merged on Jan 10, 2025
2 changes: 1 addition & 1 deletion .github/workflows/data/core/matrix.yml
@@ -9,7 +9,7 @@ min: &min
max: &max
spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/ftp/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/ftps/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/hdfs/matrix.yml
@@ -10,7 +10,7 @@ max: &max
hadoop-version: hadoop3-hdfs
spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/s3/matrix.yml
@@ -12,7 +12,7 @@ max: &max
minio-version: 2024.11.7
spark-version: 3.5.4
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/sftp/matrix.yml
@@ -8,7 +8,7 @@ min: &min
max: &max
openssh-version: 9.7_p1-r4-ls179
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/webdav/matrix.yml
@@ -8,7 +8,7 @@ min: &min
max: &max
webdav-version: latest
pydantic-version: 2
python-version: '3.12'
python-version: '3.13'
os: ubuntu-latest

latest: &latest
6 changes: 6 additions & 0 deletions .github/workflows/test-webdav.yml
@@ -33,6 +33,12 @@ jobs:
with:
python-version: ${{ inputs.python-version }}

- name: Set up lxml libs
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install --no-install-recommends libxml2-dev libxslt-dev

- name: Cache pip
uses: actions/cache@v4
if: inputs.with-cache
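The new workflow step installs the libxml2/libxslt headers so that lxml can be compiled from source when no prebuilt wheel is available for the job's Python version; that motivation is inferred rather than stated in the diff. A minimal sanity check for such a build might look like:

```python
# Illustrative check only, not part of the workflow: confirm that lxml imports
# and report the lxml and libxml2 versions it was compiled against.
import lxml.etree

print("lxml:", lxml.etree.LXML_VERSION)
print("libxml2:", lxml.etree.LIBXML_VERSION)
```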
4 changes: 2 additions & 2 deletions README.rst
@@ -193,9 +193,9 @@ Compatibility matrix
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.3.x <https://spark.apache.org/docs/3.3.4/#downloading>`_ | 3.7 - 3.10 | 8u201 - 17 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.4.x <https://spark.apache.org/docs/3.4.3/#downloading>`_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 |
| `3.4.x <https://spark.apache.org/docs/3.4.3/#downloading>`_ | 3.7 - 3.13 | 8u362 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.5.x <https://spark.apache.org/docs/3.5.4/#downloading>`_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 |
| `3.5.x <https://spark.apache.org/docs/3.5.4/#downloading>`_ | 3.8 - 3.13 | 8u371 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+

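The widened ranges read as: any Python from the row's minimum up to 3.13 is now expected to work with the corresponding Spark line. A rough, illustrative check of the new maximum tested combination (not an official snippet from this repository):

```python
# Sketch: assert the running environment falls inside the Spark 3.5.x row
# of the matrix above (Python 3.8 - 3.13).
import sys

import pyspark

assert (3, 8) <= sys.version_info[:2] <= (3, 13), sys.version
assert pyspark.__version__.startswith("3.5."), pyspark.__version__
```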
.. _pyspark-install:
1 change: 1 addition & 0 deletions docs/changelog/next_release/298.breaking.rst
@@ -0,0 +1 @@
Add Python 3.13 support.
2 changes: 1 addition & 1 deletion onetl/VERSION
@@ -1 +1 @@
0.12.6
0.13.0
14 changes: 3 additions & 11 deletions onetl/base/base_file_format.py
@@ -3,7 +3,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ContextManager
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import DataFrameReader, DataFrameWriter, SparkSession
@@ -30,7 +30,7 @@ def check_if_supported(self, spark: SparkSession) -> None:
"""

@abstractmethod
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader | ContextManager[DataFrameReader]:
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader:
"""
Apply provided format to :obj:`pyspark.sql.DataFrameReader`. |support_hooks|

@@ -40,10 +40,6 @@ def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader | ContextM
-------
:obj:`pyspark.sql.DataFrameReader`
DataFrameReader with options applied.

``ContextManager[DataFrameReader]``
If returned context manager, it will be entered before reading data and exited after creating a DataFrame.
Context manager's ``__enter__`` method should return :obj:`pyspark.sql.DataFrameReader` instance.
"""


@@ -68,7 +64,7 @@ def check_if_supported(self, spark: SparkSession) -> None:
"""

@abstractmethod
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter | ContextManager[DataFrameWriter]:
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
"""
Apply provided format to :obj:`pyspark.sql.DataFrameWriter`. |support_hooks|

@@ -78,8 +74,4 @@ def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter | ContextM
-------
:obj:`pyspark.sql.DataFrameWriter`
DataFrameWriter with options applied.

``ContextManager[DataFrameWriter]``
If returned context manager, it will be entered before writing and exited after writing a DataFrame.
Context manager's ``__enter__`` method should return :obj:`pyspark.sql.DataFrameWriter` instance.
"""
21 changes: 15 additions & 6 deletions onetl/connection/db_connection/greenplum/connection.py
@@ -5,18 +5,19 @@
import logging
import os
import textwrap
import threading
import warnings
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Optional
from urllib.parse import quote, urlencode, urlparse, urlunparse

from etl_entities.instance import Host

from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions

try:
from pydantic.v1 import validator
from pydantic.v1 import PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
from pydantic import validator # type: ignore[no-redef, assignment]
from pydantic import validator, SecretStr, PrivateAttr # type: ignore[no-redef, assignment]

from onetl._util.classproperty import classproperty
from onetl._util.java import try_import_java_class
@@ -40,7 +41,9 @@
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
JDBCOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.exception import MISSING_JVM_CLASS_MSG, TooManyParallelJobsError
from onetl.hooks import slot, support_hooks
@@ -70,11 +73,11 @@

class Config:
extra = "allow"
prohibited_options = JDBCOptions.Config.prohibited_options
prohibited_options = JDBCMixinOptions.Config.prohibited_options


@support_hooks
class Greenplum(JDBCMixin, DBConnection):
class Greenplum(JDBCMixin, DBConnection): # noqa: WPS338
"""Greenplum connection. |support_hooks|

Based on package ``io.pivotal:greenplum-spark:2.2.0``
@@ -158,6 +161,8 @@
"""

host: Host
user: str
password: SecretStr
database: str
port: int = 5432
extra: GreenplumExtra = GreenplumExtra()
@@ -167,6 +172,7 @@
SQLOptions = GreenplumSQLOptions
FetchOptions = GreenplumFetchOptions
ExecuteOptions = GreenplumExecuteOptions
JDBCOptions = JDBCMixinOptions

Extra = GreenplumExtra
Dialect = GreenplumDialect
@@ -175,6 +181,9 @@
CONNECTIONS_WARNING_LIMIT: ClassVar[int] = 31
CONNECTIONS_EXCEPTION_LIMIT: ClassVar[int] = 100

_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

@slot
@classmethod
def get_packages(
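`user` and `password` are now declared directly on `Greenplum`, with the password stored as `SecretStr` so it is masked in `repr()` and logs. A minimal construction sketch (host, database and credentials are placeholders; a SparkSession with the Greenplum connector on its classpath is assumed):

```python
from pyspark.sql import SparkSession

from onetl.connection import Greenplum

# Assumes the Greenplum Spark connector was added to the session,
# e.g. via spark.jars.packages built from Greenplum.get_packages(...).
spark = SparkSession.builder.getOrCreate()

greenplum = Greenplum(
    host="greenplum.demo.internal",  # placeholder
    port=5432,
    user="onetl_user",               # placeholder
    password="change_me",            # plain str is coerced to SecretStr
    database="dwh",                  # placeholder
    spark=spark,
)
```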
7 changes: 4 additions & 3 deletions onetl/connection/db_connection/hive/connection.py
@@ -16,6 +16,7 @@
from onetl._metrics.recorder import SparkMetricsRecorder
from onetl._util.spark import inject_spark_param, override_job_description
from onetl._util.sql import clear_statement
from onetl.base import BaseWritableFileFormat
from onetl.connection.db_connection.db_connection import DBConnection
from onetl.connection.db_connection.hive.dialect import HiveDialect
from onetl.connection.db_connection.hive.options import (
@@ -24,7 +25,7 @@
HiveWriteOptions,
)
from onetl.connection.db_connection.hive.slots import HiveSlots
from onetl.file.format.file_format import WriteOnlyFileFormat
from onetl.file.format.file_format import ReadWriteFileFormat, WriteOnlyFileFormat
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
@@ -504,7 +505,7 @@ def _format_write_options(self, write_options: HiveWriteOptions) -> dict:
exclude={"if_exists"},
)

if isinstance(write_options.format, WriteOnlyFileFormat):
if isinstance(write_options.format, (WriteOnlyFileFormat, ReadWriteFileFormat)):
options_dict["format"] = write_options.format.name
options_dict.update(write_options.format.dict(exclude={"name"}))

@@ -533,7 +534,7 @@ def _save_as_table(
writer = writer.option(method, value)

# deserialize passed OCR(), Parquet(), CSV(), etc. file formats
if isinstance(write_options.format, WriteOnlyFileFormat):
if isinstance(write_options.format, BaseWritableFileFormat):
writer = write_options.format.apply_to_writer(writer)
elif isinstance(write_options.format, str):
writer = writer.format(write_options.format)
4 changes: 2 changes: 1 addition & 1 deletion onetl/connection/db_connection/hive/options.py
@@ -13,7 +13,7 @@

from typing_extensions import deprecated

from onetl.file.format.file_format import WriteOnlyFileFormat
from onetl.base import BaseWritableFileFormat
from onetl.impl import GenericOptions


@@ -204,7 +204,7 @@ class Config:
does not affect behavior.
"""

format: Union[str, WriteOnlyFileFormat] = "orc"
format: Union[str, BaseWritableFileFormat] = "orc"
"""Format of files which should be used for storing table data.

Examples
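Because the annotation is widened from `WriteOnlyFileFormat` to `BaseWritableFileFormat`, read-write formats now satisfy it as well, matching the isinstance change in `hive/connection.py` above. A hedged usage sketch (option values are arbitrary):

```python
from onetl.connection import Hive
from onetl.file.format import CSV

# CSV is a read-write format; previously only write-only format objects
# (or plain strings such as "orc") matched the annotation.
options = Hive.WriteOptions(if_exists="append", format=CSV(delimiter=";", header=True))
```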
58 changes: 53 additions & 5 deletions onetl/connection/db_connection/jdbc_connection/connection.py
@@ -4,9 +4,16 @@

import logging
import secrets
import threading
import warnings
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar, Optional

try:
from pydantic.v1 import PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
from pydantic import PrivateAttr, SecretStr, validator # type: ignore[no-redef, assignment]

from onetl._util.java import try_import_java_class
from onetl._util.spark import override_job_description
from onetl._util.sql import clear_statement
from onetl.connection.db_connection.db_connection import DBConnection
@@ -20,7 +27,14 @@
JDBCWriteOptions,
)
from onetl.connection.db_connection.jdbc_mixin import JDBCMixin
from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.exception import MISSING_JVM_CLASS_MSG
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
@@ -45,13 +59,38 @@


@support_hooks
class JDBCConnection(JDBCMixin, DBConnection):
class JDBCConnection(JDBCMixin, DBConnection): # noqa: WPS338
user: str
password: SecretStr

DRIVER: ClassVar[str]
_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

JDBCOptions = JDBCMixinOptions
FetchOptions = JDBCFetchOptions
ExecuteOptions = JDBCExecuteOptions
Dialect = JDBCDialect
ReadOptions = JDBCReadOptions
SQLOptions = JDBCSQLOptions
WriteOptions = JDBCWriteOptions
Options = JDBCLegacyOptions

@validator("spark")
def _check_java_class_imported(cls, spark):
try:
try_import_java_class(spark, cls.DRIVER)
except Exception as e:
msg = MISSING_JVM_CLASS_MSG.format(
java_class=cls.DRIVER,
package_source=cls.__name__,
args="",
)
if log.isEnabledFor(logging.DEBUG):
log.debug("Missing Java class", exc_info=e, stack_info=True)

raise ValueError(msg) from e
return spark

@slot
def sql(
self,
@@ -116,11 +155,16 @@
limit: int | None = None,
options: JDBCReadOptions | None = None,
) -> DataFrame:
if isinstance(options, JDBCLegacyOptions):
raw_options = self.ReadOptions.parse(options.dict(exclude_unset=True))

else:
raw_options = self.ReadOptions.parse(options)

read_options = self._set_lower_upper_bound(
table=source,
where=where,
hint=hint,
options=self.ReadOptions.parse(options),
options=raw_options,
)

new_columns = columns or ["*"]
@@ -178,7 +222,11 @@
target: str,
options: JDBCWriteOptions | None = None,
) -> None:
write_options = self.WriteOptions.parse(options)
if isinstance(options, JDBCLegacyOptions):
write_options = self.WriteOptions.parse(options.dict(exclude_unset=True))

else:
write_options = self.WriteOptions.parse(options)

jdbc_properties = self._get_jdbc_properties(write_options, exclude={"if_exists"}, exclude_none=True)

mode = (
14 changes: 13 additions & 1 deletion onetl/connection/db_connection/jdbc_connection/options.py
@@ -679,7 +679,19 @@ def _check_partition_fields(cls, values):
"Deprecated in 0.5.0 and will be removed in 1.0.0. Use 'ReadOptions' or 'WriteOptions' instead",
category=UserWarning,
)
class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions):
class JDBCLegacyOptions(GenericOptions):
class Config:
prohibited_options = GENERIC_PROHIBITED_OPTIONS
known_options = READ_OPTIONS | WRITE_OPTIONS | READ_WRITE_OPTIONS
extra = "allow"

partition_column: Optional[str] = Field(default=None, alias="partitionColumn")
num_partitions: PositiveInt = Field(default=1, alias="numPartitions")
lower_bound: Optional[int] = Field(default=None, alias="lowerBound")
upper_bound: Optional[int] = Field(default=None, alias="upperBound")
session_init_statement: Optional[str] = Field(default=None, alias="sessionInitStatement")
query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode")
isolation_level: str = Field(default="READ_UNCOMMITTED", alias="isolationLevel")
fetchsize: int = 100_000
batchsize: int = 20_000