Skip to content

Commit

Permalink
fix: make clean static again due to multiprocessing and model issue (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JGSweets authored Aug 6, 2021
1 parent ae9dcc9 commit 7b40438
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
29 changes: 22 additions & 7 deletions dataprofiler/profilers/profile_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def __init__(self, df_series=None, sample_size=None, min_sample_size=5000,
clean_sampled_df, base_stats = \
self.clean_data_and_get_base_stats(
df_series=df_series, sample_size=sample_size,
min_true_samples=self._min_true_samples, sample_ids=sample_ids)
null_values=self._null_values,
min_true_samples=self._min_true_samples,
sample_ids=sample_ids)
self.update_column_profilers(clean_sampled_df, pool)
self._update_base_stats(base_stats)

Expand Down Expand Up @@ -382,6 +384,7 @@ def update_profile(self, df_series, sample_size=None,
clean_sampled_df, base_stats = \
self.clean_data_and_get_base_stats(
df_series=df_series, sample_size=sample_size,
null_values=self._null_values,
min_true_samples=min_true_samples, sample_ids=sample_ids)

self._update_base_stats(base_stats)
Expand All @@ -403,8 +406,8 @@ def _get_sample_size(self, df_series):

# TODO: flag column name with null values and potentially return row
# index number in the error as well

def clean_data_and_get_base_stats(self, df_series, sample_size,
@staticmethod
def clean_data_and_get_base_stats(df_series, sample_size, null_values=None,
min_true_samples=None,
sample_ids=None):
"""
Expand All @@ -415,6 +418,10 @@ def clean_data_and_get_base_stats(self, df_series, sample_size,
:type df_series: pandas.core.series.Series
:param sample_size: Number of samples to use in generating the profile
:type sample_size: int
:param null_values: Dictionary mapping null values to regex flags, where
each key is a null value to remove from the data and each value is the
regex flag to apply for that key
:type null_values: dict[str, re.RegexFlag]
:param min_true_samples: Minimum number of samples required for the
profiler
:type min_true_samples: int
Expand All @@ -428,6 +435,9 @@ def clean_data_and_get_base_stats(self, df_series, sample_size,
if min_true_samples is None:
min_true_samples = 0

if null_values is None:
null_values = dict()

len_df = len(df_series)
if not len_df:
return df_series, {
Expand Down Expand Up @@ -467,7 +477,7 @@ def clean_data_and_get_base_stats(self, df_series, sample_size,
na_columns = dict()
true_sample_list = set()
total_sample_size = 0
query = '|'.join(self._null_values.keys())
query = '|'.join(null_values.keys())
regex = f"^(?:{(query)})$"
for chunked_sample_ids in sample_ind_generator:
total_sample_size += len(chunked_sample_ids)
Expand Down Expand Up @@ -1959,9 +1969,10 @@ def tqdm(l):
if min_true_samples is None:
min_true_samples = self._profile[prof_idx]._min_true_samples
try:
null_values = self._profile[prof_idx]._null_values
multi_process_dict[col_idx] = pool.apply_async(
self._profile[prof_idx].clean_data_and_get_base_stats,
(col_ser, sample_size, min_true_samples,
(col_ser, sample_size, null_values, min_true_samples,
sample_ids))
except Exception as e:
logger.info(e)
Expand Down Expand Up @@ -1989,9 +2000,10 @@ def tqdm(l):
if min_true_samples is None:
min_true_samples = \
self._profile[prof_idx]._min_true_samples
null_values = self._profile[prof_idx]._null_values
clean_sampled_dict[prof_idx], base_stats = \
self._profile[prof_idx].clean_data_and_get_base_stats(
col_ser, sample_size,
col_ser, sample_size, null_values,
min_true_samples, sample_ids)
self._profile[prof_idx]._update_base_stats(base_stats)

Expand All @@ -2005,9 +2017,11 @@ def tqdm(l):
prof_idx = col_idx_to_prof_idx[col_idx]
if min_true_samples is None:
min_true_samples = self._profile[prof_idx]._min_true_samples
null_values = self._profile[prof_idx]._null_values
clean_sampled_dict[prof_idx], base_stats = \
self._profile[prof_idx].clean_data_and_get_base_stats(
df_series=col_ser, sample_size=sample_size,
null_values=null_values,
min_true_samples=min_true_samples,
sample_ids=sample_ids
)
Expand Down Expand Up @@ -2037,7 +2051,8 @@ def tqdm(l):
samples_for_row_stats = np.concatenate(sample_ids)

if self.options.correlation.is_enabled:
self._update_correlation(clean_sampled_dict, corr_prev_dependent_properties)
self._update_correlation(clean_sampled_dict,
corr_prev_dependent_properties)
self._update_row_statistics(data, samples_for_row_stats)

def save(self, filepath=None):
Expand Down
12 changes: 6 additions & 6 deletions dataprofiler/tests/profilers/test_profile_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,7 +1550,7 @@ def test_clean_data_and_get_base_stats(self, *mocks):

# Tests with default null values set
profiler = mock.Mock(spec=StructuredColProfiler)
profiler._null_values = {
null_values = {
"": 0,
"nan": re.IGNORECASE,
"none": re.IGNORECASE,
Expand All @@ -1563,7 +1563,7 @@ def test_clean_data_and_get_base_stats(self, *mocks):
test_utils.set_seed(seed=0)
df_series, base_stats = \
StructuredColProfiler.clean_data_and_get_base_stats(
profiler, df_series=data[1:], sample_size=6,
df_series=data[1:], sample_size=6, null_values=null_values,
min_true_samples=0)
# note: the data above is a subset (`df_series=data[1:]`), so 1.0 will not exist
self.assertTrue(np.issubdtype(np.object_, df_series.dtype))
Expand All @@ -1573,24 +1573,24 @@ def test_clean_data_and_get_base_stats(self, *mocks):
'min_id': None, 'max_id': None}, base_stats)

# Tests with some other null values set
profiler._null_values = {
null_values = {
"1.0": 0,
"3.0": 0
}
df_series, base_stats = \
StructuredColProfiler.clean_data_and_get_base_stats(
profiler, df_series=data, sample_size=6,
df_series=data, sample_size=6, null_values=null_values,
min_true_samples=0)
self.assertDictEqual({'sample': ["nan", '6.0', '4.0', "nan"],
'sample_size': 6, 'null_count': 2,
'null_types': {'1.0': ['a'], '3.0': ['c']},
'min_id': None, 'max_id': None}, base_stats)

# Tests with no null values set
profiler._null_values = {}
null_values = {}
df_series, base_stats = \
StructuredColProfiler.clean_data_and_get_base_stats(
profiler, df_series=data, sample_size=6,
df_series=data, sample_size=6, null_values=null_values,
min_true_samples=0)
self.assertDictEqual({'sample': ["3.0", "4.0", '6.0', "nan", "1.0"],
'sample_size': 6, 'null_count': 0,
Expand Down

0 comments on commit 7b40438

Please sign in to comment.