This repository has been archived by the owner on Nov 16, 2019. It is now read-only.

Py4JJavaError when following the Python instructions #61

Closed
dejunzhang opened this issue Apr 27, 2016 · 21 comments

Comments

@dejunzhang

Hi, I am following the Python instructions from:
https://github.com/yahoo/CaffeOnSpark/wiki/GetStarted_python
and trying to use the Python APIs to train models. But when I use the following example command:
pushd ${CAFFE_ON_SPARK}/data/
unzip ${CAFFE_ON_SPARK}/caffe-grid/target/caffeonsparkpythonapi.zip
IPYTHON=1 pyspark --master yarn \
--num-executors 1 \
--driver-library-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
--driver-class-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
--conf spark.cores.max=1 \
--conf spark.driver.extraLibraryPath="${LD_LIBRARY_PATH}" \
--conf spark.executorEnv.LD_LIBRARY_PATH="${LD_LIBRARY_PATH}" \
--py-files ${CAFFE_ON_SPARK}/caffe-grid/target/caffeonsparkpythonapi.zip \
--files ${CAFFE_ON_SPARK}/data/caffe/_caffe.so \
--jars "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar"
Then I run the example below, and an error occurs on the last line:
from pyspark import SparkConf,SparkContext
from com.yahoo.ml.caffe.RegisterContext import registerContext,registerSQLContext
from com.yahoo.ml.caffe.CaffeOnSpark import CaffeOnSpark
from com.yahoo.ml.caffe.Config import Config
from com.yahoo.ml.caffe.DataSource import DataSource
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
registerContext(sc)
registerSQLContext(sqlContext)
cos=CaffeOnSpark(sc,sqlContext)
cfg=Config(sc)
cfg.protoFile='/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_memory_solver.prototxt'
cfg.modelPath = 'file:/tmp/lenet.model'
cfg.devices = 1
cfg.isFeature=True
cfg.label='label'
cfg.features=['ip1']
cfg.outputFormat = 'json'
cfg.clusterSize = 1
cfg.lmdb_partitions=cfg.clusterSize

#Train

dl_train_source = DataSource(sc).getSource(cfg,True)
cos.train(dl_train_source)   <------------------ error happens after this call

The error message is:
In [41]: cos.train(dl_train_source)
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: collect at CaffeOnSpark.scala:127
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 4 (collect at CaffeOnSpark.scala:127) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (collect at CaffeOnSpark.scala:127)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[14] at map at CaffeOnSpark.scala:116), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 3.2 KB, free 23.9 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.1 KB, free 25.9 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.110.53.146:59213 (size: 2.1 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (MapPartitionsRDD[14] at map at CaffeOnSpark.scala:116)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 4.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 10, sweet, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on sweet:46000 (size: 2.1 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 4 (collect at CaffeOnSpark.scala:127) finished in 0.084 s
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 10) in 84 ms on sweet (1/1)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 4 finished: collect at CaffeOnSpark.scala:127, took 0.092871 s
16/04/27 10:44:34 INFO caffe.CaffeOnSpark: rank = 0, address = null, hostname = sweet
16/04/27 10:44:34 INFO caffe.CaffeOnSpark: rank 0:sweet
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 112.0 B, free 26.0 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 221.0 B, free 26.3 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.110.53.146:59213 (size: 221.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 6 from broadcast at CaffeOnSpark.scala:146
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: collect at CaffeOnSpark.scala:155
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 5 (collect at CaffeOnSpark.scala:155) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 5 (collect at CaffeOnSpark.scala:155)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[16] at map at CaffeOnSpark.scala:149), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.6 KB, free 28.9 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1597.0 B, free 30.4 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on 10.110.53.146:59213 (size: 1597.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[16] at map at CaffeOnSpark.scala:149)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 5.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 11, sweet, partition 0,PROCESS_LOCAL, 2169 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on sweet:46000 (size: 1597.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on sweet:46000 (size: 221.0 B, free: 511.5 MB)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 11) in 48 ms on sweet (1/1)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 5 (collect at CaffeOnSpark.scala:155) finished in 0.049 s
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 5 finished: collect at CaffeOnSpark.scala:155, took 0.058122 s
16/04/27 10:44:34 INFO caffe.LmdbRDD: local LMDB path:/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb
16/04/27 10:44:34 INFO caffe.LmdbRDD: 1 LMDB RDD partitions
16/04/27 10:44:34 INFO spark.SparkContext: Starting job: reduce at CaffeOnSpark.scala:205
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Got job 6 (reduce at CaffeOnSpark.scala:205) with 1 output partitions
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (reduce at CaffeOnSpark.scala:205)
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Missing parents: List()
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[17] at mapPartitions at CaffeOnSpark.scala:190), which has no missing parents
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.4 KB, free 33.8 KB)
16/04/27 10:44:34 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 2.2 KB, free 35.9 KB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on 10.110.53.146:59213 (size: 2.2 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/04/27 10:44:34 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[17] at mapPartitions at CaffeOnSpark.scala:190)
16/04/27 10:44:34 INFO cluster.YarnScheduler: Adding task set 6.0 with 1 tasks
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 12, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on sweet:46000 (size: 2.2 KB, free: 511.5 MB)
16/04/27 10:44:34 INFO storage.BlockManagerInfo: Added rdd_12_0 on disk on sweet:46000 (size: 26.0 B)
16/04/27 10:44:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12, sweet): java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 6.0 (TID 13, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 6.0 (TID 13) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 1]
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 6.0 (TID 14, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 6.0 (TID 14) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 2]
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 6.0 (TID 15, sweet, partition 0,PROCESS_LOCAL, 1992 bytes)
16/04/27 10:44:34 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 6.0 (TID 15) on executor sweet: java.lang.UnsupportedOperationException (empty.reduceLeft) [duplicate 3]
16/04/27 10:44:34 ERROR scheduler.TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
16/04/27 10:44:34 INFO cluster.YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
16/04/27 10:44:34 INFO cluster.YarnScheduler: Cancelling stage 6
16/04/27 10:44:34 INFO scheduler.DAGScheduler: ResultStage 6 (reduce at CaffeOnSpark.scala:205) failed in 0.117 s

16/04/27 10:44:34 INFO scheduler.DAGScheduler: Job 6 failed: reduce at CaffeOnSpark.scala:205, took 0.124712 s

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 cos.train(dl_train_source)

/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/CaffeOnSpark.py in train(self, train_source)
29 :param DataSource: the source for training data
30 """
---> 31 self.__dict__.get('cos').train(train_source)
32
33 def test(self,test_source):

/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/ConversionUtil.py in __call__(self, *args)
814 for i in self.syms:
815 try:
--> 816 return callJavaMethod(i,self.javaInstance,self._evalDefaults(),self.mirror,*args)
817 except Py4JJavaError:
818 raise

/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/com/yahoo/ml/caffe/ConversionUtil.py in callJavaMethod(sym, javaInstance, defaults, mirror, *args)
617 return javaInstance(*__getConvertedTuple(args,sym,defaults,mirror))
618 else:
--> 619 return toPython(javaInstance.__getattr__(name)(*__getConvertedTuple(args,sym,defaults,mirror)))
620 #It is good for debugging to know whether the argument conversion was successful.
621 #If it was, a Py4JJavaError may be raised from the Java code.

/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:

/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()

/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling o2122.train.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 15, sweet): java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
at com.yahoo.ml.caffe.CaffeOnSpark.train(CaffeOnSpark.scala:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: empty.reduceLeft
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:167)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:195)
at scala.collection.AbstractIterator.reduce(Iterator.scala:1157)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:199)
at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$7.apply(CaffeOnSpark.scala:191)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more

Could you please help me check what happened?

@mriduljain
Contributor

Did you change the path to your prototxt file, and also update the data source in it accordingly?
cfg.protoFile='/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_memory_solver.prototxt'
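
A quick way to confirm this on the driver (a minimal sketch; the paths are the ones used elsewhere in this thread, and the 'net:' line is where a Caffe solver points at its train/test prototxt):

import os

# Check that the solver prototxt assigned to cfg.protoFile exists locally,
# and print its 'net:' line so the path to lenet_memory_train_test.prototxt
# can be verified as well.
solver = '/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/lenet_memory_solver.prototxt'
print(os.path.exists(solver))
with open(solver) as f:
    for line in f:
        if line.strip().startswith('net:'):
            print(line.strip())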

@anfeng
Contributor

anfeng commented Apr 27, 2016

You need to adjust the configuration file:

cfg.protoFile='/Users/afeng/dev/ml/CaffeOnSpark/data/lenet_memory_solver.prototxt'

Andy


@dejunzhang
Author

And when I check the variable cfg, the following error happens:
In [13]: cfg
Out[13]: ---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
in ()
----> 1 cfg

/usr/lib/python2.7/dist-packages/IPython/core/displayhook.pyc in __call__(self, result)
245 self.start_displayhook()
246 self.write_output_prompt()
--> 247 format_dict, md_dict = self.compute_format_data(result)
248 self.write_format_data(format_dict, md_dict)
249 self.update_user_ns(result)

/usr/lib/python2.7/dist-packages/IPython/core/displayhook.pyc in compute_format_data(self, result)
155
156 """
--> 157 return self.shell.display_formatter.format(result)
158
159 def write_format_data(self, format_dict, md_dict=None):

/usr/lib/python2.7/dist-packages/IPython/core/formatters.pyc in format(self, obj, include, exclude)
150 md = None
151 try:
--> 152 data = formatter(obj)
153 except:
154 # FIXME: log the exception

/usr/lib/python2.7/dist-packages/IPython/core/formatters.pyc in __call__(self, obj)
479 type_pprinters=self.type_printers,
480 deferred_pprinters=self.deferred_printers)
--> 481 printer.pretty(obj)
482 printer.flush()
483 return stream.getvalue()

/usr/lib/python2.7/dist-packages/IPython/lib/pretty.pyc in pretty(self, obj)
360 if callable(meth):
361 return meth(obj, self, cycle)
--> 362 return _default_pprint(obj, self, cycle)
363 finally:
364 self.end_group()

/usr/lib/python2.7/dist-packages/IPython/lib/pretty.pyc in _default_pprint(obj, p, cycle)
480 if getattr(klass, '__repr__', None) not in _baseclass_reprs:
481 # A user-provided repr.
--> 482 p.text(repr(obj))
483 return
484 p.begin_group(1, '<')

TypeError: 'str' object is not callable

@dejunzhang
Author

dejunzhang commented Apr 27, 2016

@mriduljain @anfeng Yes, I changed the proto file path as below:
cfg.protoFile='/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/lenet_memory_solver.prototxt'

I also changed the path in lenet_memory_train_test.prototxt.

@mriduljain
Contributor

What about the path to your data in lenet_memory_train_test.prototxt?

@dejunzhang
Author

@mriduljain, I changed from
source: "file:/Users/mridul/bigml/demodl/mnist_train_lmdb"
to:
source: "file:/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb"

and from:
source: "file:/Users/mridul/bigml/demodl/mnist_test_lmdb/"
to:
source: "file:/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_test_lmdb"

@dejunzhang
Author

@mriduljain @anfeng Thank you very much for your reply and kind help :)
I am using the latest code from the master branch.

@mriduljain
Contributor

Hmm, it's confusing indeed. Java is throwing an exception. What is the size of your data files in mnist_train_lmdb?

@dejunzhang
Author

@mriduljain Yes, you caught the problem. I think the data.mdb is damaged:
the size of data.mdb is 7 KB, while data.mdb.filepart is about 60316 KB.
When I copied a new one over from another machine, the problem disappeared.
Another problem then occurred:

Requested # of executors: 1 actual # of executors:2. Please try to set --conf spark.scheduler.maxRegisteredResourcesWaitingTime with a large value (default 30s)
.........
actual number of executors is not as expected

After I added "--num-executors 1" to the command, the problem was solved.
Thank you for your suggestions :)
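
A damaged LMDB like this one (a leftover data.mdb.filepart next to a 7 KB data.mdb) can be caught before training. A minimal sketch, assuming the lmdb Python package is installed on the driver:

import lmdb

# Open the environment read-only and count its entries; a truncated or
# partially downloaded LMDB usually fails to open, or reports far fewer
# entries than expected (the MNIST training set should have 60000).
env = lmdb.open('/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb',
                readonly=True, lock=False)
print(env.stat()['entries'])
env.close()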

@dejunzhang
Author

@mriduljain After the training, when I run the code below in the notebook:
In [31]: lr_raw_source = DataSource(sc).getSource(cfg,False)
16/04/27 14:41:25 INFO caffe.DataSource$: Source data layer:1
16/04/27 14:41:25 INFO caffe.LMDB: Batch size:100
In [32]: extracted_df = cos.features(lr_raw_source)

another problem occurs, similar to this one:
#41 (comment)
The error log is:
Py4JJavaError: An error occurred while calling o864.features.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#92L])
+- TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#110L])
+- InMemoryColumnarTableScan InMemoryRelation [SampleID#74,ip1#75,label#76], true, 10000, StorageLevel(true, false, false, false, 1), ConvertToUnsafe, None

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1554)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1553)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1553)
at com.yahoo.ml.caffe.CaffeOnSpark.features(CaffeOnSpark.scala:267)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#110L])
+- InMemoryColumnarTableScan InMemoryRelation [SampleID#74,ip1#75,label#76], true, 10000, StorageLevel(true, false, false, false, 1), ConvertToUnsafe, None

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 29 more

Caused by: org.apache.spark.SparkException: addFile does not support local directories when not running local mode.
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1368)
at com.yahoo.ml.caffe.LmdbRDD.getPartitions(LmdbRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.ShuffleDependency.(Dependency.scala:91)
at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 37 more

I also checked the code at LmdbRDD.scala L43:

//make sourceFilePath downloaded to all nodes
if (!lmdb_path.startsWith(FSUtils.localfsPrefix))
  sc.addFile(lmdb_path, true)      <------------ this is called, which means lmdb_path is not treated as a local path. But the lmdb_path in lenet_memory_train_test.prototxt is:
source: "file:/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/mnist_train_lmdb"

@mriduljain
Contributor

For now, could you copy your dataset to HDFS and use hdfs:///...../mnist_train_lmdb everywhere?
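
A sketch of that workaround, assuming the hadoop CLI is on the PATH; the /user/atlas target directory is a hypothetical choice, not something given in this thread:

import subprocess

# Stage both LMDB directories into HDFS so every executor can read them;
# /user/atlas/ is a hypothetical destination used only for illustration.
for d in ('mnist_train_lmdb', 'mnist_test_lmdb'):
    subprocess.check_call(['hadoop', 'fs', '-put', '-f',
        '/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/' + d,
        '/user/atlas/'])

The 'source:' entries in lenet_memory_train_test.prototxt would then point at hdfs:///user/atlas/mnist_train_lmdb and hdfs:///user/atlas/mnist_test_lmdb instead of the file: URIs.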

@anfeng
Contributor

anfeng commented Apr 27, 2016

@dejunzhang Can you check your ImageDataSource.scala file? Its L69 should be:
sourceFilePath = FSUtils.localfsPrefix+f.getAbsolutePath()
In your use case, the lmdb_path prefix should be file:, and thus addFile() should not be called.

@dejunzhang
Author

@anfeng Yes, in ImageDataSource.scala, L69 is:
sourceFilePath = FSUtils.localfsPrefix+f.getAbsolutePath()
I will try @mriduljain's suggestions. Thank you.

@dejunzhang
Author

@mriduljain, I also tried the following command:

spark-submit --master yarn \
--num-executors 1 \
--driver-library-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
--driver-class-path "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
--conf spark.cores.max=5 \
--conf spark.driver.extraLibraryPath="${LD_LIBRARY_PATH}" \
--conf spark.executorEnv.LD_LIBRARY_PATH="${LD_LIBRARY_PATH}" \
--py-files ${CAFFE_ON_SPARK}/caffe-grid/target/caffeonsparkpythonapi.zip \
--files ${CAFFE_ON_SPARK}/data/caffe/_caffe.so,${CAFFE_ON_SPARK}/data/lenet_memory_solver.prototxt,${CAFFE_ON_SPARK}/data/lenet_memory_train_test.prototxt \
--jars "${CAFFE_ON_SPARK}/caffe-grid/target/caffe-grid-0.1-SNAPSHOT-jar-with-dependencies.jar" \
--conf spark.pythonargs="-conf ${CAFFE_ON_SPARK}/data/lenet_memory_solver.prototxt -model file:///tmp/lenet.model -features accuracy,ip1,ip2 -label label -output file:///tmp/output -devices 1 -outputFormat json -clusterSize 1" \
${CAFFE_ON_SPARK}/caffe-grid/src/main/python/examples/MultiClassLogisticRegression.py

And a similar error happened:

16/04/28 10:06:48 INFO cluster.YarnScheduler: Removed TaskSet 14.0, whose tasks have all completed, from pool
16/04/28 10:06:48 INFO caffe.CaffeOnSpark: Schema:StructType(StructField(SampleID,StringType,false), StructField(accuracy,ArrayType(FloatType,true),false), StructField(ip1,ArrayType(FloatType,true),false), StructField(ip2,ArrayType(FloatType,true),false), StructField(label,ArrayType(FloatType,true),false))
Traceback (most recent call last):
File "/home/atlas/work/caffe_spark/CaffeOnSpark-master/caffe-grid/src/main/python/examples/MultiClassLogisticRegression.py", line 36, in
extracted_df = cos.features(lr_raw_source)
File "/home/atlas/work/caffe_spark/CaffeOnSpark-master/caffe-grid/target/caffeonsparkpythonapi.zip/com/yahoo/ml/caffe/CaffeOnSpark.py", line 45, in features
File "/home/atlas/work/caffe_spark/CaffeOnSpark-master/caffe-grid/target/caffeonsparkpythonapi.zip/com/yahoo/ml/caffe/ConversionUtil.py", line 816, in call
File "/home/atlas/work/caffe_spark/CaffeOnSpark-master/caffe-grid/target/caffeonsparkpythonapi.zip/com/yahoo/ml/caffe/ConversionUtil.py", line 619, in callJavaMethod
File "/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call
File "/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/home/atlas/work/caffe_spark/3rdparty/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o864.features.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#30L])
+- TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#58L])
+- InMemoryColumnarTableScan InMemoryRelation [SampleID#0,accuracy#1,ip1#2,ip2#3,label#4], true, 10000, StorageLevel(true, false, false, false, 1), ConvertToUnsafe, None

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1554)
at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1553)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1553)
at com.yahoo.ml.caffe.CaffeOnSpark.features(CaffeOnSpark.scala:267)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#58L])
   +- InMemoryColumnarTableScan InMemoryRelation [SampleID#0,accuracy#1,ip1#2,ip2#3,label#4], true, 10000, StorageLevel(true, false, false, false, 1), ConvertToUnsafe, None

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 29 more

Caused by: org.apache.spark.SparkException: addFile does not support local directories when not running local mode.
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1368)
at com.yahoo.ml.caffe.LmdbRDD.getPartitions(LmdbRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)

At that moment, the executor log shows that the model was trained successfully. This means the local LMDB could be accessed during training, but not during feature extraction (a workaround sketch follows the log below).

I0428 10:06:35.210611 3137 solver.cpp:253] Train net output #0: loss = 0.00997569 (* 1 = 0.00997569 loss)
I0428 10:06:35.210630 3137 sgd_solver.cpp:106] Iteration 9800, lr = 0.00599102
I0428 10:06:41.777729 3137 solver.cpp:237] Iteration 9900, loss = 0.0067299
I0428 10:06:41.777781 3137 solver.cpp:253] Train net output #0: loss = 0.0067299 (* 1 = 0.0067299 loss)
I0428 10:06:41.777799 3137 sgd_solver.cpp:106] Iteration 9900, lr = 0.00596843
16/04/28 10:06:48 INFO caffe.CaffeProcessor: Snapshot saving into files at iteration #10000
I0428 10:06:48.283884 3137 solver.cpp:459] Snapshotting to binary proto file mnist_lenet_iter_10000.caffemodel
I0428 10:06:48.288913 3137 sgd_solver.cpp:273] Snapshotting solver state to binary proto file mnist_lenet_iter_10000.solverstate
16/04/28 10:06:48 INFO caffe.FSUtils$: destination file:file:///tmp/mnist_lenet_iter_10000.caffemodel
16/04/28 10:06:48 INFO caffe.FSUtils$: /tmp/hadoop-atlas/nm-local-dir/usercache/atlas/appcache/application_1461720051154_0015/container_1461720051154_0015_01_000002/mnist_lenet_iter_10000.caffemodel-->/tmp/mnist_lenet_iter_10000.caffemodel
16/04/28 10:06:48 INFO caffe.FSUtils$: destination file:file:///tmp/mnist_lenet_iter_10000.solverstate
16/04/28 10:06:48 INFO caffe.FSUtils$: /tmp/hadoop-atlas/nm-local-dir/usercache/atlas/appcache/application_1461720051154_0015/container_1461720051154_0015_01_000002/mnist_lenet_iter_10000.solverstate-->/tmp/mnist_lenet_iter_10000.solverstate
16/04/28 10:06:48 INFO caffe.CaffeProcessor: Model saving into file at the end of training:file:///tmp/lenet.model
I0428 10:06:48.291647 3137 solver.cpp:459] Snapshotting to binary proto file mnist_lenet_iter_10000.caffemodel
I0428 10:06:48.296265 3137 sgd_solver.cpp:273] Snapshotting solver state to binary proto file mnist_lenet_iter_10000.solverstate
16/04/28 10:06:48 INFO caffe.FSUtils$: destination file:file:///tmp/lenet.model
16/04/28 10:06:48 INFO caffe.FSUtils$: /tmp/hadoop-atlas/nm-local-dir/usercache/atlas/appcache/application_1461720051154_0015/container_1461720051154_0015_01_000002/mnist_lenet_iter_10000.caffemodel-->/tmp/lenet.model
16/04/28 10:06:48 INFO executor.Executor: Finished task 0.0 in stage 12.0 (TID 12). 2122 bytes result sent to driver
16/04/28 10:06:48 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 13
16/04/28 10:06:48 INFO executor.Executor: Running task 0.0 in stage 13.0 (TID 13)
16/04/28 10:06:48 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 14
16/04/28 10:06:48 INFO storage.MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 1275.0 B, free 12.7
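
Given the root cause above (SparkContext.addFile rejecting local directories outside local mode), a minimal workaround sketch is to host the LMDB on HDFS and point the prototxt at it. The HDFS destination and file names below are hypothetical placeholders, not paths from this run:

# Sketch: copy the local LMDB to HDFS, then rewrite the "source:" entries
# of the train/test prototxt so executors read hdfs:// paths instead of
# relying on SparkContext.addFile with a local directory.
import re
import subprocess

subprocess.check_call(['hadoop', 'fs', '-put', '-f',
                       'mnist_train_lmdb', '/projects/mnist/mnist_train_lmdb'])

with open('lenet_memory_train_test.prototxt') as f:
    proto = f.read()
proto = re.sub(r'source:\s*"[^"]*mnist_train_lmdb"',
               'source: "hdfs:///projects/mnist/mnist_train_lmdb"',
               proto)
with open('lenet_memory_train_test.hdfs.prototxt', 'w') as f:
    f.write(proto)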

@dejunzhang (Author)

@mriduljain For HDFS, there is no error when extracting features with the code below:
#Extract features
lr_raw_source = DataSource(sc).getSource(cfg,False)
extracted_df = cos.features(lr_raw_source)
extracted_df.show(10)
and got:
+--------+--------------------+-----+
|SampleID| ip1|label|
+--------+--------------------+-----+
|00000000|[0.0, 0.0, 1.2782...|[7.0]|
|00000001|[0.0, 3.2785423, ...|[2.0]|
|00000002|[0.0, 0.0, 1.5023...|[1.0]|
|00000003|[0.0, 2.01363, 0....|[0.0]|
|00000004|[0.5984455, 2.751...|[4.0]|
|00000005|[0.0, 0.0, 2.0688...|[1.0]|
|00000006|[0.0, 2.0721931, ...|[4.0]|
|00000007|[0.0, 0.0, 0.3567...|[9.0]|
|00000008|[0.8896437, 0.478...|[5.0]|
|00000009|[0.0, 0.0, 0.0, 0...|[9.0]|
+--------+--------------------+-----+
only showing top 10 rows
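
As a usage note, the extracted DataFrame can be fed straight into MLlib. A minimal sketch, assuming the layout shown above (ip1 holds the feature values, label a one-element array):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Map each row to a LabeledPoint and train a classifier on the features.
data = extracted_df.rdd.map(
    lambda row: LabeledPoint(row.label[0], Vectors.dense(row.ip1)))
lr = LogisticRegressionWithLBFGS.train(data, numClasses=10)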

@mriduljain (Contributor)

@dejunzhang I tried to reproduce your earlier problem (i.e. local LMDBs) but couldn't :(. Will try again later.

@dejunzhang (Author)

@mriduljain Thanks a lot for your kind help. :)

@dejunzhang (Author)

@mriduljain I'd like to ask another question: does CaffeOnSpark support other kinds of sources in the prototxt?
For example, a training data source given as a file list, like below:
source: "/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/train.txt"

@anfeng (Contributor) commented Apr 28, 2016

We have 3 built-in data sources: LMDB, ImageDataFrame, and SeqImageDataSource.

A SeqImageDataSource can be constructed from a file list; please check out tools.Binary2Sequence.

You should be able to use those tools, or introduce new tools or data sources.
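
For reference, a Caffe-style file list is usually one "image_path label" pair per line; treating train.txt this way is an assumption here, and the exact format Binary2Sequence expects should be confirmed against the tool's source. A small parsing sketch:

# Assumed "path label" layout, one sample per line, e.g.:
#   images/00001.png 7
with open('/home/atlas/work/caffe_spark/CaffeOnSpark-master/data/train.txt') as f:
    for line in f:
        path, label = line.strip().rsplit(' ', 1)
        print(path, int(label))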

@dejunzhang (Author)

@anfeng, I will give it a try. Thanks a lot.

@mollyStark

@anfeng I ran into the same problem: when executing "cos.features(data_source)", it failed with the error message

Caused by: java.lang.UnsupportedOperationException: empty.reduceLeft
2017-04-05T09:24:14.899630607Z at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:180)
2017-04-05T09:24:14.899634837Z at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
2017-04-05T09:24:14.899639027Z at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:208)
2017-04-05T09:24:14.899643067Z at scala.collection.AbstractIterator.reduce(Iterator.scala:1336)
2017-04-05T09:24:14.899647047Z at com.yahoo.ml.caffe.CaffeOnSpark$$anonfun$18.apply(CaffeOnSpark.scala:492) 
...

In my train_test.prototxt, I use

source: "hdfs://host/path/to/dataframe/"

I checked the dataframe on my local machine, and feature extraction goes through and shows results there, but why does it fail with this error on the Spark cluster?
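
One way to narrow this down, since empty.reduceLeft means a reduce ran over zero elements: count the source rows from the same cluster session before calling cos.features. A sketch, assuming the dataframe directory was written as Parquet:

# If this count is 0 (or the path is wrong as seen from the cluster),
# cos.features has no rows to reduce, matching the empty.reduceLeft failure.
df = sqlContext.read.parquet('hdfs://host/path/to/dataframe/')
print(df.count())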
