diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py
index cc362d49659..c857c2eb51b 100644
--- a/ludwig/data/preprocessing.py
+++ b/ludwig/data/preprocessing.py
@@ -108,6 +108,7 @@
     STATA_FORMATS,
     TSV_FORMATS,
 )
+from ludwig.utils.dataframe_utils import is_dask_series_or_df
 from ludwig.utils.defaults import default_preprocessing_parameters, default_random_seed
 from ludwig.utils.fs_utils import file_lock, path_exists
 from ludwig.utils.misc_utils import get_from_registry, merge_dict
@@ -1487,7 +1488,13 @@ def precompute_fill_value(dataset_cols, feature, preprocessing_parameters: Prepr
     if missing_value_strategy == FILL_WITH_CONST:
         return preprocessing_parameters["fill_value"]
     elif missing_value_strategy == FILL_WITH_MODE:
-        return dataset_cols[feature[COLUMN]].value_counts().index[0]
+        # Requires separate handling if Dask since Dask has lazy evaluation
+        # Otherwise, dask returns a Dask index structure instead of a value to use as a fill value
+        return (
+            dataset_cols[feature[COLUMN]].value_counts().index.compute()[0]
+            if is_dask_series_or_df(dataset_cols[feature[COLUMN]], backend)
+            else dataset_cols[feature[COLUMN]].value_counts().index[0]
+        )
     elif missing_value_strategy == FILL_WITH_MEAN:
         if feature[TYPE] != NUMBER:
             raise ValueError(
diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py
index d581525b0bf..f27fed2bbc2 100644
--- a/tests/integration_tests/test_preprocessing.py
+++ b/tests/integration_tests/test_preprocessing.py
@@ -540,3 +540,34 @@ def test_vit_encoder_different_dimension_image(tmpdir, csv_filename, use_pretrai
     # Failure happens post preprocessing but before training during the ECD model creation phase
     # so make sure the model can be created properly and training can proceed
     model.train(dataset=data_csv)
+
+
+@pytest.mark.parametrize(
+    "df_engine",
+    [
+        pytest.param("pandas", id="pandas"),
+        pytest.param("dask", id="dask", marks=pytest.mark.distributed),
+    ],
+)
+def test_fill_with_mode_different_df_engine(tmpdir, csv_filename, df_engine, ray_cluster_2cpu):
+    config = {
+        "input_features": [category_feature(preprocessing={"missing_value_strategy": "fill_with_mode"})],
+        "output_features": [binary_feature()],
+    }
+
+    training_data_csv_path = generate_data(
+        config["input_features"], config["output_features"], os.path.join(tmpdir, csv_filename)
+    )
+
+    df = pd.read_csv(training_data_csv_path)
+
+    if df_engine == "dask":
+        import dask.dataframe as dd
+
+        df = dd.from_pandas(df, npartitions=1)
+
+        # Only support Dask on Ray backend
+        config["backend"] = {"type": "ray"}
+
+    ludwig_model = LudwigModel(config)
+    ludwig_model.preprocess(dataset=df)