refactor: change private attr and step getter #82

Merged: 66 commits, Oct 30, 2024
Commits
2e86208
fix: work in progress
mikita-sakalouski Aug 9, 2024
0aa6a53
Merge remote-tracking branch 'origin/main' into 33-feature-ensure-tha…
mikita-sakalouski Sep 10, 2024
a9fe361
feat: add delta test skipping for 3.5
mikita-sakalouski Sep 19, 2024
f2ab79f
refactor: update DataFrame imports to use koheesio.spark across the c…
mikita-sakalouski Sep 30, 2024
f3e6014
chore: update dependencies and improve SQL step handling
mikita-sakalouski Oct 8, 2024
f24ed3e
Added show_string utility
dannymeijer Oct 9, 2024
ecbdc9a
refactor: replace spark_minor_version with SPARK_MINOR_VERSION consta…
mikita-sakalouski Oct 9, 2024
18ff011
few more fixes
dannymeijer Oct 9, 2024
8a84038
Merge remote-tracking branch 'origin/33-feature-ensure-that-we-can-su…
dannymeijer Oct 9, 2024
6a291b2
few more fixes
dannymeijer Oct 9, 2024
3bfe84a
refactor: update utility functions and improve test assertions for cl…
mikita-sakalouski Oct 9, 2024
a3a5ad6
few more fixes
dannymeijer Oct 9, 2024
31bb7d7
Down to the last 36 tests to fix
dannymeijer Oct 14, 2024
c837cd4
fix typo
dannymeijer Oct 14, 2024
3b09e5e
refactor: streamline imports in row_number_dedup.py for clarity
mikita-sakalouski Oct 14, 2024
ac9eee5
refactor: enhance BoxCsvFileReader to use pandas for CSV parsing
mikita-sakalouski Oct 14, 2024
b360d8e
last 24
dannymeijer Oct 15, 2024
cfab89f
fix formatting
dannymeijer Oct 15, 2024
891c7f5
one more test
dannymeijer Oct 15, 2024
349770c
17 more remaining
dannymeijer Oct 16, 2024
1b47c75
Last 21
dannymeijer Oct 16, 2024
4c93701
Last 21
dannymeijer Oct 16, 2024
1a77512
Last 20
dannymeijer Oct 16, 2024
224c0cc
EOD
dannymeijer Oct 21, 2024
42b7d86
fix: refactor connect types
mikita-sakalouski Oct 21, 2024
0c3b4f1
fix: improve session handling and type annotations in connect_utils a…
mikita-sakalouski Oct 21, 2024
586d76a
fix: improve tests
mikita-sakalouski Oct 22, 2024
46a18ca
snowflake refactoring (95% done)
dannymeijer Oct 22, 2024
16290e3
fix: adjust imports os spark connect
mikita-sakalouski Oct 22, 2024
fd415ad
Merge branch '33-feature-ensure-that-we-can-support-dbr-143lts' of pe…
mikita-sakalouski Oct 22, 2024
77aa482
fix: update Snowflake integration tests and improve session handling
mikita-sakalouski Oct 22, 2024
a8219f8
fix: update imports and add type ignores in Snowflake integration
mikita-sakalouski Oct 22, 2024
e32e9a7
fix: disable snowflake tests
mikita-sakalouski Oct 22, 2024
9fda4df
fix: update dependencies and improve Spark integration handling
mikita-sakalouski Oct 22, 2024
f251e87
fix: remove TypeAlias usage and simplify type definitions in common.py
mikita-sakalouski Oct 22, 2024
a7319f6
fix: improve tests
mikita-sakalouski Oct 22, 2024
53bb8ec
Merge remote-tracking branch 'origin/main' into 33-feature-ensure-tha…
mikita-sakalouski Oct 22, 2024
b570064
fix: spark imports
mikita-sakalouski Oct 22, 2024
916e1a8
fix: import DataStreamReader
mikita-sakalouski Oct 22, 2024
635a525
fix: active spark session
mikita-sakalouski Oct 22, 2024
5856960
fix: conftest
mikita-sakalouski Oct 22, 2024
d377bb1
fix: tests
mikita-sakalouski Oct 22, 2024
95a9d70
fix: spark remote parallel
mikita-sakalouski Oct 22, 2024
9baca2c
fix: remote port
mikita-sakalouski Oct 22, 2024
3e63806
fix: try with random port
mikita-sakalouski Oct 22, 2024
7b28ba4
fix: tests
mikita-sakalouski Oct 22, 2024
29e784f
fix: tests
mikita-sakalouski Oct 22, 2024
e9b0aca
fix: fail fast
mikita-sakalouski Oct 22, 2024
988fb03
fix: github action test
mikita-sakalouski Oct 22, 2024
b0fd123
fix: delta packages for builder
mikita-sakalouski Oct 22, 2024
b426f66
fix: delta packages
mikita-sakalouski Oct 22, 2024
14c6519
fix: get_active_session
mikita-sakalouski Oct 22, 2024
a1ce806
fix: tests
mikita-sakalouski Oct 22, 2024
754a21e
fix: handle multiple import errors for AnalysisException and ParseExc…
mikita-sakalouski Oct 22, 2024
7fddd06
refactor: reorganize imports and clean up unused references
mikita-sakalouski Oct 22, 2024
ccaed64
fix: connect parallel testing
mikita-sakalouski Oct 22, 2024
56e4f6c
fix: test
mikita-sakalouski Oct 23, 2024
09c2a35
refactor: change private attr and output getter
mikita-sakalouski Oct 23, 2024
ac1bbef
Merge remote-tracking branch 'origin/release/0.9' into hotfix/step_ou…
mikita-sakalouski Oct 29, 2024
8ab242f
fix: resolve issue with step output handling
mikita-sakalouski Oct 29, 2024
b576275
format: apply ruff
mikita-sakalouski Oct 29, 2024
7dfc58e
fmt: isort
mikita-sakalouski Oct 29, 2024
ee065a2
fmt: apply ruff sort
mikita-sakalouski Oct 29, 2024
f480c85
refactor: remove 'brickflow' from section-order in pyproject.toml
mikita-sakalouski Oct 29, 2024
7b4d5ed
refactor: clean up pyproject.toml and improve test for DeltaTableWriter
mikita-sakalouski Oct 29, 2024
e3f49b2
refactor: enhance type annotations in BaseReader and DropColumn classes
mikita-sakalouski Oct 30, 2024
128 changes: 44 additions & 84 deletions pyproject.toml
@@ -70,17 +70,7 @@ tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"]
# Snowflake dependencies
snowflake = ["snowflake-connector-python>=3.12.0"]
# Development dependencies
dev = [
"black",
"isort",
"ruff",
"mypy",
"pylint",
"colorama",
"types-PyYAML",
"types-requests",

]
dev = ["ruff", "mypy", "pylint", "colorama", "types-PyYAML", "types-requests"]
test = [
"chispa",
"coverage[toml]",
@@ -153,23 +143,19 @@ Run `hatch run` to run scripts in the default environment.

# Code Quality
To check and format the codebase, we use:
- `black` for code formatting
- `isort` for import sorting (includes colorama for colored output)
- `ruff` for linting.
- `ruff` for linting, formatting, and sorting imports
- `mypy` for static type checking.
- `pylint` for code quality checks.
---
There are several ways to run style checks and formatting:
- `hatch run black-check` will check the codebase with black without applying fixes.
- `hatch run black-fmt` will format the codebase using black.
- `hatch run isort-check` will check the codebase with isort without applying fixes.
- `hatch run isort-fmt` will format the codebase using isort.
- `hatch run ruff-check` will check the codebase with ruff without applying fixes.
- `hatch run ruff-fmt` will format the codebase using ruff.
- `hatch run mypy-check` will check the codebase with mypy.
- `hatch run pylint-check` will check the codebase with pylint.
- `hatch run check` will run all the above checks (including pylint and mypy).
- `hatch run fmt` or `hatch run fix` will format the codebase using black, isort, and ruff.
- `hatch run fmt` or `hatch run fix` will format the codebase using ruff.
- `hatch run lint` will run ruff, mypy, and pylint.

# Testing and Coverage
@@ -207,22 +193,14 @@ features = [
# TODO: add scripts section based on Makefile
# TODO: add bandit
# Code Quality commands
black-check = "black --check --diff ."
black-fmt = "black ."
isort-check = "isort . --check --diff --color"
isort-fmt = "isort ."
ruff-check = "ruff check ."
ruff-fmt = "ruff check . --fix"
ruff-fmt = "ruff format --check --diff ."
ruff-fmt-fix = "ruff format ."
ruff-check = "ruff check . --diff"
ruff-check-fix = "ruff check . --fix"
mypy-check = "mypy src"
pylint-check = "pylint --output-format=colorized -d W0511 src"
check = [
"- black-check",
"- isort-check",
"- ruff-check",
"- mypy-check",
"- pylint-check",
]
fmt = ["black-fmt", "isort-fmt", "ruff-fmt"]
check = ["- ruff-fmt", "- ruff-check", "- mypy-check", "- pylint-check"]
fmt = ["ruff-fmt-fix", "ruff-check-fix"]
fix = "fmt"
lint = ["- ruff-fmt", "- mypy-check", "pylint-check"]
log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
@@ -353,6 +331,7 @@ filterwarnings = [
"ignore:'PYARROW_IGNORE_TIMEZONE'.*:UserWarning:pyspark.pandas.*",
# pydantic warnings
"ignore:A custom validator is returning a value other than `self`.*.*:UserWarning:pydantic.main.*",
"ignore:<module 'datetime' from .*.*:UserWarning:pydantic._internal.*",
# pyspark.sql.connect warnings
"ignore:is_datetime64tz_dtype.*:DeprecationWarning:pyspark.sql.connect.*",
"ignore:distutils.*:DeprecationWarning:pyspark.sql.connect.*",
@@ -453,56 +432,6 @@ features = [
# Code Quality and Style #
### ~~~~~~~~~~~~~~~~~~ ###
#
[tool.black]
line-length = 120
target-version = ['py39', 'py310', 'py311', 'py312']
include = '\.pyi?$'
extend-exclude = '''
/(
| tests/_data
)/
'''

[tool.isort]
profile = "black"
skip = [
# Skip a variety of commonly ignored directories.
".bzr",
".direnv",
".eggs",
".git",
".hg",
".mypy_cache",
".pants.d",
".pytype",
".ruff_cache",
".svn",
".venv",
".venvs",
"__pypackages__",
"__research__",
"__notebooks__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"venv",
]
force_to_top = ["__future__", "typing"]
sections = [
"FUTURE",
"STDLIB",
"THIRDPARTY",
"PYDANTIC",
"PYSPARK",
"KOHEESIO",
"FIRSTPARTY",
"LOCALFOLDER",
]
known_pydantic = ["pydantic"]
known_pyspark = ["pyspark"]
known_koheesio = ["koheesio"]

[tool.ruff]
# https://docs.astral.sh/ruff/configuration/#using-pyprojecttoml
@@ -539,8 +468,24 @@ exclude = [
docstring-code-format = true

[tool.ruff.lint]
# Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default.
select = ["E", "F"]
select = [
## pycodestyle
"E",
## Pyflakes
"F",
## flake8-bugbear
# "B",
## flake8-annotations
# "ANN",
## pyupgrade
# "UP",
## flake8-simplify
# "SIM",
## pep8-naming
# "N",
## isort
"I",
]
ignore = [
"F405", # To avoid errors like '`ConfigDict` may be undefined, or defined from star imports: `pydantic`'
"E501", # To avoid errors like 'line too long (120 > 79 characters)' -> let Black handle this instead
@@ -549,7 +494,6 @@ ignore = [
]
# Unlike Flake8, default to a complexity level of 10.
mccabe.max-complexity = 10

# Allow autofix for all enabled rules (when `--fix` is provided).
fixable = [
"A",
@@ -602,6 +546,22 @@ unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"

[tool.ruff.lint.isort]
force-to-top = ["__future__", "typing"]
section-order = [
"future",
"standard-library",
"third-party",
"pydantic",
"pyspark",
"first-party",
"local-folder",
]
sections.pydantic = ["pydantic"]
sections.pyspark = ["pyspark"]
detect-same-package = true
force-sort-within-sections = true

[tool.mypy]
python_version = "3.10"
files = ["koheesio/**/*.py"]
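For reference, a minimal sketch of how imports would be grouped under the new `[tool.ruff.lint.isort]` settings above — the module names are illustrative, not taken from this PR:

```python
# Hypothetical module, assuming `koheesio` is the first-party package.
from __future__ import annotations  # "future" section

from typing import Optional  # force-to-top: typing sorts first within stdlib
import re

import pandas as pd  # regular third-party

from pydantic import Field  # dedicated "pydantic" section

from pyspark.sql import SparkSession  # dedicated "pyspark" section

from koheesio.steps import Step  # first-party

from .utils import helper  # local folder (hypothetical module)
```

With the `I` rule added to the `select` list above, `ruff check . --fix` rewrites imports into this order — which is consistent with the reshuffled import blocks in the file diffs below.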
10 changes: 7 additions & 3 deletions src/koheesio/asyncio/__init__.py
@@ -2,10 +2,12 @@
This module provides classes for asynchronous steps in the koheesio package.
"""

from typing import Dict, Union
from typing import Dict, Optional, Union
from abc import ABC
from asyncio import iscoroutine

from pydantic import PrivateAttr

from koheesio.steps import Step, StepMetaClass, StepOutput


@@ -65,7 +67,9 @@ def merge(self, other: Union[Dict, StepOutput]) -> "AsyncStepOutput":
--------
```python
step_output = StepOutput(foo="bar")
step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
step_output.merge(
{"lorem": "ipsum"}
) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```

Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.
@@ -103,4 +107,4 @@ class Output(AsyncStepOutput):
This class represents the output of the asyncio step. It inherits from the AsyncStepOutput class.
"""

__output__: Output
_output: Optional[Output] = PrivateAttr(default=None)
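The `__output__` → `_output` change above is the standard pydantic private-attribute pattern; a minimal, self-contained sketch of that pattern — class and property names here are illustrative, not koheesio's API:

```python
from typing import Optional

from pydantic import BaseModel, PrivateAttr


class Output(BaseModel):
    foo: str = "bar"


class MyStep(BaseModel):
    # PrivateAttr keeps the value out of the model's fields, validation,
    # and serialization; it stays None until first accessed.
    _output: Optional[Output] = PrivateAttr(default=None)

    @property
    def output(self) -> Output:
        # Lazily create the output object on first access.
        if self._output is None:
            self._output = Output()
        return self._output


step = MyStep()
assert step.output.foo == "bar"
```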
16 changes: 9 additions & 7 deletions src/koheesio/asyncio/http.py
@@ -4,14 +4,14 @@

from __future__ import annotations

from typing import Any, Dict, List, Optional, Tuple, Union
import asyncio
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union

import nest_asyncio # type: ignore[import-untyped]
import yarl
from aiohttp import BaseConnector, ClientSession, TCPConnector
from aiohttp_retry import ExponentialRetry, RetryClient, RetryOptionsBase
import nest_asyncio # type: ignore[import-untyped]
import yarl

from pydantic import Field, SecretStr, field_validator, model_validator

@@ -54,26 +54,28 @@ class AsyncHttpStep(AsyncStep, ExtraParamsMixin):
from yarl import URL
from typing import Dict, Any, Union, List, Tuple


# Initialize the AsyncHttpStep
async def main():
session = ClientSession()
urls = [URL('https://example.com/api/1'), URL('https://example.com/api/2')]
urls = [URL("https://example.com/api/1"), URL("https://example.com/api/2")]
retry_options = ExponentialRetry()
connector = TCPConnector(limit=10)
headers = {'Content-Type': 'application/json'}
headers = {"Content-Type": "application/json"}
step = AsyncHttpStep(
client_session=session,
url=urls,
retry_options=retry_options,
connector=connector,
headers=headers
headers=headers,
)

# Execute the step
responses_urls= await step.get()
responses_urls = await step.get()

return responses_urls


# Run the main function
responses_urls = asyncio.run(main())
```
4 changes: 2 additions & 2 deletions src/koheesio/context.py
@@ -13,10 +13,10 @@

from __future__ import annotations

import re
from typing import Any, Dict, Iterator, Union
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Dict, Iterator, Union
import re

import jsonpickle # type: ignore[import-untyped]
import tomli
7 changes: 3 additions & 4 deletions src/koheesio/integrations/box.py
@@ -10,17 +10,16 @@
* Application is authorized for the enterprise (Developer Portal - MyApp - Authorization)
"""

import datetime
import re
from typing import Any, Dict, Optional, Union
from abc import ABC
from io import BytesIO, StringIO
from pathlib import PurePath
import re

import pandas as pd
from boxsdk import Client, JWTAuth
from boxsdk.object.file import File
from boxsdk.object.folder import Folder
import pandas as pd

from pyspark.sql.functions import expr, lit
from pyspark.sql.types import StructType
@@ -475,7 +474,7 @@ def execute(self) -> BoxReaderBase.Output:

if len(files) > 0:
self.log.info(
f"A total of {len(files)} files, that match the mask '{self.mask}' has been detected in {self.path}."
f"A total of {len(files)} files, that match the mask '{self.filter}' has been detected in {self.path}."
f" They will be loaded into Spark Dataframe: {files}"
)
else:
4 changes: 3 additions & 1 deletion src/koheesio/integrations/snowflake/test_utils.py
@@ -25,7 +25,9 @@ def test_execute(self, mock_query):
mock_query.expected_data = [("row1",), ("row2",)]

# Act
instance = SnowflakeRunQueryPython(**COMMON_OPTIONS, query=query, account="42")
instance = SnowflakeRunQueryPython(
**COMMON_OPTIONS, query=query, account="42"
)
instance.execute()

# Assert
26 changes: 14 additions & 12 deletions src/koheesio/integrations/spark/sftp.py
@@ -12,15 +12,17 @@
For more details on each mode, see the docstring of the SFTPWriteMode enum.
"""

import hashlib
import time
from typing import Optional, Union
from enum import Enum
import hashlib
from pathlib import Path
import time

from paramiko.sftp_client import SFTPClient
from paramiko.transport import Transport

from pydantic import PrivateAttr

from koheesio.models import (
Field,
InstanceOf,
@@ -152,8 +154,8 @@ class SFTPWriter(Writer):
)

# private attrs
__client__: SFTPClient
__transport__: Transport
_client: Optional[SFTPClient] = PrivateAttr(default=None)
_transport: Optional[Transport] = PrivateAttr(default=None)

@model_validator(mode="before")
def validate_path_and_file_name(cls, data: dict) -> dict:
@@ -203,26 +205,26 @@ def transport(self) -> Transport:

If the username and password are provided, use them to connect to the SFTP server.
"""
if not self.__transport__:
self.__transport__ = Transport((self.host, self.port))
if not self._transport:
self._transport = Transport((self.host, self.port))
if self.username and self.password:
self.__transport__.connect(
self._transport.connect(
username=self.username.get_secret_value(), password=self.password.get_secret_value()
)
else:
self.__transport__.connect()
return self.__transport__
self._transport.connect()
return self._transport

@property
def client(self) -> SFTPClient:
"""Return the SFTP client. If it doesn't exist, create it."""
if not self.__client__:
if not self._client:
try:
self.__client__ = SFTPClient.from_transport(self.transport)
self._client = SFTPClient.from_transport(self.transport)
except EOFError as e:
self.log.error(f"Failed to create SFTP client. Transport active: {self.transport.is_active()}")
raise e
return self.__client__
return self._client

def _close_client(self) -> None:
"""Close the SFTP client and transport."""
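The `transport` and `client` getters above follow the same lazy-initialization pattern as the `_output` change earlier in this PR; a generic sketch of the idea, with a stand-in resource instead of a real paramiko connection — all names here are illustrative:

```python
from typing import Optional

from pydantic import BaseModel, PrivateAttr


class Connection:
    """Stand-in for an expensive resource such as a paramiko Transport."""

    def __init__(self, host: str, port: int) -> None:
        self.host, self.port = host, port
        self.active = True


class Writer(BaseModel):
    host: str = "localhost"
    port: int = 22

    # Cached handle, created on first use rather than at model construction.
    _conn: Optional[Connection] = PrivateAttr(default=None)

    @property
    def conn(self) -> Connection:
        if not self._conn:
            self._conn = Connection(self.host, self.port)
        return self._conn

    def close(self) -> None:
        # Release the resource and drop the cache so it can be recreated.
        if self._conn:
            self._conn.active = False
            self._conn = None


w = Writer()
assert w.conn is w.conn  # repeated access returns the same cached instance
w.close()
```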