def test_describe(self):
    """
    Checks ``StreamingDataFrame.describe`` on a dataframe holding
    a float column, an integer column and a string column.
    Only the two numerical columns are expected in the result.
    """
    size = 100001
    ints = numpy.arange(size).astype(numpy.int64)
    floats = numpy.arange(size).astype(numpy.float64) / 100000 - 0.5
    letters = numpy.array([chr(65 + j % 45) for j in ints])
    df = pandas.DataFrame(data=dict(X=floats, Y=ints, Z=letters))
    sdf = StreamingDataFrame.read_df(df)

    desc = sdf.describe()

    # the string column Z must not be described
    self.assertEqual(['X', 'Y'], list(desc.columns))

    # extremes are compared as plain lists
    self.assertEqual(desc.loc['min', :].tolist(), [-0.5, 0])
    self.assertEqual(desc.loc['max', :].tolist(), [0.5, 100000])

    # mean and quartiles are compared as arrays
    expected_rows = [
        ('mean', numpy.array([0, 50000])),
        ('25%', numpy.array([-0.25, 25000])),
        ('50%', numpy.array([0.0, 50000])),
        ('75%', numpy.array([0.25, 75000])),
    ]
    for row_name, expected in expected_rows:
        self.assertEqualArray(desc.loc[row_name, :], expected)

    # the standard deviation is only checked up to 4 decimals
    expected_std = numpy.array([2.886795e-01, 28867.946472])
    self.assertEqualArray(desc.loc['std', :], expected_std, decimal=4)
check_pep8(test, fLOG=fLOG, neg_pattern="temp_.*", pylint_ignore=('C0103', 'C1801', 'R0201', 'R1705', 'W0108', 'W0613', - 'C0111', 'W0107'), + 'C0111', 'W0107', 'C0302'), skip=[]) diff --git a/pandas_streaming/df/dataframe.py b/pandas_streaming/df/dataframe.py index 2d575f9..44a041d 100644 --- a/pandas_streaming/df/dataframe.py +++ b/pandas_streaming/df/dataframe.py @@ -5,6 +5,7 @@ """ from io import StringIO, BytesIO from inspect import isfunction +import numpy import numpy.random as nrandom import pandas from pandas.testing import assert_frame_equal @@ -56,16 +57,16 @@ class StreamingDataFrame: in some situations, it is more efficient not to keep that constraints. Draw a random @see me sample is one of these cases. + + :param iter_creation: function which creates an iterator or an + instance of @see cl StreamingDataFrame + :param check_schema: checks that the schema is the same + for every :epkg:`dataframe` + :param stable: indicates if the :epkg:`dataframe` remains the same + whenever it is walked through """ def __init__(self, iter_creation, check_schema=True, stable=True): - """ - @param iter_creation function which creates an iterator or an instance of - @see cl StreamingDataFrame - @param check_schema checks that the schema is the same for every :epkg:`dataframe` - @param stable indicates if the :epkg:`dataframe` remains the same whenever - it is walked through - """ if isinstance(iter_creation, StreamingDataFrame): self.iter_creation = iter_creation.iter_creation self.stable = iter_creation.stable @@ -372,14 +373,17 @@ def __iter__(self): rows, sch[0], list(it.columns))) # pylint: disable=E1136 if list(it.dtypes) != sch[1]: # pylint: disable=E1136 errdf = pandas.DataFrame( - dict(names=sch[0], schema1=sch[1], schema2=list(it.dtypes))) # pylint: disable=E1136 + dict(names=sch[0], schema1=sch[1], # pylint: disable=E1136 + schema2=list(it.dtypes))) # pylint: disable=E1136 tdf = StringIO() errdf['diff'] = errdf['schema2'] != errdf['schema1'] errdf = 
def describe(self, percentiles=None, include=None, exclude=None,
             datetime_is_numeric=False):
    """
    Calls :epkg:`pandas:DataFrame:describe` on every piece
    of the datasets and merges the results.
    *count*, *mean*, *std* are merged exactly across the pieces.
    *min* and *max* of numerical columns are exact as well.
    *percentiles* are not really accurate but just an indication
    as they are computed from the statistics of every piece.

    :param percentiles: see :epkg:`pandas:DataFrame:describe`
    :param include: see :epkg:`pandas:DataFrame:describe`
    :param exclude: see :epkg:`pandas:DataFrame:describe`
    :param datetime_is_numeric: see :epkg:`pandas:DataFrame:describe`,
        it is only forwarded if the installed version of
        :epkg:`pandas` still accepts this parameter
    :return: :epkg:`pandas:DataFrame:describe`
    :raises ValueError: if the stream does not contain any piece
    """
    from inspect import signature

    # pandas removed *datetime_is_numeric* in version 2.0,
    # forwarding it blindly raises a TypeError on recent versions.
    options = dict(percentiles=percentiles, include=include,
                   exclude=exclude)
    summary_options = dict(percentiles=percentiles)
    supported = signature(pandas.DataFrame.describe).parameters
    if 'datetime_is_numeric' in supported:
        options['datetime_is_numeric'] = datetime_is_numeric
        summary_options['datetime_is_numeric'] = datetime_is_numeric

    merged = None
    stack = []
    notper = ['count', 'mean', 'std']
    for df in self:
        desc = df.describe(**options)
        rows = [name for name in desc.index if name not in notper]
        stack.append(desc.loc[rows, :])

        count = desc.loc['count', :]
        mean = desc.loc['mean', :]
        # pandas returns the sample standard deviation (ddof=1),
        # (n - 1) * std ** 2 is the sum of squared deviations.
        # A piece with a single row has no std (NaN) and
        # contributes no deviation.
        m2 = desc.loc['std', :].fillna(0) ** 2 * (count - 1)

        if merged is None:
            merged = desc
            # row 'std' holds the sum of squared deviations
            # until all pieces are merged
            merged.loc['std', :] = m2
        else:
            # pairwise update of mean and squared deviations
            # (parallel variance algorithm)
            previous = merged.loc['count', :]
            total = previous + count
            delta = mean - merged.loc['mean', :]
            merged.loc['mean', :] += delta * count / total
            merged.loc['std', :] += (
                m2 + delta ** 2 * previous * count / total)
            merged.loc['count', :] = total
            # Running extremes. The reported min/max come from
            # the stacked statistics below, which yield the
            # same values.
            merged.loc['max', :] = numpy.maximum(
                merged.loc['max', :], desc.loc['max', :])
            merged.loc['min', :] = numpy.minimum(
                merged.loc['min', :], desc.loc['min', :])

    if merged is None:
        raise ValueError(
            "Cannot describe an empty StreamingDataFrame.")

    # back to a sample standard deviation (NaN if only one row)
    merged.loc['std', :] = (
        merged.loc['std', :] / (merged.loc['count', :] - 1)) ** 0.5

    # min, max, percentiles are estimated from the statistics
    # collected on every piece
    values = pandas.concat(stack)
    summary = values.describe(**summary_options)
    merged = merged.loc[notper, :]
    rows = [name for name in summary.index if name not in notper]
    summary = summary.loc[rows, :]
    return pandas.concat([merged, summary])