Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion _unittests/ut_df/test_streaming_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,25 @@ def test_fillna(self):
ndf = na.to_df()
self.assertEqual(ndf, df3)

def test_describe(self):
    """Checks ``StreamingDataFrame.describe`` against known statistics
    of a linear ramp (float and int columns; the string column must be
    dropped by *describe*)."""
    values_int = numpy.arange(100001).astype(numpy.int64)
    values_float = values_int.astype(numpy.float64) / 100000 - 0.5
    values_str = numpy.array([chr(65 + i % 45) for i in values_int])
    frame = pandas.DataFrame(
        data=dict(X=values_float, Y=values_int, Z=values_str))
    stream = StreamingDataFrame.read_df(frame)

    desc = stream.describe()
    # only the numerical columns survive describe
    self.assertEqual(['X', 'Y'], list(desc.columns))
    # exact statistics: min, max
    self.assertEqual(desc.loc['min', :].tolist(), [-0.5, 0])
    self.assertEqual(desc.loc['max', :].tolist(), [0.5, 100000])
    self.assertEqualArray(desc.loc['mean', :], numpy.array([0, 50000]))
    # percentiles are approximated by the streaming implementation
    self.assertEqualArray(desc.loc['25%', :], numpy.array([-0.25, 25000]))
    self.assertEqualArray(desc.loc['50%', :], numpy.array([0.0, 50000]))
    self.assertEqualArray(desc.loc['75%', :], numpy.array([0.25, 75000]))
    self.assertEqualArray(desc.loc['std', :], numpy.array(
        [2.886795e-01, 28867.946472]), decimal=4)


if __name__ == "__main__":
TestStreamingDataFrame().test_apply()
# TestStreamingDataFrame().test_describe()
unittest.main()
4 changes: 2 additions & 2 deletions _unittests/ut_module/test_code_style.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_style_src(self):
thi, "..", "..", "pandas_streaming"))
check_pep8(src_, fLOG=fLOG,
pylint_ignore=('C0103', 'C1801', 'R0201', 'R1705', 'W0108', 'W0613',
'W0212', 'W0703', 'W0107'),
'W0212', 'W0703', 'W0107', 'C0302'),
skip=["Too many nested blocks",
"Module 'numpy.random' has no 'RandomState' member",
"dataframe_split.py:60: [E731]",
Expand All @@ -27,7 +27,7 @@ def test_style_test(self):
test = os.path.normpath(os.path.join(thi, "..", ))
check_pep8(test, fLOG=fLOG, neg_pattern="temp_.*",
pylint_ignore=('C0103', 'C1801', 'R0201', 'R1705', 'W0108', 'W0613',
'C0111', 'W0107'),
'C0111', 'W0107', 'C0302'),
skip=[])


Expand Down
75 changes: 64 additions & 11 deletions pandas_streaming/df/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
from io import StringIO, BytesIO
from inspect import isfunction
import numpy
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal
Expand Down Expand Up @@ -56,16 +57,16 @@ class StreamingDataFrame:
in some situations, it is more efficient not to keep
that constraints. Draw a random @see me sample
is one of these cases.

:param iter_creation: function which creates an iterator or an
instance of @see cl StreamingDataFrame
:param check_schema: checks that the schema is the same
for every :epkg:`dataframe`
:param stable: indicates if the :epkg:`dataframe` remains the same
whenever it is walked through
"""

def __init__(self, iter_creation, check_schema=True, stable=True):
"""
@param iter_creation function which creates an iterator or an instance of
@see cl StreamingDataFrame
@param check_schema checks that the schema is the same for every :epkg:`dataframe`
@param stable indicates if the :epkg:`dataframe` remains the same whenever
it is walked through
"""
if isinstance(iter_creation, StreamingDataFrame):
self.iter_creation = iter_creation.iter_creation
self.stable = iter_creation.stable
Expand Down Expand Up @@ -372,14 +373,17 @@ def __iter__(self):
rows, sch[0], list(it.columns))) # pylint: disable=E1136
if list(it.dtypes) != sch[1]: # pylint: disable=E1136
errdf = pandas.DataFrame(
dict(names=sch[0], schema1=sch[1], schema2=list(it.dtypes))) # pylint: disable=E1136
dict(names=sch[0], schema1=sch[1], # pylint: disable=E1136
schema2=list(it.dtypes))) # pylint: disable=E1136
tdf = StringIO()
errdf['diff'] = errdf['schema2'] != errdf['schema1']
errdf = errdf[errdf['diff']]
errdf.to_csv(tdf, sep=",")
msg = 'Column types are different after row {0}\n{1}'
errdf.to_csv(tdf, sep=",", index=False)
raise StreamingDataFrameSchemaError(
msg.format(rows, tdf.getvalue()))
'Column types are different after row {0}. You may use option '
'dtype={{"column_name": str}} to force the type on this column.'
'\n---\n{1}'.format(rows, tdf.getvalue()))

rows += it.shape[0]
yield it

Expand Down Expand Up @@ -988,3 +992,52 @@ def iterate_na(self, **kwargs):

return StreamingDataFrame(
lambda: iterate_na(self, **kwargs), **self.get_kwargs())

def describe(self, percentiles=None, include=None, exclude=None,
             datetime_is_numeric=False):
    """
    Calls :epkg:`pandas:DataFrame:describe` on every piece
    of the datasets. *count*, *mean*, *std*, *min*, *max* are
    exact; *percentiles* are not really accurate
    but just an indication (they are obtained by describing
    the per-chunk percentiles).

    :param percentiles: see :epkg:`pandas:DataFrame:describe`
    :param include: see :epkg:`pandas:DataFrame:describe`
    :param exclude: see :epkg:`pandas:DataFrame:describe`
    :param datetime_is_numeric: see :epkg:`pandas:DataFrame:describe`;
        only forwarded when True so that the method keeps working
        with pandas >= 2.0 where the parameter was removed
    :return: :epkg:`pandas:DataFrame:describe`
    """
    # 'datetime_is_numeric' was removed from DataFrame.describe in
    # pandas 2.0; forwarding it only when explicitly enabled is
    # identical on pandas 1.x (its default was False) and avoids a
    # TypeError on pandas >= 2.0.
    desc_kwargs = dict(datetime_is_numeric=True) if datetime_is_numeric else {}
    merged = None
    stack = []
    notper = ['count', 'mean', 'std']
    for df in self:
        desc = df.describe(
            percentiles=percentiles, include=include, exclude=exclude,
            **desc_kwargs)
        count = desc.loc['count', :]
        rows = [name for name in desc.index if name not in notper]
        stack.append(desc.loc[rows, :])
        if merged is None:
            merged = desc
            # accumulate sum of squares in 'std' and sum of values in
            # 'mean'; both are turned back into std/mean after the loop
            merged.loc['std', :] = (
                merged.loc['std', :] ** 2 + merged.loc['mean', :] ** 2) * count
            merged.loc['mean', :] *= count
        else:
            merged.loc['count', :] += desc.loc['count', :]
            merged.loc['mean', :] += desc.loc['mean', :] * count
            merged.loc['std', :] += (
                desc.loc['std', :] ** 2 + desc.loc['mean', :] ** 2) * count
            merged.loc['max', :] = numpy.maximum(
                merged.loc['max', :], desc.loc['max', :])
            # fixed: the running minimum must use numpy.minimum;
            # numpy.maximum kept the largest per-chunk minimum
            merged.loc['min', :] = numpy.minimum(
                merged.loc['min', :], desc.loc['min', :])
    merged.loc['mean', :] /= merged.loc['count', :]
    merged.loc['std', :] = (
        merged.loc['std', :] / merged.loc['count', :] -
        merged.loc['mean', :] ** 2) ** 0.5
    # percentiles (and min/max of the final result) are approximated
    # by describing the stacked per-chunk statistics
    values = pandas.concat(stack)
    summary = values.describe(percentiles=percentiles, **desc_kwargs)
    merged = merged.loc[notper, :]
    rows = [name for name in summary.index if name not in notper]
    summary = summary.loc[rows, :]
    return pandas.concat([merged, summary])