[SPARK-27042][SS] Close cached Kafka producer in case of task retry #23956
Conversation
Test build #102982 has finished for PR 23956 at commit
```scala
private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
  if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    logDebug(s"Reattempt detected, invalidating cached producer for params $kafkaParams")
    close(kafkaParams)
```
This is probably fine; is there a way to close this earlier, when the task fails?
Since any part of the code can throw an exception, which may or may not be caught, I thought this was the safest solution. The other consideration was that the consumer side works in a similar way without problems.
@gaborgsomogyi We cannot close a cached producer that can still be used by other tasks. A Kafka producer can be shared by all tasks that are using the same Kafka parameters. It is different from the consumer cache.
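To illustrate the sharing described here, a minimal sketch of a parameter-keyed producer cache (the names and the `ConcurrentHashMap` backing are assumptions for illustration, not the actual `CachedKafkaProducer` internals, which use Guava with timeout-based eviction):

```scala
import java.{util => ju}
import java.util.concurrent.ConcurrentHashMap

import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical, simplified cache: the key is the Kafka parameter map, so
// every task on the executor that writes with the same parameters gets the
// same KafkaProducer instance back.
object ProducerCacheSketch {
  private val cache =
    new ConcurrentHashMap[ju.Map[String, Object], KafkaProducer[Array[Byte], Array[Byte]]]()

  def getOrCreate(params: ju.Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] =
    cache.computeIfAbsent(params, p => new KafkaProducer[Array[Byte], Array[Byte]](p))

  // Closing by key tears the producer down for *all* tasks that share it,
  // which is why a retry of one task must not blindly call this.
  def close(params: ju.Map[String, Object]): Unit = {
    val producer = cache.remove(params)
    if (producer != null) producer.close()
  }
}
```

Under this model, `close(kafkaParams)` from a retried task would also pull the producer out from under healthy tasks that happen to use the same parameters.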
But isn't the assumption that a bad producer will cause all those tasks to fail anyway? This would recover from that situation (and prevent the task retries from failing).
It may be that the task failed for other reasons and other tasks using the same producer would make progress, but that sounds both less likely and more complicated to handle.
@vanzin even if a bad producer can happen, this approach is still not correct. The newly created producer can be closed immediately by a retry attempt of a different task.
AFAIK, the current issue about the cached Kafka producer is https://issues.apache.org/jira/browse/SPARK-21869, which can definitely be solved in a smarter way.
By the way, I have never seen anyone report an issue about corrupt Kafka producers in the Spark or Kafka community. @gaborgsomogyi do you have any ticket related to this one?
> The newly created producer can be closed immediately by a retry attempt of a different task.
Good point. Seems hard to solve without keeping more state about the producer... :-/
Agree with @zsxwing, and since the Kafka producer is designed to be thread-safe, it should have a self-healing mechanism in itself to prevent one broken request-response from breaking the others.
> The newly created producer can be closed immediately by a retry attempt of a different task.

I also think it's a good point. Some sort of `if (!inUse) close()` mechanism would be correct (a rough sketch of the idea follows below).
@zsxwing Just for the sake of my deeper understanding: in which scenario can it happen that two tasks in the same executor are writing the same topic-partition?
@ScrapCodes are you proceeding with SPARK-21869? This PR needs the `inUse` flag you've shown in #19096. Happy to help in any way.
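For illustration only, a minimal sketch of such a guard, assuming hypothetical names (`GuardedProducer`, `acquire`, `release`, `invalidate`) rather than the actual SPARK-21869 / #19096 design:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical reference-counting wrapper: a producer marked for close is
// only really closed once no task is using it anymore.
class GuardedProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]) {
  private val inUse = new AtomicInteger(0)
  @volatile private var markedForClose = false

  def acquire(): KafkaProducer[Array[Byte], Array[Byte]] = {
    inUse.incrementAndGet()
    producer
  }

  def release(): Unit = {
    if (inUse.decrementAndGet() == 0 && markedForClose) producer.close()
  }

  // Called on task retry instead of close(): new tasks no longer see the
  // producer (the cache entry is removed elsewhere), while tasks already
  // holding it keep using it until they release it.
  def invalidate(): Unit = {
    markedForClose = true
    if (inUse.get() == 0) producer.close()
  }
}
```

A production version would also need to guard against the race between `release` and `invalidate` (e.g. by closing under a lock), but the sketch shows the shape of the `inUse` idea.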
@gaborgsomogyi, I wanted to revive it soon; sorry for the delay. Now I am on it. I will need your help for sure, to discuss possible approaches.
Cool, ping me and I'll come...
vanzin left a comment:
Looks ok except for a small test thing.
```scala
  CachedKafkaProducer.invokePrivate(getAsMap())
}

private def getCacheMapItem(map: ConcurrentMap[Seq[(String, Object)], KP], offset: Int): KP = {
```
Hmm... maps don't necessarily have deterministic iteration order, so this method only really makes sense if the map has a single item. Since you always call it with 0 as the offset anyway, it'd be better to just simplify it (`map.values().iterator().next()`).
Or maybe explicitly use `map.get(kafkaParams)` in the tests.
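A sketch of the suggested simplification, assuming `KP` is the test suite's producer type alias and that the test guarantees a single cached entry (the helper name is illustrative):

```scala
import java.util.concurrent.ConcurrentMap

// Assumes the test has put exactly one producer into the cache, so
// iteration order no longer matters and no offset parameter is needed.
private def getSingleCachedProducer(map: ConcurrentMap[Seq[(String, Object)], KP]): KP = {
  assert(map.size() == 1, "expected exactly one cached producer")
  map.values().iterator().next()
}
```

Alternatively, looking the entry up by its key (`map.get(...)` with the Kafka parameters converted to the cache's key type) avoids the single-entry assumption altogether.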
(I don't really have enough context to meaningfully review this.)
I think this shouldn't be mixed up with the solution for SPARK-21869 and has to be kept as a separate feature. On the other hand, SPARK-21869 is definitely needed to proceed with this.
Can one of the admins verify this patch?
#25853 has been merged, which allows invalidating producers (and not closing them when in use) in case of task retry. I'm going to come up with a new PR soon...
What changes were proposed in this pull request?
If a task is failing due to a corrupt cached `KafkaProducer` and the task is retried in the same executor, then the task gets the same `KafkaProducer` over and over again unless it's invalidated by the timeout configured with `spark.kafka.producer.cache.timeout`, which is not really probable. After several retries the query stops.

In this PR I'm closing the old cached `KafkaProducer` and reopening a new one. The functionality is similar to the `KafkaConsumer` side here.

How was this patch tested?
Additional unit tests + on cluster.