
Commit b7cd9e9

tgravescs authored and pwendell committed
SPARK-1195: set map_input_file environment variable in PipedRDD
Hadoop uses the config mapreduce.map.input.file to indicate the input filename to the map when the input split is of type FileSplit. Some of the Hadoop input and output formats set or use this config. This config can also be used by user code.

PipedRDD runs an external process, and the configs aren't available to that process. Hadoop Streaming does something very similar: it makes configs available by exporting them into the environment, replacing '.' with '_'. Spark should also export this variable when launching the pipe command so the user code has access to that config.

Note that the config mapreduce.map.input.file is the new one; the old one, which is deprecated but not yet removed, is map.input.file. So we should handle both.

Perhaps it would be better to abstract this out somehow so it goes into the HadoopPartition code?

Author: Thomas Graves <tgraves@apache.org>

Closes #94 from tgravescs/map_input_file and squashes the following commits:

cc97a6a [Thomas Graves] Update test to check for existence of command, add a getPipeEnvVars function to HadoopRDD
e3401dc [Thomas Graves] Merge remote-tracking branch 'upstream/master' into map_input_file
2ba805e [Thomas Graves] set map_input_file environment variable in PipedRDD
1 parent dabeb6f · commit b7cd9e9
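
As an illustration of the naming convention the commit message describes (Hadoop Streaming exports config keys into the environment with '.' replaced by '_', and this change follows suit), here is a minimal Scala sketch; configKeyToEnvVar is a hypothetical helper, not code from this commit:

// Sketch only: the '.' -> '_' mapping from Hadoop config keys to exported env var names.
def configKeyToEnvVar(key: String): String = key.replace('.', '_')

configKeyToEnvVar("mapreduce.map.input.file")  // "mapreduce_map_input_file" (current key)
configKeyToEnvVar("map.input.file")            // "map_input_file" (deprecated key, still set)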

File tree

3 files changed: 158 additions, 53 deletions

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 19 additions & 0 deletions
@@ -18,8 +18,10 @@
 package org.apache.spark.rdd
 
 import java.io.EOFException
+import scala.collection.immutable.Map
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -43,6 +45,23 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 
   override val index: Int = idx
+
+  /**
+   * Get any environment variables that should be added to the users environment when running pipes
+   * @return a Map with the environment variables and corresponding values, it could be empty
+   */
+  def getPipeEnvVars(): Map[String, String] = {
+    val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
+      val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
+      // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
+      // since its not removed yet
+      Map("map_input_file" -> is.getPath().toString(),
+        "mapreduce_map_input_file" -> is.getPath().toString())
+    } else {
+      Map()
+    }
+    envVars
+  }
 }
 
 /**

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 8 additions & 0 deletions
@@ -28,6 +28,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
 
+
 /**
  * An RDD that pipes the contents of each parent partition through an external command
  * (printing them one per line) and returns the output as a collection of strings.
@@ -59,6 +60,13 @@ class PipedRDD[T: ClassTag](
     val currentEnvVars = pb.environment()
     envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
 
+    // for compatibility with Hadoop which sets these env variables
+    // so the user code can access the input filename
+    if (split.isInstanceOf[HadoopPartition]) {
+      val hadoopSplit = split.asInstanceOf[HadoopPartition]
+      currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
+    }
+
     val proc = pb.start()
     val env = SparkEnv.get

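With the PipedRDD change above, a command launched via pipe() over a Hadoop-backed RDD can read its split's input path from the environment. A rough usage sketch (the HDFS path is hypothetical; the variable name is the one set by this commit, and sc.textFile is assumed to be backed by a HadoopRDD so its partitions are HadoopPartitions):

// Sketch only: collect the distinct input file paths seen by the piped tasks.
val lines = sc.textFile("hdfs:///data/logs")
val inputFiles = lines
  .pipe(Seq("printenv", "map_input_file"))  // printenv prints the exported variable once per task
  .distinct()
  .collect()
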
core/src/test/scala/org/apache/spark/PipedRDDSuite.scala

Lines changed: 131 additions & 53 deletions
@@ -19,74 +19,152 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 
+
+import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
+import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
+import org.apache.hadoop.fs.Path
+
+import scala.collection.Map
+import scala.sys.process._
+import scala.util.Try
+import org.apache.hadoop.io.{Text, LongWritable}
+
 class PipedRDDSuite extends FunSuite with SharedSparkContext {
 
   test("basic pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-    val piped = nums.pipe(Seq("cat"))
+      val piped = nums.pipe(Seq("cat"))
 
-    val c = piped.collect()
-    assert(c.size === 4)
-    assert(c(0) === "1")
-    assert(c(1) === "2")
-    assert(c(2) === "3")
-    assert(c(3) === "4")
+      val c = piped.collect()
+      assert(c.size === 4)
+      assert(c(0) === "1")
+      assert(c(1) === "2")
+      assert(c(2) === "3")
+      assert(c(3) === "4")
+    } else {
+      assert(true)
+    }
   }
 
   test("advanced pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val bl = sc.broadcast(List("0"))
-
-    val piped = nums.pipe(Seq("cat"),
-      Map[String, String](),
-      (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
-      (i:Int, f: String=> Unit) => f(i + "_"))
-
-    val c = piped.collect()
-
-    assert(c.size === 8)
-    assert(c(0) === "0")
-    assert(c(1) === "\u0001")
-    assert(c(2) === "1_")
-    assert(c(3) === "2_")
-    assert(c(4) === "0")
-    assert(c(5) === "\u0001")
-    assert(c(6) === "3_")
-    assert(c(7) === "4_")
-
-    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-    val d = nums1.groupBy(str=>str.split("\t")(0)).
-      pipe(Seq("cat"),
-      Map[String, String](),
-      (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
-      (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
-    assert(d.size === 8)
-    assert(d(0) === "0")
-    assert(d(1) === "\u0001")
-    assert(d(2) === "b\t2_")
-    assert(d(3) === "b\t4_")
-    assert(d(4) === "0")
-    assert(d(5) === "\u0001")
-    assert(d(6) === "a\t1_")
-    assert(d(7) === "a\t3_")
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val bl = sc.broadcast(List("0"))
+
+      val piped = nums.pipe(Seq("cat"),
+        Map[String, String](),
+        (f: String => Unit) => {
+          bl.value.map(f(_)); f("\u0001")
+        },
+        (i: Int, f: String => Unit) => f(i + "_"))
+
+      val c = piped.collect()
+
+      assert(c.size === 8)
+      assert(c(0) === "0")
+      assert(c(1) === "\u0001")
+      assert(c(2) === "1_")
+      assert(c(3) === "2_")
+      assert(c(4) === "0")
+      assert(c(5) === "\u0001")
+      assert(c(6) === "3_")
+      assert(c(7) === "4_")
+
+      val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+      val d = nums1.groupBy(str => str.split("\t")(0)).
+        pipe(Seq("cat"),
+          Map[String, String](),
+          (f: String => Unit) => {
+            bl.value.map(f(_)); f("\u0001")
+          },
+          (i: Tuple2[String, Seq[String]], f: String => Unit) => {
+            for (e <- i._2) {
+              f(e + "_")
+            }
+          }).collect()
+      assert(d.size === 8)
+      assert(d(0) === "0")
+      assert(d(1) === "\u0001")
+      assert(d(2) === "b\t2_")
+      assert(d(3) === "b\t4_")
+      assert(d(4) === "0")
+      assert(d(5) === "\u0001")
+      assert(d(6) === "a\t1_")
+      assert(d(7) === "a\t3_")
+    } else {
+      assert(true)
+    }
   }
 
   test("pipe with env variable") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-    val c = piped.collect()
-    assert(c.size === 2)
-    assert(c(0) === "LALALA")
-    assert(c(1) === "LALALA")
+    if (testCommandAvailable("printenv")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
+      val c = piped.collect()
+      assert(c.size === 2)
+      assert(c(0) === "LALALA")
+      assert(c(1) === "LALALA")
+    } else {
+      assert(true)
+    }
   }
 
   test("pipe with non-zero exit status") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
-    intercept[SparkException] {
-      piped.collect()
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
+      intercept[SparkException] {
+        piped.collect()
+      }
+    } else {
+      assert(true)
     }
   }
 
+  test("test pipe exports map_input_file") {
+    testExportInputFile("map_input_file")
+  }
+
+  test("test pipe exports mapreduce_map_input_file") {
+    testExportInputFile("mapreduce_map_input_file")
+  }
+
+  def testCommandAvailable(command: String): Boolean = {
+    Try(Process(command) !!).isSuccess
+  }
+
+  def testExportInputFile(varName: String) {
+    if (testCommandAvailable("printenv")) {
+      val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
+        classOf[Text], 2) {
+        override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
+
+        override val getDependencies = List[Dependency[_]]()
+
+        override def compute(theSplit: Partition, context: TaskContext) = {
+          new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
+            new Text("b"))))
+        }
+      }
+      val hadoopPart1 = generateFakeHadoopPartition()
+      val pipedRdd = new PipedRDD(nums, "printenv " + varName)
+      val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
+        taskMetrics = null)
+      val rddIter = pipedRdd.compute(hadoopPart1, tContext)
+      val arr = rddIter.toArray
+      assert(arr(0) == "/some/path")
+    } else {
+      // printenv isn't available so just pass the test
+      assert(true)
+    }
+  }
+
+  def generateFakeHadoopPartition(): HadoopPartition = {
+    val split = new FileSplit(new Path("/some/path"), 0, 1,
+      Array[String]("loc1", "loc2", "loc3", "loc4", "loc5"))
+    new HadoopPartition(sc.newRddId(), 1, split)
+  }
+
 }
