Conversation

@koeninger
Contributor

…fails

@SparkQA

SparkQA commented Aug 12, 2015

Test build #40652 has finished for PR 8133 at commit 406259d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Aug 13, 2015

LGTM. Merging this to master and 1.5. Thanks!

asfgit pushed a commit that referenced this pull request Aug 13, 2015
…fails

Author: cody koeninger <cody@koeninger.org>

Closes #8133 from koeninger/SPARK-9780 and squashes the following commits:

406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails

(cherry picked from commit 8ce6096)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 8ce6096 Aug 13, 2015
@shyamsalimkumar

I get the same NPE in Spark 1.4.1 as well. I'm not sure what's wrong (yet): my Spark Streaming job was running fine printing out Array[Byte] data, but after I added a custom decoder (Avro to my class Swipe) it keeps throwing the NPE (consumer is null). Any hints on what the root cause could be?

    val swipes = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)

works, but

    val swipes = KafkaUtils.createDirectStream[Array[Byte], Swipe, DefaultDecoder, SwipeDecoder](ssc, kafkaParams, topics)

doesn't. In that respect the NPE is somewhat misleading.

CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015
…fails

Author: cody koeninger <cody@koeninger.org>

Closes apache#8133 from koeninger/SPARK-9780 and squashes the following commits:

406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails
@koeninger
Contributor Author

KafkaRDD attempts to instantiate the value decoder right before connecting the consumer:

    val valueDecoder = classTag[T].runtimeClass
      .getConstructor(classOf[VerifiableProperties])
      .newInstance(kc.config.props)
      .asInstanceOf[Decoder[V]]

    val consumer = connectLeader

Something's wrong with your SwipeDecoder.
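To make the failure mode concrete: KafkaRDD looks the decoder constructor up by reflection, so a decoder class that lacks a single-argument VerifiableProperties constructor fails at that lookup, before any message is decoded. A minimal, runnable sketch of that lookup, with stand-in types (VerifiableProperties and Decoder here are stubs for illustration, not the real Kafka classes):

```scala
// Stand-ins for kafka.utils.VerifiableProperties and kafka.serializer.Decoder,
// stubbed so this sketch compiles on its own.
class VerifiableProperties
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Correct shape: declares the one-arg constructor KafkaRDD looks up by reflection.
class GoodDecoder(props: VerifiableProperties) extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Broken shape: no VerifiableProperties constructor, so the reflective
// lookup fails before any decoding happens.
class BadDecoder extends Decoder[String] {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

object DecoderCheck extends App {
  // Mirrors KafkaRDD's reflective instantiation of the decoder.
  def instantiate(cls: Class[_]): Either[Throwable, Decoder[String]] =
    try Right(
      cls.getConstructor(classOf[VerifiableProperties])
        .newInstance(new VerifiableProperties)
        .asInstanceOf[Decoder[String]]
    )
    catch { case e: Throwable => Left(e) }

  assert(instantiate(classOf[GoodDecoder]).isRight)
  assert(instantiate(classOf[BadDecoder]).isLeft) // NoSuchMethodException
  println("ok")
}
```

Declaring `props: VerifiableProperties` in the decoder's constructor, as noted later in this thread, is exactly what fixes the broken shape.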

@shyamsalimkumar

Yes... I know. 😄 I had forgotten to accept props: VerifiableProperties in it. Silly me. But I still think that the error thrown is misleading (unless of course you know the above).

@koeninger
Contributor Author

This PR has already been merged to suppress the NPE when someone messes up instantiation.
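The commit title describes the shape of that fix: when decoder instantiation throws partway through setup, cleanup must tolerate fields that were never assigned. A hedged sketch of that null-guard pattern, with illustrative names (Iter, FakeConsumer, closeIfNeeded here are stand-ins, not the actual Spark source):

```scala
// Illustrative stand-in for a consumer connection that must be closed.
class FakeConsumer { var closed = false; def close(): Unit = closed = true }

// Sketch of an iterator whose setup may throw before the consumer exists.
class Iter(makeDecoder: () => AnyRef) {
  var consumer: FakeConsumer = null

  def init(): Unit = {
    makeDecoder()                 // may throw before the consumer is assigned
    consumer = new FakeConsumer
  }

  // The guard is the point: without it, cleanup after a failed init would NPE.
  def closeIfNeeded(): Unit =
    if (consumer != null) consumer.close()
}

object Demo extends App {
  val it = new Iter(() => throw new RuntimeException("bad decoder"))
  try it.init() catch { case _: RuntimeException => () }
  it.closeIfNeeded()              // no NPE even though init failed
  println("ok")
}
```

The underlying reflection error (here, the decoder failure) still surfaces from init; the guard only keeps cleanup from masking it with a second, misleading NPE.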
