diff --git a/tensorflow_data_validation/utils/stats_gen_lib.py b/tensorflow_data_validation/utils/stats_gen_lib.py
index af69c254..999a1329 100644
--- a/tensorflow_data_validation/utils/stats_gen_lib.py
+++ b/tensorflow_data_validation/utils/stats_gen_lib.py
@@ -91,6 +91,17 @@ def generate_statistics_from_tfrecord(
   batch_size = stats_options.desired_batch_size
   # PyLint doesn't understand Beam PTransforms.
   # pylint: disable=no-value-for-parameter
+
+  def _convert_dict(legacy_examples: Dict) -> Dict:
+    result = {}
+
+    for k, v in legacy_examples.items():
+      if np.issubdtype(v.dtype, np.number):
+        result[k] = v
+      else:
+        result[k] = v.astype(str)
+    return result
+
   with beam.Pipeline(options=pipeline_options) as p:
     # Auto detect tfrecord file compression format based on input data
     # path suffix.
@@ -101,6 +112,7 @@ def generate_statistics_from_tfrecord(
             schema=None,
             telemetry_descriptors=['tfdv', 'generate_statistics_from_tfrecord'])
         .BeamSource(batch_size))
+        | 'format' >> beam.Map(_convert_dict)
         | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
         | 'WriteStatsOutput' >>
         (stats_api.WriteStatisticsToTFRecord(output_path)))
@@ -163,6 +175,18 @@ def generate_statistics_from_csv(
                    constants.DEFAULT_DESIRED_INPUT_BATCH_SIZE)
   # PyLint doesn't understand Beam PTransforms.
   # pylint: disable=no-value-for-parameter
+
+  def _convert_dict(legacy_examples: Dict) -> Dict:
+    result = {}
+
+    for k, v in legacy_examples.items():
+      if np.issubdtype(v.dtype, np.number):
+        result[k] = v
+      else:
+        result[k] = v.astype(str)
+    return result
+
+
   with beam.Pipeline(options=pipeline_options) as p:
     # If a header is not provided, assume the first line in a file
     # to be the header.
@@ -175,6 +199,7 @@ def generate_statistics_from_csv(
             file_pattern=data_location,
             skip_header_lines=skip_header_lines,
             compression_type=compression_type)
+        | 'format' >> beam.Map(_convert_dict)
         | 'DecodeData' >> csv_decoder.DecodeCSV(
             column_names=column_names,
             delimiter=delimiter,
@@ -229,6 +254,11 @@ def generate_statistics_from_dataframe(
         dataframe, stats_options, stats_generators)
   else:
     # TODO(b/144580609): Consider using Beam for inmemory mode as well.
+
+    for col in dataframe.columns:
+      if not np.issubdtype(dataframe[col].dtype, np.number):
+        dataframe[col] = dataframe[col].astype(str)
+
     splits = np.array_split(dataframe, n_jobs)
     partial_stats = Parallel(n_jobs=n_jobs)(
         delayed(_generate_partial_statistics_from_df)(
@@ -267,6 +297,10 @@ def _generate_partial_statistics_from_df(
           type=schema_pb2.INT,
           bool_domain=schema_pb2.BoolDomain())
     dataframe = dataframe.drop(columns=drop_columns)
+  for col in dataframe.columns:
+    if not np.issubdtype(dataframe[col].dtype, np.number):
+      dataframe[col] = dataframe[col].astype(str)
+
   if schema.feature:
     stats_options_modified.schema = schema
   record_batch_with_primitive_arrays = table_util.DataFrameToRecordBatch(
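
For reference, the `_convert_dict` helper the patch introduces can be exercised on its own. Below is a minimal standalone sketch of its behavior; the `sample` batch is a hypothetical input, and the helper assumes every value in the dict is a NumPy array:

```python
from typing import Dict

import numpy as np


def _convert_dict(legacy_examples: Dict) -> Dict:
  """Casts non-numeric arrays to strings; numeric arrays pass through unchanged."""
  result = {}
  for k, v in legacy_examples.items():
    if np.issubdtype(v.dtype, np.number):
      result[k] = v
    else:
      result[k] = v.astype(str)
  return result


# Hypothetical batch: one numeric column, one object-dtype column.
sample = {
    'age': np.array([25, 31]),
    'city': np.array(['NYC', None], dtype=object),
}
converted = _convert_dict(sample)
print(converted['age'].dtype)   # int64 -- numeric dtype is left alone
print(converted['city'].dtype)  # <U4   -- object dtype cast to unicode strings
```

One caveat of this approach: `astype(str)` stringifies missing values, so a `None` in an object column becomes the literal string `'None'`; if that would skew the computed statistics, filter or impute nulls before the cast.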