
parse_tabular(nrows=3) is slow #210

Closed
dberenbaum opened this issue Jul 31, 2024 · 3 comments · Fixed by #221
Labels: bug, performance, priority-p1

Comments

dberenbaum (Contributor) commented Jul 31, 2024

Description

Thanks to both @mattseddon and @volkfox for raising this. Any of from_csv/from_parquet/parse_tabular will be slow if used with nrows and a cloud path.

To reproduce, run https://github.com/iterative/datachain/blob/main/examples/get_started/json-csv-reader.py. The last example takes a long time to complete even though it uses nrows=3. After digging into it a bit, it looks like an issue in either pyarrow or fsspec. Opened apache/arrow#43497 to track the issue upstream.

Version Info

$ datachain --version
0.2.10.dev6+gcc0b63b.d20240716
dberenbaum added the bug label on Jul 31, 2024
shcheklein (Member) commented:

@dberenbaum is it happening even if you run it a second time (indexing is cached)?

Because now we index the whole parent even if we access a single file.

mattseddon (Member) commented:

> @dberenbaum is it happening even if you run it a second time (indexing is cached)?
>
> Because now we index the whole parent even if we access a single file.

    uri = "gs://datachain-demo/laion-aesthetics-csv"
    print()
    print("========================================================================")
    print("dynamic CSV with header schema test parsing 3/3M objects")
    print("========================================================================")
    dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion", nrows=3)
    dynamic_csv_ds.print_schema()
    print(dynamic_csv_ds.to_pandas())

I have run the above example multiple times, both with nrows=3 and without. With nrows=3 the result is always the same: the query hangs indefinitely (longer than running through the whole 3M records).

mattseddon (Member) commented Jul 31, 2024

From my investigation last week, it seems pyarrow is doing something non-thread-safe with fsspec. When we break out of the generator here, an async thread is left waiting, and because of that the program never exits. Thread information is shown below:

Thread 6133035008:
  File "/datachain/.env/lib/python3.12/site-packages/fsspec/spec.py", line 1941, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/datachain/.env/lib/python3.12/site-packages/fsspec/caching.py", line 234, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
  File "/datachain/.env/lib/python3.12/site-packages/gcsfs/core.py", line 1924, in _fetch_range
    return self.gcsfs.cat_file(self.path, start=start, end=end)
  File "/datachain/.env/lib/python3.12/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/datachain/.env/lib/python3.12/site-packages/fsspec/asyn.py", line 91, in sync
    if event.wait(1):
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 655, in wait
    signaled = self._cond.wait(timeout)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)

Thread Thread-1:
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/datachain/.env/lib/python3.12/site-packages/tqdm/_monitor.py", line 60, in run
    self.was_killed.wait(self.sleep_interval)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 655, in wait
    signaled = self._cond.wait(timeout)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 359, in wait
    gotit = waiter.acquire(True, timeout)

Thread asyncio_0:
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/thread.py", line 89, in _worker
    work_item = work_queue.get(block=True)

Thread fsspecIO:
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 641, in run_forever
    self._run_once()
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 1949, in _run_once
    event_list = self._selector.select(timeout)
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/selectors.py", line 566, in select
    kev_list = self._selector.control(None, max_ev, timeout)

Thread MainThread:
  File "/datachain/examples/get_started/json-csv-reader.py", line 113, in <module>
    main()
  File "/datachain/examples/get_started/json-csv-reader.py", line 108, in main
    traceback.print_stack(frame)

Now that we avoid threads for the pyarrow to_batches operation (here), I cannot terminate the program without killing the Python process from another terminal. I have started another run and will monitor it for the next couple of hours to see if it ever returns (it should only take 10 minutes to run through the whole CSV). Edit: the nrows=3 process does return, in roughly the same time it would take to read the entire file, as long as I leave it alone.
