
java.lang.UnsupportedOperationException using pandas in Spark #1168

Closed
angelcervera opened this issue Sep 25, 2020 · 6 comments · Fixed by #1198
Labels
tag:Upstream (a problem with one of the upstream packages installed in the docker images)
type:Bug (a problem with the definition of one of the docker images maintained here)

Comments

angelcervera commented Sep 25, 2020

What Docker image are you using?
jupyter/all-spark-notebook

What complete docker command do you run to launch the container (omitting sensitive values)?
docker run -d -p 8888:8888 jupyter/all-spark-notebook

What steps do you take once the container is running to reproduce the issue?

  1. Visit http://localhost:8888 with the token
  2. Create a new spylon-kernel notebook.
  3. Copy and paste the example from the Spark documentation:
    %%python
    from pyspark.sql.functions import pandas_udf
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]
    
    df.mapInPandas(filter_func, df.schema).show()
  4. Enjoy the exception:
Traceback (most recent call last):
  File "python cell", line 7, in <module>
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 440, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
Py4JJavaError: An error occurred while calling o135.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, a9b99f9c2052, executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

What do you expect to happen?

+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+

What actually happens?

There is a known limitation with PySpark (more precisely, with Arrow) on Java 11, and there are a few options to fix it: downgrade to Java 8, or patch the configuration by adding the -Dio.netty.tryReflectionSetAccessible=true property.

Reference
From the Spark documentation

For Java 11, -Dio.netty.tryReflectionSetAccessible=true is required additionally for Apache Arrow library. This prevents java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available when Apache Arrow uses Netty internally.
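
For illustration, a minimal sketch of the workaround in plain PySpark (outside the spylon-kernel; the master setting is an assumption). The key point is that the properties must be in place before the driver JVM starts, i.e. passed to the builder before getOrCreate():

    # Sketch only: pass the Netty flag to driver and executors at session creation.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")  # assumption: local mode, as in the stock image
        # Lets Netty (used by Arrow) allocate direct buffers on Java 11.
        .config("spark.driver.extraJavaOptions",
                "-Dio.netty.tryReflectionSetAccessible=true")
        .config("spark.executor.extraJavaOptions",
                "-Dio.netty.tryReflectionSetAccessible=true")
        .getOrCreate()
    )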

romainx (Collaborator) commented Sep 26, 2020

Hello @angelcervera,

Thanks for reporting this issue.

I'm not sure we want to downgrade to Java 8, so if you want to draft a PR implementing the -Dio.netty.tryReflectionSetAccessible=true solution, go for it. If you do, it would be interesting to add a test case based on this example:

https://github.com/jupyter/docker-stacks/blob/332db3db9ebb51167ef6b21cf261022bd0fc4eae/pyspark-notebook/test/test_spark.py
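
For instance, a hypothetical sketch of such a test; the approach (shelling out to docker run) and all names here are illustrative, not the actual fixtures used in test_spark.py:

    # Hypothetical test: run the mapInPandas repro inside the image and
    # expect a clean exit once the Netty flag is in place.
    import subprocess

    def test_mapinpandas_pandas_udf():
        code = (
            "from pyspark.sql import SparkSession\n"
            "spark = SparkSession.builder.master('local[1]').getOrCreate()\n"
            "df = spark.createDataFrame([(1, 21), (2, 30)], ('id', 'age'))\n"
            "def filter_func(iterator):\n"
            "    for pdf in iterator:\n"
            "        yield pdf[pdf.id == 1]\n"
            "df.mapInPandas(filter_func, df.schema).show()\n"
        )
        result = subprocess.run(
            ["docker", "run", "--rm", "jupyter/all-spark-notebook",
             "python", "-c", code],
            capture_output=True, text=True, timeout=600,
        )
        assert result.returncode == 0, result.stderr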

Many thanks.

romainx added the type:Bug and tag:Upstream labels on Sep 26, 2020
Bidek56 (Contributor) commented Sep 29, 2020

Please add these config options to resolve the issue:

    conf.set("spark.sql.legacy.setCommandRejectsSparkCoreConfs", "false")
    conf.set("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    conf.set("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
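
An alternative sketch (assuming the stock image layout, where the stack trace above shows Spark installed at /usr/local/spark) is to bake the flag into spark-defaults.conf so that every session picks it up:

    # /usr/local/spark/conf/spark-defaults.conf (path is an assumption)
    spark.driver.extraJavaOptions    -Dio.netty.tryReflectionSetAccessible=true
    spark.executor.extraJavaOptions  -Dio.netty.tryReflectionSetAccessible=true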

I am going to try upgrading the JRE to see whether this issue is fixed in newer versions, but the last time I tried, Spark would not work with anything higher than JRE 11, which is very old these days.

angelcervera (Author) commented Sep 29, 2020 via email

Bidek56 (Contributor) commented Sep 29, 2020

I just tried upgrading OpenJDK to 14; Spark works OK, but this error still exists.
I'm not sure it's worth upgrading the JDK to 14, since it's not an LTS release and it does not fix this issue.
We may have to wait until JDK 14.0.2 or 15 is available.

Bidek56 (Contributor) commented Sep 29, 2020

I may do a PR for JDK 14 anyway, hoping that it fixes this issue.

romainx (Collaborator) commented Dec 13, 2020

Hello, I just submitted PR #1198 to fix this.
Please let me know if it works for you.
Best.
