Kafka-4351: Topic regex behavioral change with MirrorMaker new consumer #2072
huxihx wants to merge 21 commits into apache:trunk from huxihx:kafka-4351_Regex_behavior_change_for_new_consumer
…nsumer Author: huxi_2b@hotmail.com Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
|
@joestein please help review this. Thanks. |
|
Thanks for the PR. A couple of things:
|
|
@ijuma, totally agreed that we need to test the change. As for the doc update, I think we might not need it, since the documentation already claims:
And this is exactly what this fix is about. Without the change, only the old consumer supports comma-separated (CSV) regular expressions, because TopicFilter converts them internally. The new consumer is not able to do this. Does that make sense?
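To make the conversion being discussed concrete, here is a minimal, self-contained sketch of turning a comma-separated whitelist into a single alternation regex. It follows the replace chain visible in the diff context of this thread; `WhitelistSketch` and `toRegex` are hypothetical names used only for illustration and are not Kafka classes.

```scala
// Hypothetical helper for illustration only; not part of Kafka.
object WhitelistSketch {
  // Turn a comma-separated whitelist into one alternation regex.
  def toRegex(rawWhitelist: String): String =
    rawWhitelist
      .trim
      .replace(',', '|')             // CSV separators become regex alternation
      .replace(" ", "")              // drop spaces around the separators
      .replaceAll("""^["']+""", "")  // property files may bring leading quotes
      .replaceAll("""["']+$""", "")  // ... and trailing quotes
}
```

With this sketch, `WhitelistSketch.toRegex("new.*, another_topic,foo")` yields `new.*|another_topic|foo`, which is the form a plain `Pattern.compile` call needs in order to treat the entries as alternatives.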
|
That's a fair point, if we already document it like that, then it's just a bug fix. Let's add a note to the upgrade notes still though ( |
|
@ijuma Please advise on what I should do next. I am new to the process. Thanks.
| .replace(" ", "") | ||
| .replaceAll("""^["']+""","") | ||
| .replaceAll("""["']+$""","") // property files may bring quotes | ||
| consumer.subscribe(Pattern.compile(revisedWhitelist), consumerRebalanceListener) |
There was a problem hiding this comment.
What if we use something like this
val whitelist = Whitelist(whitelistOpt.get)
consumer.subscribe(Pattern.compile(whitelist.regex), consumerRebalanceListener)
so that regex manipulation is hardcoded in one place?
There was a problem hiding this comment.
@vahidhashemian Agreed with your suggestion. If we employ Whitelist here, we also need a minor change to the exception handling. Here is the analysis:
Before Whitelist is initialized, the TopicFilter constructor runs and invokes Pattern.compile(regex). If an invalid regex is supplied, TopicFilter catches the error and throws RuntimeException (not PatternSyntaxException). As a result, the init method of MirrorMakerNewConsumer will not catch this exception, since it only catches PatternSyntaxException (although the impact is negligible, because the enclosing run method catches Throwable).
Does that make sense? If you agree with the analysis above, I will make another commit following your suggestion and have this PR ready to be reviewed again. Is that okay with you?
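The exception mismatch described in this comment can be sketched as follows. This is only an illustration under stated assumptions: `FilterSketch` is a hypothetical stand-in for a filter that validates its regex on construction and rethrows a plain RuntimeException, and `handledBy` is a hypothetical caller that reports which handler sees the failure.

```scala
import java.util.regex.{Pattern, PatternSyntaxException}

object ExceptionSketch {
  // Hypothetical stand-in: validates the regex on construction and
  // converts the syntax error into a plain RuntimeException.
  class FilterSketch(val regex: String) {
    try Pattern.compile(regex)
    catch {
      case _: PatternSyntaxException =>
        throw new RuntimeException(s"$regex is an invalid regex.")
    }
  }

  // Reports which handler, if any, saw the construction failure.
  def handledBy(regex: String): String =
    try {
      new FilterSketch(regex)
      "none"
    } catch {
      case _: PatternSyntaxException => "PatternSyntaxException handler"
      case _: RuntimeException       => "RuntimeException handler"
    }
}
```

For an unclosed character class such as `"["`, the PatternSyntaxException handler is skipped and only the broader RuntimeException handler fires, which is why a caller that catches only PatternSyntaxException would miss the failure.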
There was a problem hiding this comment.
Thanks for explaining. It makes sense to me.
On a separate note, do you think it makes sense to add some unit test for this change?
There was a problem hiding this comment.
@vahidhashemian Testing is definitely important, but in this case I am afraid what we would need to test is whether Whitelist behaves as expected, which is already covered by TopicFilterTest. Besides, I made a new commit following your suggestion. Please have a look. Thanks.
There was a problem hiding this comment.
Looks like you committed more than needed in the last try?
On the unit test note, I'm wondering if it makes sense to add (at least) one unit test that verifies the issue raised in the JIRA would not occur with this patch?
There was a problem hiding this comment.
@vahidhashemian I have already committed a new one. As for the unit test, which file do you think the test case should be added to: TopicFilterTest or MirrorMakerTest?
There was a problem hiding this comment.
For the unit test I'd add it to MirrorMakerTest.
…ttps://github.com/amethystic/kafka into kafka-4351_Regex_behavior_change_for_new_consumer
author: huxi_2b@hotmail.com Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
Author: huxi_2b@hotmail.com Support CSV values in the regexp for MirrorMaker new consumer as OldConsumer does.
…ehavior_change_for_new_consumer
…ehavior_change_for_new_consumer
…ttps://github.com/amethystic/kafka into kafka-4351_Regex_behavior_change_for_new_consumer
|
@vahidhashemian @ijuma any feedback for this PR? Thanks. |
|
@joestein Could you help review this PR? Thanks. |
|
@amethystic The change you made to fix the issue looks good to me. But @ijuma's concerns above should still be addressed in my opinion. |
|
@vahidhashemian Thanks for your feedback. As for @ijuma's suggestion, it seems he is referring to upgrade.html. Do you know how to add an upgrade note for this PR? What should I do next?
|
|
|
@ijuma Thanks. So can this PR be closed now? |
|
@amethystic no, as it has not been merged yet. To clarify: before we merge it, we need:
|
…ehavior_change_for_new_consumer
…pression is supported for the --whitelist option of the new consumer in MirrorMaker'
|
@ijuma Thanks for the advice. I have updated the upgrade note. Besides, I ran a test in my local environment, and it passed. Both single topics and regular expressions are now supported for the new consumer of MirrorMaker. Do you think we need to add some unit test cases, even though that is not easy to do?
…ehavior_change_for_new_consumer
|
@ijuma could you please address the comments above? Thanks. |
|
@amethystic, yes it would be good to add at least one test. |
|
@ijuma Sorry, but do you mean the unit test? |
…ehavior_change_for_new_consumer
|
Refer to this link for build results (access rights to CI server needed): |
ijuma
left a comment
There was a problem hiding this comment.
Sorry for the delay. Thanks for adding the test, very useful. I left a few suggestions and questions.
| import org.junit.Test | ||
| import org.junit.{After, Before, Test} | ||
|
|
||
| class MirrorMakerTest extends ZooKeeperTestHarness { |
There was a problem hiding this comment.
I suggest creating a new test under integration/kafka/tools/MirrorMaker.scala instead of reusing the existing unit test to avoid making the existing unit tests slower due to initialisation that they don't need.
Also, it seems like it may be better to inherit from KafkaServerTestHarness than ZooKeeperTestHarness?
| val props = new Properties() | ||
| props.put("bootstrap.servers", brokerList) | ||
| props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") |
There was a problem hiding this comment.
Instead of setting the serializers like this, you can just pass them to the KafkaProducer constructor.
| props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props) | ||
| producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes())) | ||
| producer1.flush() // Explicitly invoke flush method to make effect immediately |
There was a problem hiding this comment.
close should have the same effect as flush. Does the test fail if you remove flush?
| props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props) | ||
| producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes())) |
There was a problem hiding this comment.
You should probably call get on the returned Future to ensure that we get a useful error if there is a failure. Also, you can omit [Array[Byte], Array[Byte]] in both cases above.
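The reason for calling get can be illustrated without a broker. The sketch below uses a plain `java.util.concurrent.CompletableFuture` as a stand-in for the Future returned by an asynchronous send; `failingSend` and `sendAndWait` are hypothetical names, and the failure message is made up for the example.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, Future}

object SendGetSketch {
  // Stand-in for an asynchronous send: the failure is only stored
  // in the returned Future, nothing is thrown at call time.
  def failingSend(): Future[String] = {
    val result = new CompletableFuture[String]()
    result.completeExceptionally(new IllegalStateException("broker unavailable"))
    result
  }

  // Calling get() surfaces the stored failure as an ExecutionException.
  def sendAndWait(): String =
    try failingSend().get()
    catch {
      case e: ExecutionException => s"send failed: ${e.getCause.getMessage}"
    }
}
```

If the Future is simply discarded, the failure stays invisible and the test would only fail later, for a less obvious reason, when the consumer receives nothing.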
| val producer1 = new KafkaProducer[Array[Byte], Array[Byte]](props) | ||
| producer1.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, msg.getBytes())) | ||
| producer1.flush() // Explicitly invoke flush method to make effect immediately | ||
| producer1.close() // Close the producer |
|
|
||
| // Create a MirrorMaker consumer | ||
| val config = new util.HashMap[String, AnyRef] | ||
| config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-gropu") |
| config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) | ||
| config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") | ||
| config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer") | ||
| config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer") |
There was a problem hiding this comment.
Again, you can pass the deserializers via the constructor.
| val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config) | ||
| MirrorMaker.createMirrorMakerProducer(brokerList) | ||
|
|
||
| val whitelist = Some("new.*|another_topic|foo" ) // Test a regular expression |
There was a problem hiding this comment.
I thought we wanted to test multiple regular expressions separated by commas?
There was a problem hiding this comment.
I replaced all "|" with commas. Is that what you expect?
There was a problem hiding this comment.
Yeah, that should fail without your fix, I think. If we wanted to make the test even better, we'd produce to another_topic and yet_another_topic and check that the consumer only consumes from the two that match.
But the test as it is verifies the fix, so I won't block the merge if you don't feel like extending the test. :)
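The stronger check suggested here (only matching topics are consumed) can be sketched with plain `java.util.regex`. This assumes that pattern subscription selects topics whose full name matches the pattern; `SubscriptionSketch` and `subscribed` are hypothetical names, and no Kafka classes are involved.

```scala
import java.util.regex.Pattern

object SubscriptionSketch {
  // Returns the topics whose full name matches the whitelist regex.
  def subscribed(whitelistRegex: String, topics: Seq[String]): Seq[String] = {
    val pattern = Pattern.compile(whitelistRegex)
    topics.filter(topic => pattern.matcher(topic).matches())
  }

  val topics = Seq("new_topic", "another_topic", "yet_another_topic")

  // Comma-separated input compiled as-is: the commas are literal characters.
  val withoutFix = subscribed("new.*,another_topic,foo", topics)

  // Same input after the commas are turned into alternation.
  val withFix = subscribed("new.*|another_topic|foo", topics)
}
```

In this sketch the unconverted pattern selects no topic at all, while the converted one selects `new_topic` and `another_topic` but not `yet_another_topic`.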
| } | ||
|
|
||
| // Only for testing | ||
| private[kafka] def createMirrorMakerProducer(brokerList: String): Unit = { |
There was a problem hiding this comment.
Why is this method here? Can it not be in the test?
| @@ -561,9 +578,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { | |||
| val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener) | |||
| if (whitelistOpt.isDefined) { | |||
There was a problem hiding this comment.
Instead of whitelistOpt.isDefined, we can write it as:
whitelistOpt.foreach { whitelist =>
  try {
    consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
    ...
  }
}
There was a problem hiding this comment.
What are your thoughts on this comment?
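As a small, self-contained illustration of the Option.foreach style proposed in this thread, the sketch below records the patterns that would be subscribed to instead of calling a real consumer. `OptionForeachSketch` and `subscribePatterns` are hypothetical names used only for this example.

```scala
import java.util.regex.Pattern
import scala.collection.mutable.ListBuffer

object OptionForeachSketch {
  // Collects the patterns that would be handed to consumer.subscribe.
  def subscribePatterns(whitelistOpt: Option[String]): List[String] = {
    val subscribed = ListBuffer.empty[String]
    // The body runs only when a whitelist is present, so no
    // isDefined check or .get call is needed.
    whitelistOpt.foreach { whitelist =>
      subscribed += Pattern.compile(whitelist).pattern()
    }
    subscribed.toList
  }
}
```

The benefit is mostly readability: the value is bound to a name inside the block, so there is no separate `isDefined` test that can drift apart from a later `.get`.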
…ehavior_change_for_new_consumer
1. Move unit test case under integration/kafka/tools 2. Remove test code in MirrorMaker
|
Refer to this link for build results (access rights to CI server needed): |
|
@ijuma already addressed most of your comments. Please have a review. Thanks |
ijuma
left a comment
There was a problem hiding this comment.
Thanks, I think we're close. Left a few more comments.
|
|
||
| private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]], | ||
| // Only for testing | ||
| private[kafka] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]], |
| } | ||
|
|
||
| private class MirrorMakerProducer(val producerProps: Properties) { | ||
| private[kafka] class MirrorMakerProducer(val producerProps: Properties) { |
| @After | ||
| override def tearDown() { | ||
| super.tearDown() | ||
| } |
There was a problem hiding this comment.
We don't need tearDown and setUp defined here if they just call super.
| mirrorMakerConsumer.init() | ||
| try { | ||
| val data = mirrorMakerConsumer.receive() | ||
| println(new String(data.value)) |
| try { | ||
| val data = mirrorMakerConsumer.receive() | ||
| println(new String(data.value)) | ||
| assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic.equals(topic)) |
There was a problem hiding this comment.
In Scala, data.topic == topic is more idiomatic. Same for the next line.
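A short sketch of why `==` is the idiomatic choice for these assertions: in Scala it delegates to `equals` for strings and is null-safe. The values below are made up for the example. Note that arrays are a different case, since `==` on arrays compares references.

```scala
object EqualitySketch {
  val topic = "new_topic"
  val msg = "a test message"
  val value: Array[Byte] = msg.getBytes("UTF-8")

  // == on strings compares contents, like equals.
  val sameTopic: Boolean = new String(topic.toCharArray) == topic
  val sameMessage: Boolean = new String(value, "UTF-8") == msg

  // == is null-safe: no NullPointerException when the left side is null.
  val nullSafe: Boolean = (null: String) == topic

  // Arrays compare by reference with ==; use sameElements for contents.
  val arraysByReference: Boolean = msg.getBytes("UTF-8") == value
  val sameBytes: Boolean = msg.getBytes("UTF-8").sameElements(value)
}
```

This is why the test decodes `data.value` into a String before comparing it with `msg` rather than comparing the byte arrays directly.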
| assertTrue(s"MirrorMaker consumer should get the correct topic: $topic", data.topic.equals(topic)) | ||
| assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value).equals(msg)) | ||
| } catch { | ||
| case e: RuntimeException => |
There was a problem hiding this comment.
No need to catch and rethrow, just let the original exception bubble up.
| // Create a test producer to delivery a message | ||
| val producerProps = new Properties() | ||
| producerProps.put("bootstrap.servers", brokerList) | ||
| producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") |
There was a problem hiding this comment.
You can set the class itself instead of the name, I think. e.g. classOf[ByteArraySerializer]. Same in the line below.
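One possible reading of this suggestion, sketched with plain `java.util.Properties`: `setProperty` only accepts String values, so a class object has to be stored with `put`, or converted with `getName`. `StandInSerializer` is a hypothetical placeholder for the real serializer class, and the keys are written out as literals for the example.

```scala
import java.util.Properties

object ClassConfigSketch {
  // Hypothetical placeholder for a real serializer class.
  class StandInSerializer

  def build(): Properties = {
    val props = new Properties()
    // put accepts any object, so the Class itself can be stored.
    props.put("key.serializer", classOf[StandInSerializer])
    // setProperty takes a String, so the class name is used instead.
    props.setProperty("value.serializer", classOf[StandInSerializer].getName)
    props
  }
}
```

Either form avoids hardcoding the fully qualified name as a string literal, so a rename or a typo is caught by the compiler.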
| import org.junit.Assert.assertTrue | ||
| import org.junit.{After, Before, Test} | ||
|
|
||
| class TestMirrorMaker extends KafkaServerTestHarness { |
There was a problem hiding this comment.
I thought MirrorMakerTest was fine, why did you rename it?
There was a problem hiding this comment.
The compiler will complain "MirrorMakerTest is already defined as class MirrorMakerTest". That's why I changed the name. Do you think there is another way I could avoid this error?
There was a problem hiding this comment.
Ah, that makes sense because they both have the same package (it's just the source folder that differs). In that case, I suggest MirrorMakerIntegrationTest for this one so the difference is obvious.
| producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| val producer = new MirrorMakerProducer(producerProps) | ||
| MirrorMaker.producer = producer | ||
| MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes())) |
There was a problem hiding this comment.
As I said before, it would be good to call get on the result of send to ensure exceptions get bubbled up.
There was a problem hiding this comment.
The producer here is not of type 'KafkaProducer' but 'MirrorMakerProducer'. Its send method already calls KafkaProducer.send().get() by default, unless 'async' is explicitly specified. Does that make sense?
There was a problem hiding this comment.
Hmm, the default seems to be async?
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
There was a problem hiding this comment.
Sorry, my mistake. Do you think it's okay to add props.put("producer.type", "sync") before MirrorMakerProducer?
…ttps://github.com/amethystic/kafka into kafka-4351_Regex_behavior_change_for_new_consumer
1. narrow down the visibility for some method and class 2. modify test case
…ehavior_change_for_new_consumer
|
Refer to this link for build results (access rights to CI server needed): |
|
@ijuma Thanks for your great patience. I have addressed your comments. Please review it again. Thanks.
ijuma
left a comment
There was a problem hiding this comment.
Thanks for the updates, a few more comments.
| assertTrue("MirrorMaker consumer should read the correct message.", new String(data.value) == msg) | ||
| return | ||
| } catch { | ||
| case _: ConsumerTimeoutException => // swallow it |
There was a problem hiding this comment.
Hmm, this doesn't seem right. Why do we swallow this exception?
There was a problem hiding this comment.
val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelist)
mirrorMakerConsumer.init()
try {
  TestUtils.waitUntilTrue(() => {
    try {
      val data = mirrorMakerConsumer.receive()
      data.topic == topic && new String(data.value) == msg
    } catch {
      case _: ConsumerTimeoutException => false
    }
  }, "MirrorMaker consumer should be able to subscribe the correct topic and read the correct message.")
} finally {
  consumer.close()
}
Is that okay with you then? Thanks.
| mirrorMakerConsumer.init() | ||
| try { | ||
| val maxTryCount = 3 // it might need to call multiple poll calls to retrieve the message | ||
| for (_ <- 0 until maxTryCount) { |
There was a problem hiding this comment.
We should use TestUtils.waitUntilTrue instead of this.
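For readers unfamiliar with the polling approach, here is a minimal sketch of what a wait-until-true helper does in place of a fixed retry count. This is a hypothetical stand-alone version written for illustration, not Kafka's TestUtils implementation; the parameter names and default values are assumptions.

```scala
object WaitSketch {
  // Hypothetical helper: polls the condition until it holds or the
  // timeout elapses, then fails with the supplied message.
  def waitUntilTrue(condition: () => Boolean,
                    msg: String,
                    waitTimeMs: Long = 5000L,
                    pauseMs: Long = 10L): Unit = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }
}
```

Compared with a loop of three attempts, the helper bounds the wait by time rather than by call count and produces a clear failure message when the condition never becomes true.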
| MirrorMaker.producer.close() | ||
|
|
||
| servers foreach { server => | ||
| println(server.zkUtils.getAllTopics().mkString(",")) |
There was a problem hiding this comment.
We shouldn't have this. Unintentional?
There was a problem hiding this comment.
I always forget to remove them before committing. Sorry, my mistake.
| producerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) | ||
| producerProps.put("producer.type", "sync") | ||
| producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") | ||
| producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") |
There was a problem hiding this comment.
Did it not work to use classOf[ByteArraySerializer] instead of hardcoding the String?
There was a problem hiding this comment.
It seems MirrorMakerProducer is not a parameterized class, so it cannot take type parameters at construction.
There was a problem hiding this comment.
That's not what I was suggesting. That's OK, I'll do it before merging.
fix MirrorMakerIntegrationTest.testRegularExpressionTopic code as per Ijuma's comments.
|
Refer to this link for build results (access rights to CI server needed): |
|
Thanks for the updates, LGTM with a few minor changes that I will do before merging to trunk: moved upgrade notes to the right place and reworded them a little, added license comment to test, minor style improvements. |
|
Thanks for your great patience. I learned a lot from this whole review cycle.
…ted regex This makes it consistent with MirrorMaker with the old consumer. Author: huxi <huxi@zhenrongbao.com> Author: amethystic <huxi_2b@hotmail.com> Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk> Closes apache#2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer