Commit
remove the need for local backend
jppgks committed Sep 14, 2022
1 parent 6b5c9f8 commit 4f6e168
Showing 1 changed file with 42 additions and 42 deletions.
84 changes: 42 additions & 42 deletions ludwig/data/split.py
@@ -108,23 +108,28 @@ def get_schema_cls():
         return FixedSplitConfig


-def _get_partition_splitter(splitter: Splitter, random_seed: float) -> Callable:
-    """Returns a function that splits a single partition into train, val, test.
-    The function returns a single DataFrame with the split column populated. Assumes that the split column is already
-    present in the partition and has a default value of 0 (train).
+def stratify_split_dataframe(
+    df: DataFrame, column: str, probabilities: List[float], random_seed: float
+) -> Tuple[DataFrame, DataFrame, DataFrame]:
+    """Splits a dataframe into train, validation, and test sets based on the values of a column.
+    The column must be categorical (including binary). The split is stratified, meaning that the proportion of each
+    category in each split is the same as in the original dataset.
     """
-    single_partition_backend = initialize_backend(LOCAL)
+    frac_train, frac_val, frac_test = probabilities

-    def split_partition(part: DataFrame) -> DataFrame:
-        # Split the partition via the non-partitioned code path
-        _, val, test = splitter.split(part, single_partition_backend, random_seed=random_seed)
-        # Split column defaults to 0 (train), so only need to update val and test
-        part.loc[val.index, TMP_SPLIT_COL] = 1
-        part.loc[test.index, TMP_SPLIT_COL] = 2
-        return part
+    # Dataframe of just the column on which to stratify
+    y = df[[column]].astype(np.int8)
+    df_train, df_temp, _, y_temp = train_test_split(
+        df, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_seed
+    )
+    # Split the temp dataframe into val and test dataframes.
+    relative_frac_test = frac_test / (frac_val + frac_test)
+    df_val, df_test, _, _ = train_test_split(
+        df_temp, y_temp, stratify=y_temp, test_size=relative_frac_test, random_state=random_seed
+    )

-    return split_partition
+    return df_train, df_val, df_test


 @split_registry.register("stratify")
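
For reference, the two-stage stratified split performed by the new stratify_split_dataframe helper can be reproduced standalone with pandas and scikit-learn. The sketch below is illustrative only; the toy data, the 70/10/20 probabilities, and the seed are assumptions, not part of the commit.

# Illustrative sketch (not from the commit): the same two-stage stratified split
# applied to a toy pandas DataFrame.
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.DataFrame({"feature": range(100), "label": [0] * 70 + [1] * 30})
frac_train, frac_val, frac_test = 0.7, 0.1, 0.2  # assumed probabilities

# First split: carve off the training fraction, stratified on the label column.
y = df[["label"]].astype(np.int8)
df_train, df_temp, _, y_temp = train_test_split(
    df, y, stratify=y, test_size=(1.0 - frac_train), random_state=42
)
# Second split: divide the remainder into val and test, keeping their relative sizes.
relative_frac_test = frac_test / (frac_val + frac_test)
df_val, df_test, _, _ = train_test_split(
    df_temp, y_temp, stratify=y_temp, test_size=relative_frac_test, random_state=42
)
print(len(df_train), len(df_val), len(df_test))  # 70 10 20
print(df_val["label"].mean())  # 0.3, matching the original label ratio

Two calls are needed because train_test_split only produces two pieces per call; rescaling frac_test by (frac_val + frac_test) keeps the final validation/test proportions at the requested values.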
@@ -136,35 +141,30 @@ def __init__(self, column: str, probabilities: List[float] = DEFAULT_PROBABILITI
     def split(
         self, df: DataFrame, backend: Backend, random_seed: float = default_random_seed
     ) -> Tuple[DataFrame, DataFrame, DataFrame]:
-        if backend.df_engine.partitioned:
-            # For a partitioned dataset, we can stratify split each partition individually
-            # to obtain a global stratified split.
-
-            # Default to 0 for train
-            df[TMP_SPLIT_COL] = 0
-
-            # Recursively split each partition
-            partition_splitter = _get_partition_splitter(splitter=self, random_seed=random_seed)
-            df = backend.df_engine.map_partitions(df, partition_splitter, meta=df)
-
-            df_train = df[df[TMP_SPLIT_COL] == 0].drop(columns=TMP_SPLIT_COL)
-            df_val = df[df[TMP_SPLIT_COL] == 1].drop(columns=TMP_SPLIT_COL)
-            df_test = df[df[TMP_SPLIT_COL] == 2].drop(columns=TMP_SPLIT_COL)
-
-            return df_train, df_val, df_test
-
-        frac_train, frac_val, frac_test = self.probabilities
-
-        # Dataframe of just the column on which to stratify
-        y = df[[self.column]].astype(np.int8)
-        df_train, df_temp, _, y_temp = train_test_split(
-            df, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_seed
-        )
-        # Split the temp dataframe into val and test dataframes.
-        relative_frac_test = frac_test / (frac_val + frac_test)
-        df_val, df_test, _, _ = train_test_split(
-            df_temp, y_temp, stratify=y_temp, test_size=relative_frac_test, random_state=random_seed
-        )
+        if not backend.df_engine.partitioned:
+            return stratify_split_dataframe(df, self.column, self.probabilities, random_seed)
+
+        # For a partitioned dataset, we can stratify split each partition individually
+        # to obtain a global stratified split.
+
+        def split_partition(partition: DataFrame) -> DataFrame:
+            """Splits a single partition into train, val, test.
+            Returns a single DataFrame with the split column populated. Assumes that the split column is already present
+            in the partition and has a default value of 0 (train).
+            """
+            _, val, test = stratify_split_dataframe(partition, self.column, self.probabilities, random_seed)
+            # Split column defaults to train, so only need to update val and test
+            partition.loc[val.index, TMP_SPLIT_COL] = 1
+            partition.loc[test.index, TMP_SPLIT_COL] = 2
+            return partition
+
+        df[TMP_SPLIT_COL] = 0
+        df = backend.df_engine.map_partitions(df, split_partition, meta=df)
+
+        df_train = df[df[TMP_SPLIT_COL] == 0].drop(columns=TMP_SPLIT_COL)
+        df_val = df[df[TMP_SPLIT_COL] == 1].drop(columns=TMP_SPLIT_COL)
+        df_test = df[df[TMP_SPLIT_COL] == 2].drop(columns=TMP_SPLIT_COL)
+
         return df_train, df_val, df_test
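
The partitioned branch above applies the same stratified split once per partition, recording the outcome in a temporary split column rather than returning three dataframes from each worker. Below is a self-contained sketch of that pattern using dask.dataframe directly instead of Ludwig's backend.df_engine wrapper; the column names, the TMP_SPLIT_COL value, the helper names, and the toy data are assumptions for illustration.

# Illustrative sketch (not from the commit): per-partition stratified split
# marked via a temporary column, then filtered into three dataframes.
import dask.dataframe as dd
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

TMP_SPLIT_COL = "__split__"  # hypothetical name for the temporary split column

def stratify_three_way(pdf: pd.DataFrame, column: str, probabilities, seed: int):
    """Two-stage stratified split of a single pandas DataFrame (as in the diff)."""
    frac_train, frac_val, frac_test = probabilities
    y = pdf[[column]].astype(np.int8)
    train, temp, _, y_temp = train_test_split(
        pdf, y, stratify=y, test_size=(1.0 - frac_train), random_state=seed
    )
    val, test, _, _ = train_test_split(
        temp, y_temp, stratify=y_temp, test_size=frac_test / (frac_val + frac_test), random_state=seed
    )
    return train, val, test

def split_partition(partition: pd.DataFrame) -> pd.DataFrame:
    """Marks each row of one partition as train (0), val (1), or test (2)."""
    _, val, test = stratify_three_way(partition, "label", [0.7, 0.1, 0.2], seed=42)
    partition.loc[val.index, TMP_SPLIT_COL] = 1
    partition.loc[test.index, TMP_SPLIT_COL] = 2
    return partition

pdf = pd.DataFrame({"feature": range(400), "label": ([0] * 70 + [1] * 30) * 4})
ddf = dd.from_pandas(pdf, npartitions=4)
ddf[TMP_SPLIT_COL] = 0  # default every row to train
ddf = ddf.map_partitions(split_partition, meta=ddf._meta)

df_train = ddf[ddf[TMP_SPLIT_COL] == 0].drop(columns=TMP_SPLIT_COL)
df_val = ddf[ddf[TMP_SPLIT_COL] == 1].drop(columns=TMP_SPLIT_COL)
df_test = ddf[ddf[TMP_SPLIT_COL] == 2].drop(columns=TMP_SPLIT_COL)
print(len(df_train.compute()), len(df_val.compute()), len(df_test.compute()))  # 280 40 80

Because each partition is split independently, the overall split stays stratified as long as every partition has enough rows of each category for train_test_split to stratify on, and no local Ludwig backend needs to be initialized inside the workers.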

