(pyarrow, HDFS) Unable to connect HDFS in pyspark YARN cluster setting #37057

kyoungrok0517 opened this issue Aug 8, 2023

Describe the usage question you have. Please include as many useful details as possible.

Hello. I'm trying to interact with HDFS storage from the driver and workers of a pyspark YARN cluster. Specifically, I'm using Hugging Face's datasets library (link), which relies on pyarrow to communicate with HDFS. The from_spark() function (link) is what I'm invoking in my script.
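
For context, this is roughly the call that triggers the failure. It's a minimal sketch: the DataFrame and the HDFS cache path are placeholders, not my real values.

```python
from pyspark.sql import SparkSession
from datasets import Dataset

spark = SparkSession.builder.getOrCreate()
df = spark.range(100)  # stand-in for the real DataFrame

# cache_dir is a placeholder HDFS path; datasets resolves it through fsspec,
# which constructs pyarrow's HadoopFileSystem under the hood -- that
# construction is what fails on the executors.
ds = Dataset.from_spark(df, cache_dir="hdfs://namenode:8020/tmp/datasets_cache")
```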

Below is the error I'm encountering (sensitive paths masked). My code is shipped from the driver container (docker) to the worker containers and executed there. I've confirmed that in both the driver and worker images I can connect to HDFS using pyarrow, since the env vars and required jars are properly set, but strangely that becomes impossible when the same image runs as a remote worker process.

Here are some peculiarities of my environment that might have caused this issue:

  • The cluster requires Kerberos authentication
    • But I think the error message implies that isn't the problem here
  • The user that runs the worker process is different from the one that built the docker image
    • To avoid permission-related issues, I made every directory the script accesses accessible to everyone
  • The pyspark part of my code has no problem interacting with HDFS
    • Even pyarrow has no problem when I run the code in an interactive session inside the same docker images (driver and worker)
    • The problem occurs only when the code runs as the cluster's worker runtime (see the config sketch below)
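
Since the environment works interactively but not in the worker runtime, one thing worth checking is whether the HDFS-related environment actually reaches the executors' Python workers. Below is a sketch of forwarding it via Spark config; spark.executorEnv.* is standard Spark configuration, but all the paths are illustrative assumptions, not my actual values.

```python
from pyspark.sql import SparkSession

# pyarrow's HadoopFileSystem needs libhdfs (ARROW_LIBHDFS_DIR), the Hadoop
# jars on CLASSPATH (in practice the output of `hadoop classpath --glob`),
# and JAVA_HOME -- all at the moment the executor's Python worker starts.
spark = (
    SparkSession.builder
    .config("spark.executorEnv.JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")     # placeholder
    .config("spark.executorEnv.HADOOP_HOME", "/opt/hadoop")                   # placeholder
    .config("spark.executorEnv.ARROW_LIBHDFS_DIR", "/opt/hadoop/lib/native")  # placeholder
    .config("spark.executorEnv.CLASSPATH",
            "/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/*")       # placeholder
    .getOrCreate()
)
```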

I hope I can get some help. Thanks.
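
To narrow things down, a probe like the following can attempt the same HadoopFileSystem construction directly inside an executor and report the executor-side environment. This is a diagnostic sketch, assuming the namenode is resolvable from core-site.xml via the "default" host:

```python
import os
from pyspark.sql import SparkSession

def probe(_):
    # Runs inside the executor's Python worker -- the same context in which
    # datasets/fsspec constructs the filesystem and fails.
    import pyarrow.fs as pafs
    env = {k: os.environ.get(k)
           for k in ("JAVA_HOME", "HADOOP_HOME", "ARROW_LIBHDFS_DIR", "CLASSPATH")}
    try:
        fs = pafs.HadoopFileSystem("default")  # namenode from core-site.xml
        fs.get_file_info("/")                  # force an actual RPC
        return [("ok", env)]
    except OSError as exc:
        return [("fail", str(exc), env)]

spark = SparkSession.builder.getOrCreate()
print(spark.sparkContext.parallelize([0], numSlices=1).mapPartitions(probe).collect())
```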

```
2023-08-08 18:51:19,638 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-08-08 18:51:20,280 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
23/08/08 18:51:22 WARN TaskSetManager: Lost task 0.0 in stage 142.0 (TID 9732) (ac3bax2062.bdp.bdata.ai executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "<MASKED>/application_1682476586273_25865777/container_e143_1682476586273_25865777_01_000003/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
  File "/root/spark/python/pyspark/rdd.py", line 828, in func
  File "/opt/conda/lib/python3.11/site-packages/datasets/packaged_modules/spark/spark.py", line 130, in create_cache_and_write_probe
    open(probe_file, "a")
  File "/opt/conda/lib/python3.11/site-packages/datasets/streaming.py", line 74, in wrapper
    return function(*args, download_config=download_config, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/datasets/download/streaming_download_manager.py", line 496, in xopen
    file_obj = fsspec.open(file, mode=mode, *args, **kwargs).open()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 439, in open
    out = open_files(
          ^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 282, in open_files
    fs, fs_token, paths = get_fs_token_paths(
                          ^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 609, in get_fs_token_paths
    fs = filesystem(protocol, **inkwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/registry.py", line 267, in filesystem
    return cls(**storage_options)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 79, in __call__
    obj = super().__call__(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/fsspec/implementations/arrow.py", line 278, in __init__
    fs = HadoopFileSystem(
         ^^^^^^^^^^^^^^^^^
  File "pyarrow/_hdfs.pyx", line 96, in pyarrow._hdfs.HadoopFileSystem.__init__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: HDFS connection failed

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

(The identical Python traceback and `OSError: HDFS connection failed` repeat for the retry attempts: task 0.1 in stage 142.0 (TID 9733) on executor 2 at 18:51:24, and task 0.2 in stage 142.0 (TID 9734) on executor 4 at 18:51:38.)

Component(s)

Integration, Parquet, Python
