Stocator throws error on count operation when reading .root file with 537 columns and 230 rows #122

Closed
charles2588 opened this issue Mar 17, 2017 · 8 comments

@charles2588

val dfData1 = spark.
read.format("org.dianahep.sparkroot").
option("tree", "Events").
option("inferSchema", "true").
load("swift://testobjectstorage." + name + "/test.root")
dfData1.count()

The reference example:
https://github.com/diana-hep/spark-root/blob/master/ipynb/publicCMSMuonia_exampleAnalysis_wROOT.ipynb

printSchema works fine, but the count operation does not.

Note that the number of columns is 537.
There is no error when reading from the local file system; the error occurs on count only when reading from Object Storage:
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 1 in stage 6.0 failed 10 times, most recent failure: Lost task 1.9 in stage 6.0 (TID 35, yp-spark-dal09-env5-0021): com.ibm.stocator.fs.common.exception.ConfigurationParseException: Configuration parse exception: Missing mandatory configuration: .auth.url
at com.ibm.stocator.fs.common.Utils.updateProperty(Utils.java:183)
at com.ibm.stocator.fs.swift.ConfigurationHandler.initialize(ConfigurationHandler.java:80)
at com.ibm.stocator.fs.swift.SwiftAPIClient.initiate(SwiftAPIClient.java:179)
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:124)
at com.ibm.stocator.fs.ObjectStoreFileSystem.initialize(ObjectStoreFileSystem.java:90)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.dianahep.root4j.RootFileReader.&lt;init&gt;(RootFileReader.java:181)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:121)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:119)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:785)
Driver stacktrace:
StackTrace: at com.ibm.stocator.fs.common.Utils.updateProperty(Utils.java:183)
at com.ibm.stocator.fs.swift.ConfigurationHandler.initialize(ConfigurationHandler.java:80)
at com.ibm.stocator.fs.swift.SwiftAPIClient.initiate(SwiftAPIClient.java:179)
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:124)
at com.ibm.stocator.fs.ObjectStoreFileSystem.initialize(ObjectStoreFileSystem.java:90)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.dianahep.root4j.RootFileReader.&lt;init&gt;(RootFileReader.java:181)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:121)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:119)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:785)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1461)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1449)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1448)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1448)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:812)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:812)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:812)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1674)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1629)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at java.lang.Thread.getStackTrace(Thread.java:1117)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1887)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1900)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:932)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:378)
at org.apache.spark.rdd.RDD.collect(RDD.scala:931)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
... 48 elided
Caused by: com.ibm.stocator.fs.common.exception.ConfigurationParseException: Configuration parse exception: Missing mandatory configuration: .auth.url
at com.ibm.stocator.fs.common.Utils.updateProperty(Utils.java:183)
at com.ibm.stocator.fs.swift.ConfigurationHandler.initialize(ConfigurationHandler.java:80)
at com.ibm.stocator.fs.swift.SwiftAPIClient.initiate(SwiftAPIClient.java:179)
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:124)
at com.ibm.stocator.fs.ObjectStoreFileSystem.initialize(ObjectStoreFileSystem.java:90)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.dianahep.root4j.RootFileReader.&lt;init&gt;(RootFileReader.java:181)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:121)
at org.dianahep.sparkroot.package$RootTableScan$$anonfun$2.apply(sparkroot.scala:119)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

@djalova (Contributor) commented Mar 17, 2017

Hi @charles2588
This seems to be an issue with the configuration settings. You are missing the value for the fs.swift2d.service.SERVICE_NAME.auth.url configuration key.
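For reference, here is a minimal sketch (not taken from this issue) of setting that key at runtime on the Hadoop configuration Spark uses; SERVICE_NAME and the Keystone URL are placeholders that must match the actual service name and endpoint:

// Sketch only, assuming a live SparkSession named `spark`, as in the snippet above.
// SERVICE_NAME must match the service qualifier used in the object store URL.
val hconf = spark.sparkContext.hadoopConfiguration
hconf.set("fs.swift2d.service.SERVICE_NAME.auth.url",
  "https://identity.open.softlayer.com/v3/auth/tokens")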

@charles2588 (Author)

@djalova
I am able to read other files like .csv or .parquet with no issues.
It's specifically an issue with .root files.

@gilv (Contributor) commented Mar 20, 2017

@djalova Can you check it please?

@djalova (Contributor) commented Mar 20, 2017

@charles2588 Are the configs set after Spark has started running or are they configured in the configuration files?

@charles2588 (Author) commented Mar 21, 2017

@djalova

Yes, we set the config after Spark has started.
BTW, this is on the Bluemix Spark Service with a Data Science Experience Jupyter notebook.
from pyspark.sql import SparkSession

def set_hadoop_config_with_credentials_216c032f3f574763ae975c6a83a0d523(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    # `sc` is the SparkContext predefined in the notebook environment
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com' + '/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', 'XXXXXX')
    hconf.set(prefix + '.username', 'XXXXXX')
    hconf.set(prefix + '.password', 'XXXXXXXX')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', False)

name = 'keystone'
set_hadoop_config_with_credentials_216c032f3f574763ae975c6a83a0d523(name)

@djalova (Contributor) commented Mar 21, 2017

@charles2588 I think this is an issue with the root4j package. For some operations it uses the same Configuration instance as the one found in Spark, but in other cases it creates a new Configuration, so it loses the configs set in Spark.
Here's an example of when it creates a new instance:
https://github.com/diana-hep/root4j/blob/master/src/main/java/org/dianahep/root4j/RootFileReader.java#L179
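As a rough illustration of why that matters (a sketch, not root4j's actual code): properties set at runtime on Spark's Hadoop configuration are not visible to a freshly constructed Configuration, which only loads the default resource files:

import org.apache.hadoop.conf.Configuration

// Sketch only, assuming a live SparkSession named `spark`; the service name
// `keystone` and the Keystone URL are taken from the config snippet above.
val hconf = spark.sparkContext.hadoopConfiguration
hconf.set("fs.swift.service.keystone.auth.url",
  "https://identity.open.softlayer.com/v3/auth/tokens")

// A new Configuration loads only core-default.xml / core-site.xml, so the
// credential set above is missing here -- this is what happens when root4j
// builds its own Configuration instead of reusing Spark's.
val fresh = new Configuration()
println(hconf.get("fs.swift.service.keystone.auth.url")) // the Keystone URL
println(fresh.get("fs.swift.service.keystone.auth.url")) // null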

@superdude264

Given that root4j is creating a new Hadoop config, I think the question of why I couldn't read from Object Storage has been resolved from Stocator's point of view. I have created an issue with root4j and will follow up with them. Thanks for looking into this.

@charles2588 (Author)

Thanks @djalova for looking into this. Thanks @superdude264, I will close this.
