diff --git a/_unittests/ut_df/test_dataframe_io_helpers.py b/_unittests/ut_df/test_dataframe_io_helpers.py
index 0a3199e..c030aff 100644
--- a/_unittests/ut_df/test_dataframe_io_helpers.py
+++ b/_unittests/ut_df/test_dataframe_io_helpers.py
@@ -114,6 +114,9 @@ def test_enumerate_json_items(self):
         items = list(enumerate_json_items(
             BytesIO(TestDataFrameIOHelpers.text_json)))
         self.assertEqual(TestDataFrameIOHelpers.text_json_exp, items)
+        items = list(enumerate_json_items(
+            BytesIO(TestDataFrameIOHelpers.text_json)))
+        self.assertEqual(TestDataFrameIOHelpers.text_json_exp, items)
 
     def test_read_json_raw(self):
         data = [{'id': 1, 'name': {'first': 'Coleen', 'last': 'Volk'}},
                 {'name': {'given': 'Mose', 'family': 'Regner'}},
                 {'id': 2, 'name': 'FayeRaker'}]
@@ -133,6 +136,15 @@ def test_read_json_raw(self):
         js_exp = loads(exp)
         self.assertEqual(js_exp, js_read)
 
+    def test_read_json_raw_head(self):
+        data = [{'id': 1, 'name': {'first': 'Coleen', 'last': 'Volk'}},
+                {'name': {'given': 'Mose', 'family': 'Regner'}},
+                {'id': 2, 'name': 'FayeRaker'}]
+        it = StreamingDataFrame.read_json(data, flatten=True, chunksize=1)
+        h1 = it.head()
+        h2 = it.head()
+        self.assertEqualDataFrame(h1, h2)
+
     def test_pandas_json_chunksize(self):
         jsonl = '''{"a": 1, "b": 2}
         {"a": 3, "b": 4}'''
@@ -161,6 +173,18 @@ def test_read_json_rows2(self):
         js = dfs[0].to_json(orient='records')
         self.assertEqual('[{"a":1,"b":2},{"a":3,"b":4}]', js)
 
+    def test_read_json_rows2_head(self):
+        data = b'''{"a": 1, "b": 2}
+        {"a": 3, "b": 4}'''
+        dfs = pandas.read_json(BytesIO(data), lines=True)
+        self.assertEqual(dfs.shape, (2, 2))
+        it = StreamingDataFrame.read_json(BytesIO(data), lines="stream")
+        h1 = it.head()
+        h2 = it.head()
+        self.assertNotEmpty(h1)
+        self.assertNotEmpty(h2)
+        self.assertEqualDataFrame(h1, h2)
+
     def test_read_json_ijson(self):
         it = StreamingDataFrame.read_json(
             BytesIO(TestDataFrameIOHelpers.text_json))
diff --git a/_unittests/ut_df/test_streaming_dataframe.py b/_unittests/ut_df/test_streaming_dataframe.py
index 0ca2a14..57f9a48 100644
--- a/_unittests/ut_df/test_streaming_dataframe.py
+++ b/_unittests/ut_df/test_streaming_dataframe.py
@@ -455,7 +455,6 @@ def test_getitem(self):
         df1 = sdf.to_df()
         df2 = sdf2.to_df()
         self.assertEqualDataFrame(df1[["cint"]], df2)
-        self.assertRaise(lambda: sdf["cint"], NotImplementedError)
         self.assertRaise(lambda: sdf[:, "cint"], NotImplementedError)
 
     def test_read_csv_names(self):
@@ -523,6 +522,39 @@ def test_describe(self):
         self.assertEqualArray(desc.loc['std', :], numpy.array(
             [2.886795e-01, 28867.946472]), decimal=4)
 
+    def test_set_item(self):
+        df = pandas.DataFrame(data=dict(a=[4.5], b=[6], c=[7]))
+        self.assertRaise(lambda: StreamingDataFrame(df), TypeError)
+        sdf = StreamingDataFrame.read_df(df)
+
+        def f():
+            sdf[['a']] = 10
+        self.assertRaise(f, ValueError)
+
+        def g():
+            sdf['a'] = [10]
+        self.assertRaise(g, NotImplementedError)
+
+        sdf['aa'] = 10
+        df = sdf.to_df()
+        ddf = pandas.DataFrame(data=dict(a=[4.5], b=[6], c=[7], aa=[10]))
+        self.assertEqualDataFrame(df, ddf)
+        sdf['bb'] = sdf['b'] + 10
+        df = sdf.to_df()
+        ddf = pandas.DataFrame(
+            data=dict(a=[4.5], b=[6], c=[7], aa=[10], bb=[16]))
+        self.assertEqualDataFrame(df, ddf)
+
+    def test_set_item_function(self):
+        df = pandas.DataFrame(data=dict(a=[4.5], b=[6], c=[7]))
+        self.assertRaise(lambda: StreamingDataFrame(df), TypeError)
+        sdf = StreamingDataFrame.read_df(df)
+        sdf['bb'] = sdf['b'].apply(lambda x: x + 11)
+        df = sdf.to_df()
+        ddf = pandas.DataFrame(
+            data=dict(a=[4.5], b=[6], c=[7], bb=[17]))
+        self.assertEqualDataFrame(df, ddf)
+
 
 if __name__ == "__main__":
     # TestStreamingDataFrame().test_describe()
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 5b4a53a..a228ffc 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -4,7 +4,7 @@ jobs:
     vmImage: 'ubuntu-latest'
   strategy:
     matrix:
-      Python37-Linux:
+      Python39-Linux:
        python.version: '3.9'
     maxParallel: 3
 
@@ -51,7 +51,7 @@ jobs:
     vmImage: 'macOS-latest'
   strategy:
     matrix:
-      Python37-Mac:
+      Python39-Mac:
        python.version: '3.9'
     maxParallel: 3
 
diff --git a/pandas_streaming/__init__.py b/pandas_streaming/__init__.py
index 9129cd6..e118395 100644
--- a/pandas_streaming/__init__.py
+++ b/pandas_streaming/__init__.py
@@ -35,8 +35,8 @@ def check(log=False):
     It raises an exception.
     If you want to disable the logs:
 
-    @param      log     if True, display information, otherwise
-    @return             0 or exception
+    :param log: if True, display information, otherwise none
+    :return: 0 or exception
    """
    return True
 
diff --git a/pandas_streaming/data/dummy.py b/pandas_streaming/data/dummy.py
index 518e1b8..c0fea66 100644
--- a/pandas_streaming/data/dummy.py
+++ b/pandas_streaming/data/dummy.py
@@ -12,11 +12,11 @@ def dummy_streaming_dataframe(n, chunksize=10, asfloat=False, **cols):
     Returns a dummy streaming dataframe
     mostly for unit test purposes.
 
-    @param      n           number of rows
-    @param      chunksize   chunk size
-    @param      asfloat     use random float and not random int
-    @param      cols        additional columns
-    @return                 a @see cl StreamingDataFrame
+    :param n: number of rows
+    :param chunksize: chunk size
+    :param asfloat: use random float and not random int
+    :param cols: additional columns
+    :return: a @see cl StreamingDataFrame
     """
     if asfloat:
         df = DataFrame(dict(cfloat=[_ + 0.1 for _ in range(0, n)], cstr=[
diff --git a/pandas_streaming/df/connex_split.py b/pandas_streaming/df/connex_split.py
index 3a8063a..e06cd69 100644
--- a/pandas_streaming/df/connex_split.py
+++ b/pandas_streaming/df/connex_split.py
@@ -38,25 +38,30 @@ def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None,
     without using randomness.
     """
     if hasattr(df, 'iter_creation'):
-        raise NotImplementedError(
+        raise NotImplementedError(  # pragma: no cover
             'Not implemented yet for StreamingDataFrame.')
     if isinstance(df, numpy.ndarray):
-        raise NotImplementedError("Not implemented on numpy arrays.")
+        raise NotImplementedError(  # pragma: no cover
+            "Not implemented on numpy arrays.")
     if shuffle:
         df = dataframe_shuffle(df, random_state=random_state)
     if weights is None:
         if test_size == 0 or train_size == 0:
             raise ValueError(
-                "test_size={0} or train_size={1} cannot be null (1)".format(test_size, train_size))
-        return train_test_split(df, test_size=test_size, train_size=train_size, random_state=random_state)
+                "test_size={0} or train_size={1} cannot be null (1)."
+ "".format(test_size, train_size)) + return train_test_split(df, test_size=test_size, + train_size=train_size, + random_state=random_state) if isinstance(weights, pandas.Series): weights = list(weights) elif isinstance(weights, str): weights = list(df[weights]) if len(weights) != df.shape[0]: - raise ValueError("Dimension mismatch between weights and dataframe {0} != {1}".format( - df.shape[0], len(weights))) + raise ValueError( + "Dimension mismatch between weights and dataframe " + "{0} != {1}".format(df.shape[0], len(weights))) p = (1 - test_size) if test_size else None if train_size is not None: @@ -64,7 +69,8 @@ def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None, test_size = 1 - p if p is None or min(test_size, p) <= 0: raise ValueError( - "test_size={0} or train_size={1} cannot be null (2)".format(test_size, train_size)) + "test_size={0} or train_size={1} cannot be null (2)." + "".format(test_size, train_size)) ratio = test_size / p if random_state is None: @@ -98,7 +104,8 @@ def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None, (1.0 * (train_weights + test_weights)) if r >= fail_imbalanced: raise ImbalancedSplitException( # pragma: no cover - "Split is imbalanced: train_weights={0} test_weights={1} r={2}".format(train_weights, test_weights, r)) + "Split is imbalanced: train_weights={0} test_weights={1} r={2}." + "".format(train_weights, test_weights, r)) return df.iloc[train_ids, :], df.iloc[test_ids, :] @@ -125,7 +132,8 @@ def train_test_connex_split(df, groups, test_size=0.25, train_size=None, @param hash_size size of the hash to cache information about partition @param unique_rows ensures that rows are unique @param shuffle shuffles before the split - @param fail_imbalanced raises an exception if relative weights difference is higher than this value + @param fail_imbalanced raises an exception if relative weights difference + is higher than this value @param stop_if_bigger (float) stops a connected components from being bigger than this ratio of elements, this should not be used unless a big components emerges, the algorithm stops merging @@ -135,7 +143,8 @@ def train_test_connex_split(df, groups, test_size=0.25, train_size=None, if their relative sizes are too different, the value should be close to 1 @param return_cnx returns connected components as a third results - @param must_groups column name for ids which must not be shared by train/test partitions + @param must_groups column name for ids which must not be shared by + train/test partitions @param random_state seed for random generator @param fLOG logging function @return Two @see cl StreamingDataFrame, one @@ -171,9 +180,10 @@ def train_test_connex_split(df, groups, test_size=0.25, train_size=None, dict(user="UD", prod="PG", card="C5"), ]) - train, test = train_test_connex_split(df, test_size=0.5, - groups=['user', 'prod', 'card'], - fail_imbalanced=0.6) + train, test = train_test_connex_split( + df, test_size=0.5, groups=['user', 'prod', 'card'], + fail_imbalanced=0.6) + print(train) print(test) @@ -197,23 +207,25 @@ def train_test_connex_split(df, groups, test_size=0.25, train_size=None, dict(user="UD", prod="PG", card="C5"), ]) - train, test, cnx = train_test_connex_split(df, test_size=0.5, - groups=['user', 'prod', 'card'], - fail_imbalanced=0.6, return_cnx=True) + train, test, cnx = train_test_connex_split( + df, test_size=0.5, groups=['user', 'prod', 'card'], + fail_imbalanced=0.6, return_cnx=True) print(cnx[0]) print(cnx[1]) """ if stratify is not None: - raise 
NotImplementedError("Option stratify is not implemented.") + raise NotImplementedError( # pragma: no cover + "Option stratify is not implemented.") if groups is None or len(groups) == 0: raise ValueError( # pragma: no cover "groups is empty. Use regular train_test_split.") if hasattr(df, 'iter_creation'): - raise NotImplementedError( + raise NotImplementedError( # pragma: no cover 'Not implemented yet for StreamingDataFrame.') if isinstance(df, numpy.ndarray): - raise NotImplementedError("Not implemented on numpy arrays.") + raise NotImplementedError( # pragma: no cover + "Not implemented on numpy arrays.") if shuffle: df = dataframe_shuffle(df, random_state=random_state) @@ -241,8 +253,8 @@ def do_connex_components(dfrows, local_groups, kb, sib): while modif > 0 and itern < len(elements): if fLOG and df.shape[0] > 10000: - fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - modif={2}".format( - iter, len(set(elements)), modif)) + fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - " + "modif={2}".format(iter, len(set(elements)), modif)) modif = 0 itern += 1 for i, row in enumerate(dfrows.itertuples(index=False, name=None)): @@ -272,8 +284,11 @@ def do_connex_components(dfrows, local_groups, kb, sib): r = diff / float(maxi) if r > kb: if fLOG: # pragma: no cover - fLOG('[train_test_connex_split] balance r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}'.format( - r, kb, new_c, len(counts_cnx[new_c]), c, len(counts_cnx[c]))) + fLOG('[train_test_connex_split] balance ' + 'r={0:0.00000}>{1:0.00}, #[{2}]={3}, ' + '#[{4}]={5}'.format(r, kb, new_c, + len(counts_cnx[new_c]), + c, len(counts_cnx[c]))) continue if sib is not None: @@ -281,8 +296,10 @@ def do_connex_components(dfrows, local_groups, kb, sib): len(counts_cnx[c])) / float(len(elements)) if r > sib: if fLOG: # pragma: no cover - fLOG('[train_test_connex_split] no merge r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}'.format( - r, sib, new_c, len(counts_cnx[new_c]), c, len(counts_cnx[c]))) + fLOG('[train_test_connex_split] no merge ' + 'r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}' + ''.format(r, sib, new_c, len(counts_cnx[new_c]), + c, len(counts_cnx[c]))) avoids_merge[new_c, c] = i continue @@ -316,7 +333,8 @@ def do_connex_components(dfrows, local_groups, kb, sib): fLOG("[train_test_connex_split] #nb in '{0}': {1}".format( g, len(set(dfids[g])))) fLOG( - "[train_test_connex_split] #connex {0}/{1}".format(grsum.shape[0], dfids.shape[0])) + "[train_test_connex_split] #connex {0}/{1}".format( + grsum.shape[0], dfids.shape[0])) if grsum.shape[0] <= 1: raise ValueError( # pragma: no cover "Every element is in the same connected components.") @@ -328,28 +346,28 @@ def do_connex_components(dfrows, local_groups, kb, sib): cl = [(v, k) for k, v in counts.items()] cum = 0 maxc = None - fLOG("[train_test_connex_split] number of connected components: {0}".format( - len(set(elements)))) + fLOG("[train_test_connex_split] number of connected components: {0}" + "".format(len(set(elements)))) for i, (v, k) in enumerate(sorted(cl, reverse=True)): if i == 0: maxc = k, v if i >= 10: break cum += v - fLOG("[train_test_connex_split] c={0} #elements={1} cumulated={2}/{3}".format( - k, v, cum, len(elements))) + fLOG("[train_test_connex_split] c={0} #elements={1} cumulated" + "={2}/{3}".format(k, v, cum, len(elements))) # Most important component - fLOG( - '[train_test_connex_split] first row of the biggest component {0}'.format(maxc)) + fLOG('[train_test_connex_split] first row of the biggest component ' + '{0}'.format(maxc)) tdf = 
dfids[dfids[name] == maxc[0]] fLOG('[train_test_connex_split] \n{0}'.format(tdf.head(n=10))) # Splits. - train, test = train_test_split_weights(grsum, weights=one, test_size=test_size, - train_size=train_size, shuffle=shuffle, - fail_imbalanced=fail_imbalanced, - random_state=random_state) + train, test = train_test_split_weights( + grsum, weights=one, test_size=test_size, train_size=train_size, + shuffle=shuffle, fail_imbalanced=fail_imbalanced, + random_state=random_state) train.drop(one, inplace=True, axis=1) test.drop(one, inplace=True, axis=1) @@ -369,7 +387,8 @@ def double_merge(d): def train_test_apart_stratify(df, group, test_size=0.25, train_size=None, - stratify=None, force=False, random_state=None, fLOG=None): + stratify=None, force=False, random_state=None, + fLOG=None): """ This split is for a specific case where data is linked in one way. Let's assume we have two ids as we have @@ -380,7 +399,8 @@ def train_test_apart_stratify(df, group, test_size=0.25, train_size=None, @param df :epkg:`pandas:DataFrame` @param group columns name for the ids - @param test_size ratio for the test partition (if *train_size* is not specified) + @param test_size ratio for the test partition + (if *train_size* is not specified) @param train_size ratio for the train partition @param stratify column holding the stratification @param force if True, tries to get at least one example on the test side @@ -398,7 +418,8 @@ def train_test_apart_stratify(df, group, test_size=0.25, train_size=None, classification. A category (*stratify*) is not exclusive and an observation can be assigned to multiple categories. In that particular case, the method - `train_test_split `_ + `train_test_split `_ can not directly be used. .. runpython:: @@ -419,9 +440,11 @@ def train_test_apart_stratify(df, group, test_size=0.25, train_size=None, print(test) """ if stratify is None: - raise ValueError("stratify must be specified.") + raise ValueError( # pragma: no cover + "stratify must be specified.") if group is None: - raise ValueError("group must be specified.") + raise ValueError( # pragma: no cover + "group must be specified.") if hasattr(df, 'iter_creation'): raise NotImplementedError( 'Not implemented yet for StreamingDataFrame.') @@ -434,7 +457,8 @@ def train_test_apart_stratify(df, group, test_size=0.25, train_size=None, test_size = 1 - p if p is None or min(test_size, p) <= 0: raise ValueError( # pragma: no cover - "test_size={0} or train_size={1} cannot be null".format(test_size, train_size)) + "test_size={0} or train_size={1} cannot be null".format( + test_size, train_size)) couples = df[[group, stratify]].itertuples(name=None, index=False) hist = Counter(df[stratify]) diff --git a/pandas_streaming/df/dataframe.py b/pandas_streaming/df/dataframe.py index ede4328..1126b98 100644 --- a/pandas_streaming/df/dataframe.py +++ b/pandas_streaming/df/dataframe.py @@ -67,6 +67,11 @@ class StreamingDataFrame: """ def __init__(self, iter_creation, check_schema=True, stable=True): + if isinstance(iter_creation, (pandas.DataFrame, dict, + numpy.ndarray, str)): + raise TypeError( + "Unexpected type %r for iter_creation. It must " + "be an iterator." 
% type(iter_creation)) if isinstance(iter_creation, StreamingDataFrame): self.iter_creation = iter_creation.iter_creation self.stable = iter_creation.stable @@ -137,8 +142,9 @@ def train_test_split(self, path_or_buf=None, export_method="to_csv", if streaming: if partitions is not None: if len(partitions) != 2: - raise NotImplementedError( - "Only train and test split is allowed, *partitions* must be of length 2.") + raise NotImplementedError( # pragma: no cover + "Only train and test split is allowed, *partitions* " + "must be of length 2.") kwargs = kwargs.copy() kwargs['train_size'] = partitions[0] kwargs['test_size'] = partitions[1] @@ -212,13 +218,20 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat if isinstance(args[0], (list, dict)): if flatten: - return StreamingDataFrame.read_df(json_normalize(args[0]), **kwargs_create) + return StreamingDataFrame.read_df( + json_normalize(args[0]), **kwargs_create) return StreamingDataFrame.read_df(args[0], **kwargs_create) if kwargs.get('lines', None) == 'stream': del kwargs['lines'] - st = JsonIterator2Stream(enumerate_json_items( - args[0], encoding=kwargs.get('encoding', None), lines=True, flatten=flatten)) + + def localf(a0=args[0]): + a0.seek(0) + return enumerate_json_items( + a0, encoding=kwargs.get('encoding', None), lines=True, + flatten=flatten) + + st = JsonIterator2Stream(localf) args = args[1:] if chunksize is None: @@ -228,9 +241,12 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat **kwargs_create) def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): - for r in pandas.read_json(st, *args, chunksize=chunksize, nrows=chunksize, - lines=True, **kw): + st.seek(0) + for r in pandas.read_json( + st, *args, chunksize=chunksize, nrows=chunksize, + lines=True, **kw): yield r + return StreamingDataFrame(fct1, **kwargs_create) if kwargs.get('lines', False): @@ -243,12 +259,14 @@ def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): **kwargs_create) def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()): - for r in pandas.read_json(*args, chunksize=chunksize, nrows=chunksize, **kw): + for r in pandas.read_json( + *args, chunksize=chunksize, nrows=chunksize, **kw): yield r return StreamingDataFrame(fct2, **kwargs_create) - st = JsonIterator2Stream(enumerate_json_items( - args[0], encoding=kwargs.get('encoding', None), flatten=flatten)) + st = JsonIterator2Stream( + lambda a0=args[0]: enumerate_json_items( + a0, encoding=kwargs.get('encoding', None), flatten=flatten)) args = args[1:] if 'lines' in kwargs: del kwargs['lines'] @@ -260,8 +278,9 @@ def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()): **kwargs_create) def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): - for r in pandas.read_json(st, *args, chunksize=chunksize, nrows=chunksize, - lines=True, **kw): + for r in pandas.read_json( + st, *args, chunksize=chunksize, nrows=chunksize, + lines=True, **kw): yield r return StreamingDataFrame(fct3, **kwargs_create) @@ -324,13 +343,14 @@ def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame': chunksize = df.shape[0] else: raise NotImplementedError( - "Cannot retrieve size to infer chunksize for type={0}".format(type(df))) + "Cannot retrieve size to infer chunksize for type={0}" + ".".format(type(df))) if hasattr(df, 'shape'): size = df.shape[0] else: - raise NotImplementedError( - "Cannot retrieve size for type={0}".format(type(df))) + raise NotImplementedError( # pragma: no cover + "Cannot retrieve size for 
type={0}.".format(type(df))) def local_iterator(): "local iterator" @@ -413,6 +433,8 @@ def columns(self): """ for it in self: return it.columns + # The dataframe is empty. + return [] @property def dtypes(self): @@ -771,14 +793,13 @@ def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None, in_me We assume the result holds in memory. The out-of-memory is not implemented yet. - @param by see :epkg:`pandas:DataFrame:groupby` - @param in_memory in-memory algorithm - @param lambda_agg aggregation function, *sum* by default - @param lambda_agg_agg to aggregate the aggregations, *sum* by default - @param kwargs additional parameters for :epkg:`pandas:DataFrame:groupby` - @param strategy ``'cum'``, or ``'streaming'``, - see below - @return :epkg:`pandas:DataFrame` + :param by: see :epkg:`pandas:DataFrame:groupby` + :param in_memory: in-memory algorithm + :param lambda_agg: aggregation function, *sum* by default + :param lambda_agg_agg: to aggregate the aggregations, *sum* by default + :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` + :param strategy: ``'cum'``, or ``'streaming'``, see below + :return: :epkg:`pandas:DataFrame` As the input @see cl StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. @@ -822,7 +843,8 @@ def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None, in_me df20 = dummy_streaming_dataframe(20).to_dataframe() df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0) sdf20 = StreamingDataFrame.read_df(df20, chunksize=5) - sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(), strategy='cum', as_index=False) + sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(), + strategy='cum', as_index=False) for gr in sgr: print() print(gr) @@ -874,9 +896,9 @@ def ensure_dtype(self, df, dtypes): Ensures the :epkg:`dataframe` *df* has types indicated in dtypes. Changes it if not. - @param df dataframe - @param dtypes list of types - @return updated? + :param df: dataframe + :param dtypes: list of types + :return: updated? """ ch = False cols = df.columns @@ -895,16 +917,69 @@ def __getitem__(self, *args): if len(args) != 1: raise NotImplementedError("Only a list of columns is supported.") cols = args[0] + if isinstance(cols, str): + # One column. + iter_creation = self.iter_creation + + def iterate_col(): + "iterate on one column" + for df in iter_creation(): + yield df[[cols]] + return StreamingSeries(iterate_col, **self.get_kwargs()) + if not isinstance(cols, list): raise NotImplementedError("Only a list of columns is supported.") def iterate_cols(sdf): - "iterate on columns" + """Iterate on columns.""" for df in sdf: yield df[cols] return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs()) + def __setitem__(self, index, value): + """ + Limited set of operators are supported. + """ + if not isinstance(index, str): + raise ValueError( + "Only column affected are supported but index=%r." % index) + if isinstance(value, (int, float, numpy.number, str)): + # Is is equivalent to add_column. 
+            iter_creation = self.iter_creation
+
+            def iterate_fct():
+                "iterate on rows"
+                iters = iter_creation()
+                for df in iters:
+                    dfc = df.copy()
+                    dfc[index] = value
+                    yield dfc
+
+            self.iter_creation = iterate_fct
+
+        elif isinstance(value, StreamingSeries):
+            iter_creation = self.iter_creation
+
+            def iterate_fct():
+                "iterate on rows"
+                iters = iter_creation()
+                for df, dfs in zip(iters, value):
+                    if df.shape[0] != dfs.shape[0]:
+                        raise RuntimeError(
+                            "Chunksize or shape are different when "
+                            "iterating on two StreamingDataFrame at the same "
+                            "time: %r != %r." % (df.shape[0], dfs.shape[0]))
+                    dfc = df.copy()
+                    dfc[index] = dfs
+                    yield dfc
+
+            self.iter_creation = iterate_fct
+        else:
+            raise NotImplementedError(
+                "Not implemented for type(index)=%r and type(value)=%r." % (
+                    type(index), type(value)))
+
     def add_column(self, col, value):
         """
         Implements some of the functionalities :epkg:`pandas`
@@ -1042,3 +1117,38 @@ def describe(self, percentiles=None, include=None, exclude=None,
         rows = [name for name in summary.index if name not in notper]
         summary = summary.loc[rows, :]
         return pandas.concat([merged, summary])
+
+
+class StreamingSeries(StreamingDataFrame):
+    """
+    Seen as a :epkg:`StreamingDataFrame` of one column.
+    """
+
+    def __init__(self, iter_creation, check_schema=True, stable=True):
+        StreamingDataFrame.__init__(
+            self, iter_creation, check_schema=check_schema, stable=stable)
+        if len(self.columns) != 1:
+            raise RuntimeError(
+                "A series can contain only one column not %r." % len(self.columns))
+
+    def apply(self, *args, **kwargs) -> 'StreamingDataFrame':
+        """
+        Applies :epkg:`pandas:Series:apply`.
+        This function returns a @see cl StreamingSeries.
+        """
+        return StreamingSeries(
+            lambda: map(lambda df: df.apply(*args, **kwargs), self),
+            **self.get_kwargs())
+
+    def __add__(self, value):
+        """
+        Does an addition on every value, hoping it has a meaning.
+
+        :param value: any value which makes sense
+        :return: a new series
+        """
+        def iterate():
+            for df in self:
+                yield df + value
+
+        return StreamingSeries(iterate, **self.get_kwargs())
diff --git a/pandas_streaming/df/dataframe_io.py b/pandas_streaming/df/dataframe_io.py
index 6fa455d..2138178 100644
--- a/pandas_streaming/df/dataframe_io.py
+++ b/pandas_streaming/df/dataframe_io.py
@@ -15,12 +15,12 @@ def to_zip(df, zipfilename, zname="df.csv", **kwargs):
     Saves a :epkg:`Dataframe` into a :epkg:`zip` file.
     It can be read by @see fn to_zip.
 
-    @param      df              :epkg:`dataframe` or :epkg:`numpy:array`
-    @param      zipfilename     a :epkg:`*py:zipfile:ZipFile` or a filename
-    @param      zname           a filename in th zipfile
-    @param      kwargs          parameters for :epkg:`pandas:to_csv` or
-                                :epkg:`numpy:save`
-    @return                     zipfilename
+    :param df: :epkg:`dataframe` or :epkg:`numpy:array`
+    :param zipfilename: a :epkg:`*py:zipfile:ZipFile` or a filename
+    :param zname: a filename in the zipfile
+    :param kwargs: parameters for :epkg:`pandas:to_csv` or
+        :epkg:`numpy:save`
+    :return: zipfilename
 
     .. exref::
         :title: Saves and reads a dataframe in a zip file
@@ -101,21 +101,21 @@ def to_zip(df, zipfilename, zname="df.csv", **kwargs):
         zf.close()
 
 
-def read_zip(zipfilename, zname="df.csv", **kwargs):
+def read_zip(zipfilename, zname=None, **kwargs):
     """
     Reads a :epkg:`dataframe` from a :epkg:`zip` file.
     It can be saved by @see fn read_zip.
 
-    @param      zipfilename     a :epkg:`*py:zipfile:ZipFile` or a filename
-    @param      zname           a filename in th zipfile
-    @param      kwargs          parameters for :epkg:`pandas:read_csv`
-    @return                     :epkg:`pandas:dataframe` or :epkg:`numpy:array`
+    :param zipfilename: a :epkg:`*py:zipfile:ZipFile` or a filename
+    :param zname: a filename in the zipfile, if None, takes the first one
+    :param kwargs: parameters for :func:`pandas.read_csv`
+    :return: :func:`pandas.DataFrame` or :epkg:`numpy:array`
     """
     if isinstance(zipfilename, str):
         ext = os.path.splitext(zipfilename)[-1]
         if ext != '.zip':
-            raise NotImplementedError(
-                "Only zip file are implemented not '{0}'.".format(ext))
+            raise NotImplementedError(  # pragma: no cover
+                "Only zip files are supported not '{0}'.".format(ext))
         zf = zipfile.ZipFile(zipfilename, 'r')  # pylint: disable=R1732
         close = True
     elif isinstance(zipfilename, zipfile.ZipFile):
@@ -125,6 +125,8 @@ def read_zip(zipfilename, zname="df.csv", **kwargs):
         raise TypeError(  # pragma: no cover
             "No implementation for type '{0}'".format(type(zipfilename)))
 
+    if zname is None:
+        zname = zf.namelist()[0]
     content = zf.read(zname)
     stb = io.BytesIO(content)
     ext = os.path.splitext(zname)[-1]
diff --git a/pandas_streaming/df/dataframe_io_helpers.py b/pandas_streaming/df/dataframe_io_helpers.py
index cd1262e..bd07858 100644
--- a/pandas_streaming/df/dataframe_io_helpers.py
+++ b/pandas_streaming/df/dataframe_io_helpers.py
@@ -16,19 +16,26 @@ class JsonPerRowsStream:
     """
     Reads a :epkg:`json` streams and adds ``,``, ``[``, ``]``
     to convert a stream containing
-    one :pekg:`json` object per row into one single :epkg:`json` object.
+    one :epkg:`json` object per row into one single :epkg:`json` object.
     It only implements method *readline*.
+
+    :param st: stream
     """
 
     def __init__(self, st):
-        """
-        @param      st      stream
-        """
         self.st = st
         self.begin = True
         self.newline = False
         self.end = True
 
+    def seek(self, offset):
+        """
+        Change the stream position to the given byte offset.
+
+        :param offset: offset, only 0 is implemented
+        """
+        self.st.seek(offset)
+
     def readline(self, size=-1):
         """
         Reads a line, adds ``,``, ``[``, ``]`` if needed.
@@ -48,14 +55,12 @@ def readline(self, size=-1):
         if text.endswith("\n"):
             self.newline = True
             return text
-        elif len(text) == 0 or len(text) < size:
+        if len(text) == 0 or len(text) < size:
             if self.end:
                 self.end = False
                 return text + ']'
-            else:
-                return text
-        else:
             return text
+        return text
 
     def read(self, size=-1):
         """
@@ -85,14 +90,12 @@ def read(self, size=-1):
         if text.endswith(cst[0]):
             self.newline = True
             return text
-        elif len(text) == 0 or len(text) < size:
+        if len(text) == 0 or len(text) < size:
             if self.end:
                 self.end = False
                 return text + cst[4]
-            else:
-                return text
-        else:
             return text
+        return text
 
     def getvalue(self):
         """
@@ -143,12 +146,12 @@ def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fL
     """
     Enumerates items from a :epkg:`JSON` file or string.
 
-    @param      filename        filename or string or stream to parse
-    @param      encoding        encoding
-    @param      lines           one record per row
-    @param      flatten         call @see fn flatten_dictionary
-    @param      fLOG            logging function
-    @return                     iterator on records at first level.
+    :param filename: filename or string or stream to parse
+    :param encoding: encoding
+    :param lines: one record per row
+    :param flatten: call @see fn flatten_dictionary
+    :param fLOG: logging function
+    :return: iterator on records at first level.
 
     It assumes the syntax follows the format: ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
     However, if option *lines* if true, the function considers that the
@@ -224,19 +227,26 @@ def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fL
     if isinstance(filename, str):
         if "{" not in filename and os.path.exists(filename):
             with open(filename, "r", encoding=encoding) as f:
-                for el in enumerate_json_items(f, encoding=encoding, lines=lines, flatten=flatten, fLOG=fLOG):
+                for el in enumerate_json_items(
+                        f, encoding=encoding, lines=lines,
+                        flatten=flatten, fLOG=fLOG):
                     yield el
         else:
             st = StringIO(filename)
-            for el in enumerate_json_items(st, encoding=encoding, lines=lines, flatten=flatten, fLOG=fLOG):
+            for el in enumerate_json_items(
+                    st, encoding=encoding, lines=lines,
+                    flatten=flatten, fLOG=fLOG):
                 yield el
     elif isinstance(filename, bytes):
         st = BytesIO(filename)
-        for el in enumerate_json_items(st, encoding=encoding, lines=lines, flatten=flatten, fLOG=fLOG):
+        for el in enumerate_json_items(
+                st, encoding=encoding, lines=lines, flatten=flatten,
+                fLOG=fLOG):
             yield el
     elif lines:
-        for el in enumerate_json_items(JsonPerRowsStream(filename),
-                                       encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG):
+        for el in enumerate_json_items(
+                JsonPerRowsStream(filename),
+                encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG):
             yield el
     else:
         parser = ijson.parse(filename)
@@ -247,7 +257,8 @@ def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fL
         for i, (_, event, value) in enumerate(parser):
             if i % 1000000 == 0 and fLOG is not None:
                 fLOG(  # pragma: no cover
-                    "[enumerate_json_items] i={0} yielded={1}".format(i, nbyield))
+                    "[enumerate_json_items] i={0} yielded={1}"
+                    "".format(i, nbyield))
             if event == "start_array":
                 if curkey is None:
                     current = []
@@ -315,6 +326,9 @@ class JsonIterator2Stream:
     method *read* is called. The iterator could be one returned by
     @see fn enumerate_json_items.
 
+    :param it: iterator
+    :param kwargs: arguments to :epkg:`*py:json:dumps`
+
     .. exref::
         :title: Reshape a json file
@@ -382,12 +396,20 @@ class JsonIterator2Stream:
     """
 
     def __init__(self, it, **kwargs):
-        """
-        @param      it      iterator
-        @param      kwargs  arguments to :epkg:`*py:json:dumps`
-        """
         self.it = it
         self.kwargs = kwargs
+        self.it0 = it()
+
+    def seek(self, offset):
+        """
+        Change the stream position to the given byte offset.
+
+        :param offset: offset, only 0 is implemented
+        """
+        if offset != 0:
+            raise NotImplementedError(
+                "The iterator can only restart from the beginning.")
+        self.it0 = self.it()
 
     def write(self):
         """
@@ -400,14 +422,18 @@ def read(self):
         Reads the next item and returns it as a string.
         """
         try:
-            value = next(self.it)
+            value = next(self.it0)
             return dumps(value, **self.kwargs)
         except StopIteration:
             return None
 
     def __iter__(self):
         """
-        Iterate on each row.
+        Iterates on each row. The behaviour is a bit tricky.
+        It is implemented to be swallowed by :func:`pandas.read_json` which
+        uses :func:`itertools.islice` to go through the items.
+        It calls `__iter__` multiple times but expects the
+        iterator to continue from where it stopped last time.
         """
-        for value in self.it:
+        for value in self.it0:
             yield dumps(value, **self.kwargs)
diff --git a/pandas_streaming/exc/exc_streaming.py b/pandas_streaming/exc/exc_streaming.py
index e229d45..dfe0bba 100644
--- a/pandas_streaming/exc/exc_streaming.py
+++ b/pandas_streaming/exc/exc_streaming.py
@@ -16,7 +16,7 @@ def __init__(self, meth):
         This method is inefficient in streaming mode
         and not implemented.
 
-        @param      meth    method
+        :param meth: inefficient method
         """
         Exception.__init__(
             self, "{0} should not be done in streaming mode.".format(meth))
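
A minimal usage sketch of the ``__setitem__`` and ``StreamingSeries`` support introduced by this patch, mirroring the new unit tests above (it assumes the usual ``pandas_streaming.df`` import path; the column names and constants are illustrative only and not part of the patch):

    import pandas
    from pandas_streaming.df import StreamingDataFrame

    # Build a small streaming dataframe from an in-memory dataframe.
    df = pandas.DataFrame(data=dict(a=[4.5], b=[6], c=[7]))
    sdf = StreamingDataFrame.read_df(df)

    # Constant column: equivalent to add_column.
    sdf['aa'] = 10
    # Columns derived from another one, computed chunk by chunk.
    sdf['bb'] = sdf['b'] + 10
    sdf['cc'] = sdf['b'].apply(lambda x: x + 11)

    print(sdf.to_df())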