[MAINTENANCE] Adding typehint annotations in backend and data components and fixing mypy errors. #3709

Merged
57 changes: 37 additions & 20 deletions ludwig/backend/base.py
@@ -20,7 +20,7 @@
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Callable, TYPE_CHECKING
from typing import Any, Callable, Generator, TYPE_CHECKING

import numpy as np
import pandas as pd
@@ -89,7 +89,7 @@ def initialize_pytorch(self, *args, **kwargs):

@contextmanager
@abstractmethod
def create_trainer(self, **kwargs) -> BaseTrainer:
def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> Generator:
raise NotImplementedError()

@abstractmethod
@@ -146,7 +146,9 @@ def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len:
raise NotImplementedError()

@abstractmethod
def batch_transform(self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame:
def batch_transform(
self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None
) -> DataFrame:
"""Applies `transform_fn` to every `batch_size` length batch of `df` and returns the result."""
raise NotImplementedError()

@@ -171,7 +173,9 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si
with ThreadPoolExecutor() as executor: # number of threads is inferred
if isinstance(sample_fname, str):
if map_fn is read_audio_from_path: # bypass torchaudio issue that no longer takes in file-like objects
result = executor.map(lambda path: map_fn(path) if path is not None else path, column.values)
result = executor.map( # type: ignore[misc]
lambda path: map_fn(path) if path is not None else path, column.values
)
else:
result = executor.map(
lambda path: get_bytes_obj_from_path(path) if path is not None else path, column.values
@@ -186,7 +190,7 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si
return pd.Series(result, index=column.index, name=column.name)

@staticmethod
def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame:
def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None) -> DataFrame:
name = name or "Batch Transform"
batches = to_batches(df, batch_size)
transform = transform_fn()
@@ -204,21 +208,11 @@ def initialize():
def initialize_pytorch(*args, **kwargs):
initialize_pytorch(*args, **kwargs)

def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> BaseTrainer:
from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry

if model.type() == MODEL_LLM:
trainer_cls = get_from_registry(config.type, get_llm_trainers_registry())
else:
trainer_cls = get_from_registry(model.type(), get_trainers_registry())

return trainer_cls(config=config, model=model, **kwargs)

@staticmethod
def create_predictor(model: BaseModel, **kwargs):
from ludwig.models.predictor import get_predictor_cls

return get_predictor_cls(model.type())(model, **kwargs)
return get_predictor_cls(model.type())(model, **kwargs) # type: ignore[call-arg]

def sync_model(self, model):
pass
@@ -254,14 +248,16 @@ def is_coordinator() -> bool:
class LocalBackend(LocalPreprocessingMixin, LocalTrainingMixin, Backend):
BACKEND_TYPE = "local"

_shared_instance: LocalBackend

@classmethod
def shared_instance(cls):
def shared_instance(cls) -> LocalBackend:
"""Returns a shared singleton LocalBackend instance."""
if not hasattr(cls, "_shared_instance"):
cls._shared_instance = cls()
return cls._shared_instance

def __init__(self, **kwargs):
def __init__(self, **kwargs) -> None:
super().__init__(dataset_manager=PandasDatasetManager(self), **kwargs)

@property
@@ -280,6 +276,22 @@ def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | No
# trial resources it wants, because there is no Ray Datasets process to compete with it for CPUs.
return None

def create_trainer(
self,
config: BaseTrainerConfig,
model: BaseModel,
**kwargs,
) -> BaseTrainer: # type: ignore[override]
from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry

trainer_cls: type
if model.type() == MODEL_LLM:
trainer_cls = get_from_registry(config.type, get_llm_trainers_registry())
else:
trainer_cls = get_from_registry(model.type(), get_trainers_registry())

return trainer_cls(config=config, model=model, **kwargs)


@DeveloperAPI
class DataParallelBackend(LocalPreprocessingMixin, Backend, ABC):
@@ -298,15 +310,20 @@ def initialize_pytorch(self, *args, **kwargs):
*args, local_rank=self._distributed.local_rank(), local_size=self._distributed.local_size(), **kwargs
)

def create_trainer(self, **kwargs) -> BaseTrainer:
def create_trainer(
self,
config: BaseTrainerConfig,
model: BaseModel,
**kwargs,
) -> BaseTrainer: # type: ignore[override]
from ludwig.trainers.trainer import Trainer

return Trainer(distributed=self._distributed, **kwargs)

def create_predictor(self, model: BaseModel, **kwargs):
from ludwig.models.predictor import get_predictor_cls

return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs)
return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) # type: ignore[call-arg]

def sync_model(self, model):
# Model weights are only saved on the coordinator, so broadcast
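A brief note on the `# type: ignore[override]` comments on the concrete `create_trainer` methods above: the abstract method is declared as a `@contextmanager` yielding a trainer (hence the `Generator` return type), while the concrete backends return a `BaseTrainer` directly, which mypy reports as an incompatible override. The sketch below uses stand-in classes, not Ludwig's real ones, to show the same mismatch in isolation.

```python
# Minimal sketch with stand-in classes (not Ludwig's actual API) showing the
# override mismatch that the `# type: ignore[override]` comments suppress.
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Generator


class FakeTrainer:
    """Stand-in for BaseTrainer."""


class FakeBackend(ABC):
    @contextmanager
    @abstractmethod
    def create_trainer(self, **kwargs) -> Generator:
        raise NotImplementedError()


class FakeLocalBackend(FakeBackend):
    # mypy flags this signature as incompatible with the supertype's
    # context-manager version, hence the ignore in the real code.
    def create_trainer(self, **kwargs) -> FakeTrainer:  # type: ignore[override]
        return FakeTrainer()
```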
23 changes: 13 additions & 10 deletions ludwig/data/dataset/base.py
@@ -14,10 +14,13 @@
# limitations under the License.
# ==============================================================================

from __future__ import annotations

import contextlib
from abc import ABC, abstractmethod
from typing import Iterable, Optional
from typing import Iterable

from ludwig.data.batcher.base import Batcher
from ludwig.distributed import DistributedStrategy
from ludwig.features.base_feature import BaseFeature
from ludwig.utils.defaults import default_random_seed
@@ -26,7 +29,7 @@

class Dataset(ABC):
@abstractmethod
def __len__(self):
def __len__(self) -> int:
raise NotImplementedError()

@contextlib.contextmanager
@@ -38,36 +41,36 @@ def initialize_batcher(
random_seed: int = default_random_seed,
ignore_last: bool = False,
distributed: DistributedStrategy = None,
):
) -> Batcher:
raise NotImplementedError()

@abstractmethod
def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame:
def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame:
raise NotImplementedError()

@abstractmethod
def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame:
def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame:
raise NotImplementedError()

@property
def in_memory_size_bytes(self):
def in_memory_size_bytes(self) -> int:
raise NotImplementedError()


class DatasetManager(ABC):
@abstractmethod
def create(self, dataset, config, training_set_metadata):
def create(self, dataset, config, training_set_metadata) -> Dataset:
raise NotImplementedError()

@abstractmethod
def save(self, cache_path, dataset, config, training_set_metadata, tag):
def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset:
raise NotImplementedError()

@abstractmethod
def can_cache(self, skip_save_processed_input):
def can_cache(self, skip_save_processed_input) -> bool:
raise NotImplementedError()

@property
@abstractmethod
def data_format(self):
def data_format(self) -> str:
raise NotImplementedError()
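The `X | None` annotations introduced in this file depend on the `from __future__ import annotations` line added at the top of the module: PEP 604 union syntax is only evaluated at runtime on Python 3.10+, but postponed evaluation (PEP 563) stores annotations as strings, so the hints also import cleanly on older interpreters. A self-contained sketch, independent of Ludwig's classes:

```python
# Standalone sketch: PEP 604 unions in annotations work on Python 3.8/3.9 only
# because the __future__ import postpones their evaluation (PEP 563).
from __future__ import annotations

from typing import Iterable


def to_names(features: Iterable[str] | None = None) -> list[str]:
    # Without the __future__ import, evaluating `Iterable[str] | None` at
    # function definition time raises TypeError on Python < 3.10.
    return list(features) if features is not None else []


print(to_names(["age", "income"]))  # ['age', 'income']
print(to_names())                   # []
```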
33 changes: 20 additions & 13 deletions ludwig/data/dataset/pandas.py
@@ -13,13 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from __future__ import annotations

import contextlib
from typing import Iterable, Optional
from typing import Iterable, TYPE_CHECKING

import numpy as np
from pandas import DataFrame

from ludwig.constants import PREPROCESSING, TRAINING
from ludwig.data.batcher.base import Batcher
from ludwig.data.batcher.random_access import RandomAccessBatcher
from ludwig.data.dataset.base import Dataset, DatasetManager
from ludwig.data.sampler import DistributedSampler
@@ -31,6 +35,9 @@
from ludwig.utils.fs_utils import download_h5
from ludwig.utils.misc_utils import get_proc_features

if TYPE_CHECKING:
from ludwig.backend.base import Backend


class PandasDataset(Dataset):
def __init__(self, dataset, features, data_hdf5_fp):
@@ -42,13 +49,13 @@ def __init__(self, dataset, features, data_hdf5_fp):
dataset = load_hdf5(dataset)
self.dataset = to_numpy_dataset(dataset)

def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame:
def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame:
"""Convert the dataset to a Pandas DataFrame."""
if features:
return from_numpy_dataset({feature.feature_name: self.dataset[feature.proc_column] for feature in features})
return from_numpy_dataset(self.dataset)

def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame:
def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame:
return to_scalar_df(self.to_df(features))

def get(self, proc_column, idx=None):
@@ -76,18 +83,18 @@ def get(self, proc_column, idx=None):
indices = indices[:, np.argsort(indices[1])]
return im_data[indices[2, :]]

def get_dataset(self):
def get_dataset(self) -> dict[str, np.ndarray]:
return self.dataset

def __len__(self):
return self.size

@property
def processed_data_fp(self) -> Optional[str]:
def processed_data_fp(self) -> str | None:
return self.data_hdf5_fp

@property
def in_memory_size_bytes(self):
def in_memory_size_bytes(self) -> int:
df = self.to_df()
return df.memory_usage(deep=True).sum() if df is not None else 0

@@ -100,7 +107,7 @@ def initialize_batcher(
ignore_last: bool = False,
distributed: DistributedStrategy = None,
augmentation_pipeline=None,
):
) -> Batcher:
sampler = DistributedSampler(
len(self), shuffle=should_shuffle, random_seed=random_seed, distributed=distributed
)
@@ -115,21 +122,21 @@


class PandasDatasetManager(DatasetManager):
def __init__(self, backend):
self.backend = backend
def __init__(self, backend: Backend):
self.backend: Backend = backend

def create(self, dataset, config, training_set_metadata):
def create(self, dataset, config, training_set_metadata) -> Dataset:
return PandasDataset(dataset, get_proc_features(config), training_set_metadata.get(DATA_TRAIN_HDF5_FP))

def save(self, cache_path, dataset, config, training_set_metadata, tag):
def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset:
save_hdf5(cache_path, dataset)
if tag == TRAINING:
training_set_metadata[DATA_TRAIN_HDF5_FP] = cache_path
return dataset

def can_cache(self, skip_save_processed_input):
def can_cache(self, skip_save_processed_input) -> bool:
return self.backend.is_coordinator() and not skip_save_processed_input

@property
def data_format(self):
def data_format(self) -> str:
return "hdf5"