Skip to content

Conversation

@srdo
Copy link
Contributor

@srdo srdo commented Aug 8, 2017

See https://issues.apache.org/jira/browse/STORM-2675

This builds on #2268, so please ignore the first commit.

Trident uses json-simple under the hood to persist some objects to Zookeeper. This isn't mentioned on the API docs (or I missed it), so the current storm-kafka-client implementation returns a bunch of objects json-simple can't figure out how to serialize. The result is that json-simple writes the toString of the objects to Zookeeper, which can't be read back out. This causes the Trident Kafka spout to start over every time it's rebooted.

TransactionalState, which is used by Trident to read/write to/from Zookeeper, uses JSONValue.parse to read. That function fails quietly by returning null when there's a parsing error. There's a note in the code that we deliberately don't use the version of the parse function that throws exception on error, but we should at least log when it happens, since it's likely to be due to a bug in the spout or coordinator.

This PR makes the following changes:

  • Manually serialize meta objects to List or Map so json-simple can handle them.
  • Print a warning log when TransactionalState fails to deserialize something from Zookeeper.
  • Fix the Trident Kafka spout's logic around first polls. It was referring to the committed offset of the KafkaConsumer, which is never updated because we don't commit to Kafka. The offsets are instead stored in Zookeeper via the meta object returned by Emitter.emitPartitionBatch.
  • Fix the generic type of Coordinator on OpaquePartitionedTridentSpout. It should use the same type as the first parameter for Emitter, since OpaquePartitionedSpoutExecutor will call getOrderedPartitions and getPartitionsForTask on the emitter with the object returned by Coordinator.getPartitionsForBatch. I don't think this is a breaking change, since this was a de facto constraint anyway.
  • Clarify in a comment on Subscription that Trident expects partitions to remain assigned to specific tasks. As far as I can tell there's a potential issue with the batch metadata cache kept by OpaqueTridentSpoutExecutor getting outdated if partitions are shuffled without workers dying.

If anyone has suggestions for tests of this, I'm happy to add some. I'm wondering why we don't use Kryo for serialization to Zookeeper, since the json-simple library is so inflexible (it can only handle some collections and primitive wrappers).

@srdo
Copy link
Contributor Author

srdo commented Aug 8, 2017

Also stumbled on an issue with the way the coordinator and spout communicate, which I've put here https://issues.apache.org/jira/browse/STORM-2691.

@srdo srdo force-pushed the STORM-2675 branch 2 times, most recently from f04bac6 to ef4fde4 Compare August 9, 2017 13:32
@hmcl
Copy link
Contributor

hmcl commented Aug 10, 2017

@srdo shouldn't this PR come in before this one?

@srdo
Copy link
Contributor Author

srdo commented Aug 10, 2017

@hmcl See my comment on that PR, I think the fixes are not related, but if we make the suggested changes we'll probably need to update this too. The changes should solve STORM-2691 though.

Copy link
Contributor

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srdo the changes look fine to me, but I think it would be cleaner if you could rebase this before I give a final +1.

<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.version}</version>
<scope>${provided.scope}</scope>
<scope>${kafka.dependency.scope}</scope>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just make this compile like the others? You can still override it in your jars if you want to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, since storm-kafka-client-examples doesn't depend on storm-kafka-examples anymore this isn't important. I'll change it to compile in storm-kafka-client-examples too.

Copy link
Contributor

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still +1

@asfgit asfgit merged commit 54a829c into apache:master Sep 8, 2017
asfgit pushed a commit that referenced this pull request Sep 8, 2017
…-2675

STORM-2675: Fix storm-kafka-client Trident spout failing to serialize
meta objects to Zookeeper

This closes #2271
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants