48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
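The Java overloads mirror the Scala API, converting each Option through JavaUtils.optionToOptional so Java callers see Guava's Optional. A minimal sketch of that contract (invented values, not part of this diff):

import com.google.common.base.Optional

// A side with no matching key surfaces as Optional.absent(),
// corresponding to None on the Scala side.
val present: Optional[Int] = Optional.of(1)     // Scala Some(1)
val absent: Optional[Int] = Optional.absent()   // Scala None
assert(present.isPresent && present.get == 1)
assert(!absent.isPresent)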
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
case (vs, Seq()) => vs.map(v => (Some(v), None))
case (Seq(), ws) => ws.map(w => (None, Some(w)))
case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
}
}
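The flatMapValues above distinguishes three shapes of cogrouped values. A standalone sketch of the same three cases on plain Seqs (illustrative helper, not part of the diff):

def expand[V, W](vs: Seq[V], ws: Seq[W]): Seq[(Option[V], Option[W])] = (vs, ws) match {
  case (vs, Seq()) => vs.map(v => (Some(v), None))                     // key only on the left
  case (Seq(), ws) => ws.map(w => (None, Some(w)))                     // key only on the right
  case (vs, ws)    => for (v <- vs; w <- ws) yield (Some(v), Some(w))  // key on both sides
}
// expand(Seq(1, 2), Seq('x'))  == Seq((Some(1), Some('x')), (Some(2), Some('x')))
// expand(Seq(1), Seq())        == Seq((Some(1), None))
// expand(Seq(), Seq('w'))      == Seq((None, Some('w')))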

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
rightOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, defaultPartitioner(self, other))
}

/**
* Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
* the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
* element (k, w) in `other`, the resulting RDD will either contain all pairs
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
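A quick usage sketch of the new core API (invented data; assumes an active SparkContext named sc and, on pre-1.3 Spark, the pair-RDD implicits):

import org.apache.spark.SparkContext._

val left = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "B"), (3, "C")))
// Keys from either side survive; the missing side is padded with None.
left.fullOuterJoin(right).collect().sortBy(_._1)
// Array((1,(Some(a),None)), (2,(Some(b),Some(B))), (3,(None,Some(C))))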
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)

assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)

assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -294,6 +294,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}

test("fullOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.fullOuterJoin(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (Some(1), Some('x'))),
(1, (Some(2), Some('x'))),
(2, (Some(1), Some('y'))),
(2, (Some(1), Some('z'))),
(3, (Some(1), None)),
(4, (None, Some('w')))
))
}

test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.join(emptyKv).collect().size === 0)
assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
assert(rdd.cogroup(emptyKv).collect().size === 2)
assert(rdd.union(emptyKv).collect().size === 2)
}
2 changes: 1 addition & 1 deletion docs/programming-guide.md
@@ -906,7 +906,7 @@ for details.
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
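A short sketch of the documented behavior (invented data; runs in the Spark shell, where the pair-RDD implicits are preloaded):

val pets = sc.parallelize(Seq(("alice", "dog"), ("bob", "cat")))
val cities = sc.parallelize(Seq(("alice", "NYC"), ("carol", "SF")))
pets.join(cities).collect()           // Array((alice,(dog,NYC))) -- inner join: shared keys only
pets.fullOuterJoin(cities).collect()  // all three keys, with Options padding the absent side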
16 changes: 16 additions & 0 deletions python/pyspark/join.py
@@ -80,6 +80,22 @@ def dispatch(seq):
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_full_outer_join(rdd, other, numPartitions):
def dispatch(seq):
vbuf, wbuf = [], []
for (n, v) in seq:
if n == 1:
vbuf.append(v)
elif n == 2:
wbuf.append(v)
if not vbuf:
vbuf.append(None)
if not wbuf:
wbuf.append(None)
return [(v, w) for v in vbuf for w in wbuf]
return _do_python_join(rdd, other, numPartitions, dispatch)


def python_cogroup(rdds, numPartitions):
def make_mapper(i):
return lambda (k, v): (k, (i, v))
25 changes: 23 additions & 2 deletions python/pyspark/rdd.py
@@ -38,7 +38,7 @@
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, CompressedSerializer
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
@@ -1402,7 +1402,7 @@ def leftOuterJoin(self, other, numPartitions=None):

For each element (k, v) in C{self}, the resulting RDD will either
contain all pairs (k, (v, w)) for w in C{other}, or the pair
(k, (v, None)) if no elements in other have key k.
(k, (v, None)) if no elements in C{other} have key k.

Hash-partitions the resulting RDD into the given number of partitions.

@@ -1430,6 +1430,27 @@ def rightOuterJoin(self, other, numPartitions=None):
"""
return python_right_outer_join(self, other, numPartitions)

def fullOuterJoin(self, other, numPartitions=None):
"""
Perform a full outer join of C{self} and C{other}.

For each element (k, v) in C{self}, the resulting RDD will either
contain all pairs (k, (v, w)) for w in C{other}, or the pair
(k, (v, None)) if no elements in C{other} have key k.

Similarly, for each element (k, w) in C{other}, the resulting RDD will
either contain all pairs (k, (v, w)) for v in C{self}, or the pair
(k, (None, w)) if no elements in C{self} have key k.

Hash-partitions the resulting RDD into the given number of partitions.

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("c", 8)])
>>> sorted(x.fullOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
"""
return python_full_outer_join(self, other, numPartitions)

# TODO: add option to control map-side combining
# portable_hash is used as default, because builtin hash of None is different
# cross machines.
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}

/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W](
other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@
* number of partitions.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -658,6 +658,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W](other: JavaPairDStream[K, W])
: JavaPairDStream[K, (Optional[V], Optional[W])] = {
[Review thread on the line above]
Reviewer: This does not need to be on the next line. It fits in the previous line I think.
Author: It's slightly over the limit at 103 characters
Reviewer: Aaargh. My visual estimation failed. :(
Author: No worries :)

implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.fullOuterJoin(other.dstream)
joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
}
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W](
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], Optional[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
joinResult.mapValues{ case (v, w) =>
(JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
}
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -568,6 +568,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
)
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner())
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
* partitions.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
numPartitions: Int
): DStream[(K, (Option[V], Option[W]))] = {
fullOuterJoin[W](other, defaultPartitioner(numPartitions))
}

/**
* Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def fullOuterJoin[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], Option[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
)
}

/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval
* is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
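A hedged streaming sketch (invented sources, not part of this diff): per batch, fullOuterJoin keeps keys seen on either stream and pads the other side with None.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(1))
val lefts = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))
val rights = ssc.socketTextStream("localhost", 9998).map(w => (w, "x"))
// DStream[(String, (Option[Int], Option[String]))], one join per batch interval
lefts.fullOuterJoin(rights).print()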
streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("fullOuterJoin") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, Some("x"))) ),
Seq( ("", (Some(1), None)) ),
Seq( ("", (None, Some("x"))) )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}

test("updateStateByKey") {
val inputData =
Seq(