How to use existing Spark installation #137

Open
themoritz opened this issue Mar 2, 2021 · 8 comments
Labels
bug Installation and functionality issues

Comments

@themoritz

I have an existing Spark installation on a Hadoop cluster that I'd like to use from the Kotlin notebook, but I'm having trouble depending on the existing jar files on the system. I've tried two things:

Modify Config

I have tried adding the respective folders to the config.json:

{
    "mainJar": "kotlin-jupyter-kernel-0.8.3.236.jar",
    "mainClass": "org.jetbrains.kotlinx.jupyter.IkotlinKt",
    "classPath": [
        "/etc/spark2/conf/alpaca-spark23-1.0.1.127-full.jar",
        "/etc/spark2/conf/",
        "/usr/hdp/3.1.0.53-1/spark2/jars/*",
        "/etc/hadoop/conf/",
        "lib-0.8.3.236.jar",
        "api-0.8.3.236.jar",
        "kotlin-script-runtime-1.4.30.jar",
        "kotlin-reflect-1.4.30.jar",
        "kotlin-stdlib-1.4.30.jar",
        "kotlin-stdlib-common-1.4.30.jar",
        "annotations-13.0.jar",
        "kotlinx-serialization-json-jvm-1.0.1.jar",
        "kotlinx-serialization-core-jvm-1.0.1.jar"
    ],
    "debuggerConfig": ""
}

The first 4 additional lines in the classPath section correspond to the class path that's used when I run spark-shell on the cluster:

$ SPARK_PRINT_LAUNCH_COMMAND=true spark-shell
Spark Command: /usr/java/openjdk-1.8.0_252/bin/java -Dhdp.version=3.1.0.53-1 -cp /etc/spark2/conf/alpaca-spark23-1.0.1.127-full.jar:/etc/spark2/conf/:/usr/hdp/3.1.0.53-1/spark2/jars/*:/etc/hadoop/conf/ -Dscala.usejavacp=true -Xmx1g -Dhdp.version=3.1.0.53-1 org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell spark-shell

Based on the kernel console output, the class path config seems to be picked up:

[main] INFO ikotlin - Classpath used in script: [/etc/spark2/conf/alpaca-spark23-1.0.1.127-full.jar, /etc/spark2/conf, /usr/hdp/3.1.0.53-1/spark2/jars/*, /etc/hadoop/conf, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/lib-0.8.3.236.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/api-0.8.3.236.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlin-script-runtime-1.4.30.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlin-reflect-1.4.30.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlin-stdlib-1.4.30.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlin-stdlib-common-1.4.30.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/annotations-13.0.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlinx-serialization-json-jvm-1.0.1.jar, /tmp/moritz/miniconda3/lib/python3.8/site-packages/run_kotlin_kernel/jars/kotlinx-serialization-core-jvm-1.0.1.jar]

But in my notebook I am unable to import anything from org.apache.spark.

DependsOn

The other thing I tried (based on an answer in #49) is directly importing the jars in the notebook:

@file:DependsOn("/usr/hdp/3.1.0.53-1/spark2/jars/spark-core_2.11-2.3.2.3.1.0.53-1.jar")

This allows me to import e.g. org.apache.spark.SparkConf, but of course all of the transitive dependencies are missing. I'd like to write something like

@file:DependsOn("/usr/hdp/3.1.0.53-1/spark2/jars/*")

but that doesn't work either because the glob pattern isn't recognized.
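
One stopgap I could imagine (just a sketch, not verified) is to expand the glob myself in a helper cell and paste the printed annotations into the next cell:

// Helper sketch: list the jars in the Spark directory and print one
// @file:DependsOn(...) line per jar, to be copied into a separate cell.
import java.io.File

File("/usr/hdp/3.1.0.53-1/spark2/jars")
    .listFiles { f -> f.extension == "jar" }
    ?.sorted()
    ?.forEach { println("@file:DependsOn(\"${it.absolutePath}\")") }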

Any help greatly appreciated!

@themoritz
Author

Looking into this more, it seems that the issue with my first attempt is that the classPath field in the config doesn't support glob patterns either. I am able to import packages from the first path, which directly specifies a jar.

@ileasile
Collaborator

ileasile commented Mar 2, 2021

Yes, we don't support glob patterns for dependencies.
You may try several things:

  1. %use spark. It will import the Kotlin Spark API and you will be able to use it from your code.
  2. Write your own library descriptor. It is a JSON file where you specify all the dependencies once, and then add it via %use path-to-json.
  3. Use Spark from a Maven repo (Maven Central or Maven Local). If you specify the repo and artifact coordinates, transitive dependencies are resolved (a sketch follows below).
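
For option 3, a cell could look roughly like this (the coordinates are only an example; pick the artifacts and versions that match your cluster):

// Sketch for option 3: resolve Spark from a Maven repository so that transitive
// dependencies are pulled in automatically. Coordinates below are illustrative.
@file:Repository("https://repo1.maven.org/maven2/")
@file:DependsOn("org.apache.spark:spark-core_2.11:2.3.2")
@file:DependsOn("org.apache.spark:spark-sql_2.11:2.3.2")

import org.apache.spark.SparkConf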

You may find additional info in the README.

@themoritz
Author

Great, option 2 works if I add to the dependencies section all of the individual JARs that match the glob pattern. I am now able to import the Spark classes, but I'm stuck creating a session because the configuration files can't be found. How would you handle configuration files, like the Hadoop core-site.xml? When I add these files, I end up with this error:

WARN: error in opening zip file: /etc/hadoop/conf/core-site.xml
java.util.zip.ZipException: error in opening zip file

@ileasile
Collaborator

ileasile commented Mar 3, 2021

Dependencies are JVM dependencies, not configs and other files. It seems that your Spark server should load this config itself when starting the session.

@themoritz
Author

themoritz commented Mar 6, 2021

Alright, it seems I've made some progress. I'm now able to create a JavaSparkContext that's connected to YARN in my cluster by doing the following:

  1. I'm exporting various Spark and Hadoop environment variables (like HADOOP_HOME, SPARK_HOME, ...) before I launch Jupyter.
  2. I'm loading various system properties that affect Spark in my notebook's first cell (see the self-contained sketch after this list) via
    System.getProperties().load(FileInputStream("spark.properties"))
  3. I tweaked the line that starts the Kotlin kernel in run_kernel.py so that the Hadoop libs are able to read the cluster config:
    subprocess.call(['java'] + debug_list + ['-cp', '/etc/hadoop/conf/:/etc/spark2/conf/:' + main_jar_path, 'org.jetbrains.kotlinx.jupyter.IkotlinKt', '-classpath=' + class_path_arg, connection_file, '-home=' + executables_dir])
  4. I'm loading the cluster Spark jars using my own library descriptor, as you suggested.
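
For completeness, the cell from step 2 in a self-contained form (the file name spark.properties and its location are specific to my setup):

// Self-contained version of the first cell from step 2 above.
// "spark.properties" is a plain java.util.Properties file with the Spark/Hadoop
// system properties I need, sitting next to the notebook in my setup.
import java.io.FileInputStream

System.getProperties().load(FileInputStream("spark.properties"))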

However, now I'm running into the problem that the worker nodes don't seem to be able to load the code that I'm writing in my notebook. So my question is, how can I distribute the compiled classes from my notebook to the workers? This is the code in my cell:

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val conf = SparkConf().setAppName("Kotlin Notebook").setMaster("yarn")
val sc = JavaSparkContext(conf)

sc.parallelize(listOf(1,2,3)).map { it + 1 }.collect()

And the exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, brdn6213.cluster.com, executor 1): java.lang.ClassNotFoundException: Line_5$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1925)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
scala.Option.foreach(Option.scala:257)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.collect(RDD.scala:944)
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
Line_5.<init>(Line_5.jupyter-kts:23)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:96)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:41)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt)
kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:51)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:63)
kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:63)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:59)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:28)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withHost(repl.kt:515)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:58)
org.jetbrains.kotlinx.jupyter.repl.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:20)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:357)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:144)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withEvalContext(repl.kt:335)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.eval(repl.kt:350)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt:287)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt)
org.jetbrains.kotlinx.jupyter.ProtocolKt.evalWithIO(protocol.kt:443)
org.jetbrains.kotlinx.jupyter.ProtocolKt.shellMessagesHandler(protocol.kt:286)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer(ikotlin.kt:133)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer$default(ikotlin.kt:101)
org.jetbrains.kotlinx.jupyter.IkotlinKt.main(ikotlin.kt:74)

@themoritz
Author

Ah, I think I just learned what the magic line %dumpClassesForSpark is all about. Working now! 🎉

@themoritz
Author

themoritz commented Mar 7, 2021

Ok, I think this actually did not end up solving the problem. Whenever I map custom code over an RDD, like here

sc.parallelize(listOf(1,2,3)).map { it + 1 }.collect()

I get the ClassNotFoundException:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, brdn6148.target.com, executor 1): java.lang.ClassNotFoundException: Line_126$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1925)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

And it also seems to have issues with sending closures to the executors:

val x = 3
sc.parallelize(listOf(1,2,3)).map { it + x }.collect()

Exception:

org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.map(RDD.scala:370)
org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
Line_132.<init>(Line_132.jupyter-kts:3)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:96)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:41)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt)
kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:51)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:63)
kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:63)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:59)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:28)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withHost(repl.kt:515)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:58)
org.jetbrains.kotlinx.jupyter.repl.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:20)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:357)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:144)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withEvalContext(repl.kt:335)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.eval(repl.kt:350)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt:287)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt)
org.jetbrains.kotlinx.jupyter.ProtocolKt.evalWithIO(protocol.kt:443)
org.jetbrains.kotlinx.jupyter.ProtocolKt.shellMessagesHandler(protocol.kt:286)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer(ikotlin.kt:133)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer$default(ikotlin.kt:101)
org.jetbrains.kotlinx.jupyter.IkotlinKt.main(ikotlin.kt:74)

Is this an expected limitation of the way the cells are being interpreted, or is it an issue on my end? I guess I'm also still a bit unclear about what exactly %dumpClassesForSpark is doing.

@themoritz
Author

I could fix the basic map parallelization by executing %dumpClassesForSpark before creating the SparkContext.
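
Concretely, the ordering that works for the basic case (the same code as in my earlier cells, just with the magic executed first):

// Cell 1: run the magic before the Spark context is created
// (this ordering is what made the basic map work for me).
%dumpClassesForSpark

// Cell 2: only then create the context and run the job.
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val conf = SparkConf().setAppName("Kotlin Notebook").setMaster("yarn")
val sc = JavaSparkContext(conf)

sc.parallelize(listOf(1, 2, 3)).map { it + 1 }.collect()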


However, some snippets still don't work:

val x = 3

sc.parallelize(listOf(1,2,3)).map {
    it + x
}.collect()

Exception:

org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.map(RDD.scala:370)
org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
Line_95.<init>(Line_95.jupyter-kts:3)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:96)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:41)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt)
kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:51)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:63)
kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:63)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:59)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:28)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withHost(repl.kt:519)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:58)
org.jetbrains.kotlinx.jupyter.repl.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:20)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:357)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:144)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withEvalContext(repl.kt:335)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.eval(repl.kt:350)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt:287)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt)
org.jetbrains.kotlinx.jupyter.ProtocolKt.evalWithIO(protocol.kt:443)
org.jetbrains.kotlinx.jupyter.ProtocolKt.shellMessagesHandler(protocol.kt:286)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer(ikotlin.kt:133)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer$default(ikotlin.kt:101)
org.jetbrains.kotlinx.jupyter.IkotlinKt.main(ikotlin.kt:74)

This works, however:

val go = {
    val x = 3
    
    sc.parallelize(listOf(1,2,3)).map {
        it + x
    }.collect()
}

go()

This again doesn't work:

val go = {
    fun f(i: Int) = i + 3
    
    sc.parallelize(listOf(1,2,3)).map {
        f(it)
    }.collect()
}

go()

Exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 10, brdn6012.target.com, executor 1): java.lang.NoClassDefFoundError: Line_230$go$1$1
	at Line_230$go$1$2.call(Line_230.jupyter-kts:5)
	at Line_230$go$1$2.call(Line_230.jupyter-kts:1)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
scala.Option.foreach(Option.scala:257)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.collect(RDD.scala:944)
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
Line_230$go$1.invoke(Line_230.jupyter-kts:6)
Line_230$go$1.invoke(Line_230.jupyter-kts:1)
Line_230.<init>(Line_230.jupyter-kts:9)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:96)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:41)
kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt)
kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:51)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:63)
kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:63)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:59)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$$inlined$with$lambda$1.invoke(CellExecutorImpl.kt:28)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withHost(repl.kt:519)
org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:58)
org.jetbrains.kotlinx.jupyter.repl.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:20)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:357)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl$eval$1.invoke(repl.kt:144)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.withEvalContext(repl.kt:335)
org.jetbrains.kotlinx.jupyter.ReplForJupyterImpl.eval(repl.kt:350)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt:287)
org.jetbrains.kotlinx.jupyter.ProtocolKt$shellMessagesHandler$res$1.invoke(protocol.kt)
org.jetbrains.kotlinx.jupyter.ProtocolKt.evalWithIO(protocol.kt:443)
org.jetbrains.kotlinx.jupyter.ProtocolKt.shellMessagesHandler(protocol.kt:286)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer(ikotlin.kt:133)
org.jetbrains.kotlinx.jupyter.IkotlinKt.kernelServer$default(ikotlin.kt:101)
org.jetbrains.kotlinx.jupyter.IkotlinKt.main(ikotlin.kt:74)

@ileasile ileasile added the bug Installation and functionality issues label Jul 16, 2021