diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 8dcb9645cdc6..698336cfce18 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -610,7 +610,10 @@ def __init__(self, prev, func):
         self.is_checkpointed = False
         self._jdstream_val = None
 
-        if (isinstance(prev, TransformedDStream) and
+        # Use type() so that functions are not folded and DStreams are not compacted when
+        # `prev` is not strictly an object of TransformedDStream.
+        # This change avoids a bug in KafkaTransformedDStream when calling offsetRanges().
+        if (type(prev) is TransformedDStream and
                 not prev.is_cached and not prev.is_checkpointed):
             prev_func = prev.func
             self.func = lambda t, rdd: func(t, prev_func(t, rdd))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6108c845c1ef..214d5be43900 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -850,7 +850,9 @@ def transformWithOffsetRanges(rdd):
                 offsetRanges.append(o)
             return rdd
 
-        stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count())
+        # Test whether it is OK to mix KafkaTransformedDStream and TransformedDStream together;
+        # only the TransformedDStreams can be folded together.
+        stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
         self.ssc.start()
         self.wait_for(offsetRanges, 1)
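
For context, the core of the fix is the switch from `isinstance()` to an exact `type()` check: `isinstance()` also matches subclasses such as `KafkaTransformedDStream`, so the folding optimization could collapse them into a plain `TransformedDStream` and lose the Kafka-specific offset metadata. Below is a minimal standalone sketch (simplified class names and constructor, not the real pyspark implementation) of how an exact-type check keeps a subclassed DStream out of the folding path:

```python
# Minimal sketch, not the actual pyspark classes: the constructor and folding
# logic are simplified to show only the isinstance() vs. type() distinction.


class TransformedDStream(object):
    def __init__(self, prev, func):
        self.func = func
        self.prev = prev
        # Exact-type check: fold only when `prev` is a plain TransformedDStream.
        if type(prev) is TransformedDStream:
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
            self.prev = prev.prev


class KafkaTransformedDStream(TransformedDStream):
    """Stands in for the Kafka-backed variant whose RDDs expose offsetRanges()."""


kafka = KafkaTransformedDStream(prev=None, func=lambda t, rdd: rdd)
chained = TransformedDStream(kafka, lambda t, rdd: rdd)

# isinstance() is True for the subclass, so the old check would have folded
# `kafka` away; the exact type() check keeps it reachable as chained.prev,
# which is what lets offsetRanges() keep working downstream.
assert isinstance(kafka, TransformedDStream)
assert type(kafka) is not TransformedDStream
assert chained.prev is kafka
```

The test change mirrors this: appending `map(...).count().pprint()` after the offset-collecting transform makes the pipeline mix a Kafka-backed transformed stream with ordinary `TransformedDStream`s, exercising the rule that only the latter may be folded.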