Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _unittests/ut_df/data/example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"a": 1, "b": 2}
{"a": 3, "b": 4}
1 change: 1 addition & 0 deletions _unittests/ut_df/data/example2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"a":1,"b":2},{"a":3,"b":4}]
24 changes: 24 additions & 0 deletions _unittests/ut_df/test_dataframe_io_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def test_read_json_raw_head(self):
h1 = it.head()
h2 = it.head()
self.assertEqualDataFrame(h1, h2)
self.assertGreater(h1.shape[0], 1)
self.assertGreater(h2.shape[0], 1)

def test_pandas_json_chunksize(self):
jsonl = '''{"a": 1, "b": 2}
Expand Down Expand Up @@ -186,6 +188,28 @@ def test_read_json_rows2_head(self):
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_rows_file_head(self):
data = self.abs_path_join(__file__, 'data', 'example2.json')
dfs = pandas.read_json(data, orient='records')
self.assertEqual(dfs.shape, (2, 2))
it = StreamingDataFrame.read_json(data)
h1 = it.head()
h2 = it.head()
self.assertNotEmpty(h1)
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_rows_file_lines_head(self):
data = self.abs_path_join(__file__, 'data', 'example.json')
dfs = pandas.read_json(data, orient='records', lines=True)
self.assertEqual(dfs.shape, (2, 2))
it = StreamingDataFrame.read_json(data, lines="stream")
h1 = it.head()
h2 = it.head()
self.assertNotEmpty(h1)
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_ijson(self):
it = StreamingDataFrame.read_json(
BytesIO(TestDataFrameIOHelpers.text_json))
Expand Down
16 changes: 13 additions & 3 deletions pandas_streaming/df/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
and it must be defined to return an iterator.
If *lines* is True, the function falls back into
:epkg:`pandas:read_json`, otherwise it used
@see fn enumerate_json_items. If *lines is ``'stream'``,
@see fn enumerate_json_items. If *lines* is ``'stream'``,
*enumerate_json_items* is called with parameter
``lines=True``.
Parameter *flatten* uses the trick described at
Expand Down Expand Up @@ -212,6 +212,13 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
it = StreamingDataFrame.read_json(BytesIO(data))
dfs = list(it)
print(dfs)

.. index:: IncompleteJSONError

The parsed json must have an empty line at the end otherwise
the following exception is raised:
`ijson.common.IncompleteJSONError: `
`parse error: unallowed token at this point in JSON text`.
"""
if not isinstance(chunksize, int) or chunksize <= 0:
raise ValueError( # pragma: no cover
Expand All @@ -228,7 +235,8 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
del kwargs['lines']

def localf(a0=args[0]):
a0.seek(0)
if hasattr(a0, 'seek'):
a0.seek(0)
return enumerate_json_items(
a0, encoding=kwargs.get('encoding', None), lines=True,
flatten=flatten)
Expand Down Expand Up @@ -280,6 +288,7 @@ def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()):
**kwargs_create)

def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
st.seek(0)
for r in pandas.read_json(
st, *args, chunksize=chunksize, nrows=chunksize,
lines=True, **kw):
Expand Down Expand Up @@ -920,8 +929,9 @@ def __getitem__(self, *args):

def iterate_col():
"iterate on one column"
one_col = [cols]
for df in iter_creation():
yield df[[cols]]
yield df[one_col]
return StreamingSeries(iterate_col, **self.get_kwargs())

if not isinstance(cols, list):
Expand Down
9 changes: 8 additions & 1 deletion pandas_streaming/df/dataframe_io_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ def flatten_dictionary(dico, sep="_"):
"""
Flattens a dictionary with nested structure to a dictionary with no
hierarchy.

:param dico: dictionary to flatten
:param sep: string to separate dictionary keys by
:return: flattened dictionary

Inspired from `flatten_json <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
Inspired from `flatten_json
<https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
"""
flattened_dict = {}

Expand Down Expand Up @@ -223,6 +225,11 @@ def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fL

for item in enumerate_json_items(text_json):
print(item)

The parsed json must have an empty line at the end otherwise
the following exception is raised:
`ijson.common.IncompleteJSONError: `
`parse error: unallowed token at this point in JSON text`.
"""
if isinstance(filename, str):
if "{" not in filename and os.path.exists(filename):
Expand Down