From 12c38d031738a1c64a54670c3805d86a43c7f90d Mon Sep 17 00:00:00 2001 From: Mathis Chenuet <9201969+artemisart@users.noreply.github.com> Date: Mon, 22 Jan 2024 11:23:28 +0100 Subject: [PATCH] pyupgrade, typing --- docs/conf.py | 1 - functional/execution.py | 6 +-- functional/io.py | 31 +++++------ functional/lineage.py | 13 +++-- functional/pipeline.py | 87 ++++++++++++++---------------- functional/streams.py | 6 +-- functional/test/test_functional.py | 4 +- functional/test/test_streams.py | 8 +-- functional/transformations.py | 3 +- functional/util.py | 33 +++++++----- 10 files changed, 99 insertions(+), 93 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 74d1f6b..ef91773 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # ScalaFunctional documentation build configuration file, created by # sphinx-quickstart on Wed Mar 11 23:00:20 2015. diff --git a/functional/execution.py b/functional/execution.py index 9088673..62b3dee 100644 --- a/functional/execution.py +++ b/functional/execution.py @@ -2,7 +2,7 @@ from functional.util import compose, parallelize -class ExecutionStrategies(object): +class ExecutionStrategies: """ Enum like object listing the types of execution strategies. """ @@ -11,7 +11,7 @@ class ExecutionStrategies(object): PARALLEL = 1 -class ExecutionEngine(object): +class ExecutionEngine: """ Class to perform serial execution of a Sequence evaluation. """ @@ -43,7 +43,7 @@ def __init__(self, processes=None, partition_size=None): Set the number of processes for parallel execution. :param processes: Number of parallel Processes """ - super(ParallelExecutionEngine, self).__init__() + super().__init__() self.processes = processes self.partition_size = partition_size diff --git a/functional/io.py b/functional/io.py index 91696f9..a8a7c93 100644 --- a/functional/io.py +++ b/functional/io.py @@ -1,18 +1,19 @@ -import gzip -import lzma +import builtins import bz2 +import gzip import io -import builtins - -from typing import Optional, Generic, TypeVar, Any +import lzma +from os import PathLike +from typing import Any, Optional, TypeAlias +# from typeshed +StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] +FileDescriptorOrPath: TypeAlias = int | StrOrBytesPath WRITE_MODE = "wt" -_FileConv_co = TypeVar("_FileConv_co", covariant=True) - -class ReusableFile(Generic[_FileConv_co]): +class ReusableFile: """ Class which emulates the builtin file except that calling iter() on it will return separate iterators on different file handlers (which are automatically closed when iteration stops). This @@ -23,7 +24,7 @@ class ReusableFile(Generic[_FileConv_co]): # pylint: disable=too-many-instance-attributes def __init__( self, - path: str, + path: FileDescriptorOrPath, delimiter: Optional[str] = None, mode: str = "r", buffering: int = -1, @@ -95,7 +96,7 @@ def __init__( errors: Optional[str] = None, newline: Optional[str] = None, ): - super(CompressedFile, self).__init__( + super().__init__( path, delimiter=delimiter, mode=mode, @@ -126,7 +127,7 @@ def __init__( errors: Optional[str] = None, newline: Optional[str] = None, ): - super(GZFile, self).__init__( + super().__init__( path, delimiter=delimiter, mode=mode, @@ -181,7 +182,7 @@ def __init__( errors: Optional[str] = None, newline: Optional[str] = None, ): - super(BZ2File, self).__init__( + super().__init__( path, delimiter=delimiter, mode=mode, @@ -234,7 +235,7 @@ def __init__( filters=None, format=None, ): - super(XZFile, self).__init__( + super().__init__( path, delimiter=delimiter, mode=mode, @@ -278,7 +279,7 @@ def read(self): return file_content.read() -COMPRESSION_CLASSES = [GZFile, BZ2File, XZFile] +COMPRESSION_CLASSES: list[type[CompressedFile]] = [GZFile, BZ2File, XZFile] N_COMPRESSION_CHECK_BYTES = max(len(cls.magic_bytes) for cls in COMPRESSION_CLASSES) # type: ignore @@ -288,7 +289,7 @@ def get_read_function(filename: str, disable_compression: bool): with open(filename, "rb") as f: start_bytes = f.read(N_COMPRESSION_CHECK_BYTES) for cls in COMPRESSION_CLASSES: - if cls.is_compressed(start_bytes): # type: ignore + if cls.is_compressed(start_bytes): return cls return ReusableFile diff --git a/functional/lineage.py b/functional/lineage.py index fb9e266..237af4f 100644 --- a/functional/lineage.py +++ b/functional/lineage.py @@ -1,13 +1,20 @@ +from __future__ import annotations + +from typing import Optional from functional.execution import ExecutionEngine from functional.transformations import CACHE_T -class Lineage(object): +class Lineage: """ Class for tracking the lineage of transformations, and applying them to a given sequence. """ - def __init__(self, prior_lineage=None, engine=None): + def __init__( + self, + prior_lineage: Optional[Lineage] = None, + engine: Optional[ExecutionEngine] = None, + ): """ Construct an empty lineage if prior_lineage is None or if its not use it as the list of current transformations @@ -16,7 +23,7 @@ def __init__(self, prior_lineage=None, engine=None): :return: new Lineage object """ self.transformations = ( - [] if prior_lineage is None else list(prior_lineage.transformations) + [] if prior_lineage is None else prior_lineage.transformations ) self.engine = ( (engine or ExecutionEngine()) diff --git a/functional/pipeline.py b/functional/pipeline.py index 0eb0234..1bda396 100644 --- a/functional/pipeline.py +++ b/functional/pipeline.py @@ -1,44 +1,49 @@ """ The pipeline module contains the transformations and actions API of PyFunctional """ -from operator import mul, add import collections -from functools import reduce, wraps, partial - -import json import csv -import sqlite3 +import json import re - +import sqlite3 from collections.abc import Iterable -from typing import List, Optional, Tuple, Union +from functools import partial, reduce, wraps +from operator import add, mul +from typing import Any, Callable, Generic, Optional, TypeVar, Union, reveal_type from tabulate import tabulate -from functional.execution import ExecutionEngine +from functional import transformations +from functional.execution import ExecutionEngine, ExecutionStrategies +from functional.io import WRITE_MODE, universal_write_open from functional.lineage import Lineage from functional.util import ( + default_value, + identity, is_iterable, - is_primitive, is_namedtuple, + is_primitive, is_tabulatable, - identity, - default_value, ) -from functional.io import WRITE_MODE, universal_write_open -from functional import transformations -from functional.execution import ExecutionStrategies + +T = TypeVar('T') -class Sequence(object): +class Sequence(Generic[T]): """ Sequence is a wrapper around any type of sequence which provides access to common functional transformations and reductions in a data pipeline style """ + engine: ExecutionEngine + _max_repr_items: Optional[int] + _base_sequence: Iterable[T] + _lineage: Lineage + no_wrap: Optional[bool] + def __init__( self, - sequence: Iterable, + sequence: Iterable[T], transform=None, engine: Optional[ExecutionEngine] = None, max_repr_items: Optional[int] = None, @@ -62,7 +67,7 @@ def __init__( self._max_repr_items: Optional[int] = ( max_repr_items or sequence._max_repr_items ) - self._base_sequence: Union[Iterable, List, Tuple] = sequence._base_sequence + self._base_sequence: Union[Iterable, list, tuple] = sequence._base_sequence self._lineage: Lineage = Lineage( prior_lineage=sequence._lineage, engine=engine ) @@ -432,7 +437,7 @@ def drop_while(self, func): """ return self._transform(transformations.drop_while_t(func)) - def take(self, n): + def take(self, n: int): """ Take the first n elements of the sequence. @@ -963,7 +968,7 @@ def reduce(self, func, *initial): :param initial: single optional argument acting as initial value :return: reduced value using func """ - if len(initial) == 0: + if not initial: return _wrap(reduce(func, self)) elif len(initial) == 1: return _wrap(reduce(func, self, initial[0])) @@ -987,7 +992,7 @@ def accumulate(self, func=add): """ return self._transform(transformations.accumulate_t(func)) - def make_string(self, separator): + def make_string(self, separator: str) -> str: """ Concatenate the elements of the sequence into a string separated by separator. @@ -1016,20 +1021,10 @@ def product(self, projection=None): :return: product of elements in sequence """ if self.empty(): - if projection: - return projection(1) - else: - return 1 + return projection(1) if projection else 1 if self.size() == 1: - if projection: - return projection(self.first()) - else: - return self.first() - - if projection: - return self.map(projection).reduce(mul) - else: - return self.reduce(mul) + return projection(self.first()) if projection else self.first() + return (self.map(projection) if projection else self).reduce(mul) def sum(self, projection=None): """ @@ -1044,10 +1039,7 @@ def sum(self, projection=None): :param projection: function to project on the sequence before taking the sum :return: sum of elements in sequence """ - if projection: - return sum(self.map(projection)) - else: - return sum(self) + return sum(self.map(projection) if projection else self) def average(self, projection=None): """ @@ -1062,10 +1054,7 @@ def average(self, projection=None): :return: average of elements in the sequence """ length = self.size() - if projection: - return sum(self.map(projection)) / length - else: - return sum(self) / length + return sum(self.map(projection) if projection else self) / length def aggregate(self, *args): """ @@ -1377,7 +1366,7 @@ def slice(self, start, until): """ return self._transform(transformations.slice_t(start, until)) - def to_list(self, n=None): + def to_list(self, n: Optional[int] = None) -> list[T]: """ Converts sequence to list of elements. @@ -1399,7 +1388,7 @@ def to_list(self, n=None): else: return self.cache().take(n).list() - def list(self, n=None): + def list(self, n: Optional[int] = None) -> list[T]: """ Converts sequence to list of elements. @@ -1417,7 +1406,7 @@ def list(self, n=None): """ return self.to_list(n=n) - def to_set(self): + def to_set(self) -> set[T]: """ Converts sequence to a set of elements. @@ -1434,7 +1423,7 @@ def to_set(self): """ return set(self.sequence) - def set(self): + def set(self) -> set[T]: """ Converts sequence to a set of elements. @@ -1828,7 +1817,13 @@ def _wrap(value): return value -def extend(func=None, aslist=False, final=False, name=None, parallel=False): +def extend( + func: Optional[Callable[[Any], Any]] = None, + aslist: bool = False, + final: bool = False, + name: str = '', + parallel: bool = False, +): """ Function decorator for adding new methods to the Sequence class. diff --git a/functional/streams.py b/functional/streams.py index e1340a9..dfe3c42 100644 --- a/functional/streams.py +++ b/functional/streams.py @@ -13,7 +13,7 @@ from functional.io import get_read_function -class Stream(object): +class Stream: """ Represents and implements a stream which separates the responsibilities of Sequence and ExecutionEngine. @@ -318,9 +318,7 @@ def __init__( :param disable_compression: Disable file compression detection :param no_wrap: default value of no_wrap for functions like first() or last() """ - super(ParallelStream, self).__init__( - disable_compression=disable_compression, no_wrap=no_wrap - ) + super().__init__(disable_compression=disable_compression, no_wrap=no_wrap) self.processes = processes self.partition_size = partition_size diff --git a/functional/test/test_functional.py b/functional/test/test_functional.py index 732f6e6..5446dbf 100644 --- a/functional/test/test_functional.py +++ b/functional/test/test_functional.py @@ -951,7 +951,7 @@ def test_wrap(self): self.assert_not_type(_wrap(Data(1, 2))) def test_wrap_objects(self): - class A(object): + class A: a = 1 l = [A(), A(), A()] @@ -1031,7 +1031,7 @@ def test_tabulate(self): sequence = seq(1, 2, 3) self.assertEqual(sequence.tabulate(), None) - class NotTabulatable(object): + class NotTabulatable: pass sequence = seq(NotTabulatable(), NotTabulatable(), NotTabulatable()) diff --git a/functional/test/test_streams.py b/functional/test/test_streams.py index 6ec5e6d..8b2b1fb 100644 --- a/functional/test/test_streams.py +++ b/functional/test/test_streams.py @@ -122,7 +122,7 @@ def test_csv(self): result = self.seq.csv("functional/test/data/test.csv").to_list() expect = [["1", "2", "3", "4"], ["a", "b", "c", "d"]] self.assertEqual(expect, result) - with open("functional/test/data/test.csv", "r", encoding="utf8") as csv_file: + with open("functional/test/data/test.csv", encoding="utf8") as csv_file: self.assertEqual(expect, self.seq.csv(csv_file).to_list()) with self.assertRaises(ValueError): self.seq.csv(1) @@ -138,7 +138,7 @@ def test_csv_dict_reader(self): self.assertEqual(result[1]["b"], "5") self.assertEqual(result[1]["c"], "6") - with open("functional/test/data/test_header.csv", "r", encoding="utf8") as f: + with open("functional/test/data/test_header.csv", encoding="utf8") as f: result = self.seq.csv_dict_reader(f).to_list() self.assertEqual(result[0]["a"], "1") self.assertEqual(result[0]["b"], "2") @@ -316,11 +316,11 @@ def test_to_file(self): tmp_path = "functional/test/data/tmp/output.txt" sequence = self.seq(1, 2, 3, 4) sequence.to_file(tmp_path) - with open(tmp_path, "r", encoding="utf8") as output: + with open(tmp_path, encoding="utf8") as output: self.assertEqual("[1, 2, 3, 4]", output.readlines()[0]) sequence.to_file(tmp_path, delimiter=":") - with open(tmp_path, "r", encoding="utf8") as output: + with open(tmp_path, encoding="utf8") as output: self.assertEqual("1:2:3:4", output.readlines()[0]) def test_to_file_compressed(self): diff --git a/functional/transformations.py b/functional/transformations.py index f13481a..67b8c28 100644 --- a/functional/transformations.py +++ b/functional/transformations.py @@ -260,8 +260,7 @@ def flat_map_impl(func: Callable, sequence): :return: flat_map generator """ for element in sequence: - for value in func(element): - yield value + yield from func(element) def flat_map_t(func): diff --git a/functional/util.py b/functional/util.py index 2cac321..4302733 100644 --- a/functional/util.py +++ b/functional/util.py @@ -3,16 +3,18 @@ from functools import reduce from itertools import chain, count, islice, takewhile from multiprocessing import Pool, cpu_count -from typing import Any +from typing import Callable, Optional, Sized, TypeVar import dill as serializer # type: ignore +T = TypeVar('T') +U = TypeVar('U') PROTOCOL = serializer.HIGHEST_PROTOCOL CPU_COUNT = cpu_count() -def is_primitive(val): +def is_primitive(val: object) -> bool: """ Checks if the passed value is a primitive type. @@ -39,7 +41,7 @@ def is_primitive(val): return isinstance(val, (str, bool, float, complex, bytes, int)) -def is_namedtuple(val): +def is_namedtuple(val: object) -> bool: """ Use Duck Typing to check if val is a named tuple. Checks that val is of type tuple and contains the attribute _fields which is defined for named tuples. @@ -54,7 +56,7 @@ def is_namedtuple(val): return all(isinstance(n, str) for n in fields) -def identity(arg): +def identity(arg: T) -> T: """ Function which returns the argument. Used as a default lambda function. @@ -68,7 +70,7 @@ def identity(arg): return arg -def is_iterable(val): +def is_iterable(val: object) -> bool: """ Check if val is not a list, but is a Iterable type. This is used to determine when list() should be called on val @@ -91,7 +93,7 @@ def is_tabulatable(val: object) -> bool: ) -def split_every(parts, iterable): +def split_every(parts: int, iterable: Iterable[T]) -> Iterable[list[T]]: """ Split an iterable into parts of length parts @@ -106,7 +108,7 @@ def split_every(parts, iterable): return takewhile(bool, (list(islice(iterable, parts)) for _ in count())) -def unpack(packed): +def unpack(packed: bytes) -> Optional[list]: """ Unpack the function and args then apply the function to the arguments and return result :param packed: input packed tuple of (func, args) @@ -119,7 +121,7 @@ def unpack(packed): return None -def pack(func, args): +def pack(func: Callable, args: Iterable) -> bytes: """ Pack a function and the args it should be applied to :param func: Function to apply @@ -144,9 +146,14 @@ def parallelize(func, result, processes=None, partition_size=None): return chain.from_iterable(parallel_iter) -def lazy_parallelize(func, result, processes=None, partition_size=None): +def lazy_parallelize( + func: Callable[[T], U], + result: Iterable[T], + processes: Optional[int] = None, + partition_size: Optional[int] = None, +) -> Iterable[list[U]]: """ - Lazily computes an iterable in parallel, and returns them in pool chunks + Lazily computes an map in parallel, and returns them in pool chunks :param func: Function to apply :param result: Data to apply to :param processes: Number of processes to use in parallel @@ -164,7 +171,7 @@ def lazy_parallelize(func, result, processes=None, partition_size=None): yield from pool.imap(unpack, packed_partitions) -def compute_partition_size(result, processes): +def compute_partition_size(result: Sized, processes: int) -> int: """ Attempts to compute the partition size to evenly distribute work across processes. Defaults to 1 if the length of result cannot be determined. @@ -179,7 +186,7 @@ def compute_partition_size(result, processes): return 1 -def compose(*functions): +def compose(*functions: Callable) -> Callable: """ Compose all the function arguments together :param functions: Functions to compose @@ -189,7 +196,7 @@ def compose(*functions): return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x) -def default_value(*vals: Any): +def default_value(*vals: Optional[bool]): for val in vals: if val is not None: return val