@@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging

 private[kafka010] object CachedKafkaProducer extends Logging {
@@ -79,6 +79,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
    * one instance per specified kafkaParams.
    */
   private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
+    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
+      logDebug(s"Reattempt detected, invalidating cached producer for params $kafkaParams")
+      close(kafkaParams)
Member

This is probably fine; is there a way to close this earlier, when the task fails?

Contributor Author

Since any part of the code can throw an exception that may or may not be caught, I thought this was the safest solution. The other consideration was that the consumer side works in a similar way without problems.

Member

@gaborgsomogyi We cannot close a cached producer that can still be used by other tasks. A Kafka producer can be shared by all tasks that are using the same Kafka parameters. It is different than the consumer cache.

Contributor

But isn't the assumption that a bad producer will cause all those tasks to fail anyway? This would recover from that situation (and prevent the task retries from failing).

It may be that the task failed for other reasons and other tasks using the same producer would make progress, but that sounds both less likely and more complicated to handle.

Member

@vanzin even if a bad producer could happen, this approach is still not correct. The newly created producer can be closed at once by an attempt of a different task.

AFAIK, the current issue about the cached Kafka producer is https://issues.apache.org/jira/browse/SPARK-21869, which definitely can be solved in a smarter way.

By the way, I have never seen anyone report an issue about corrupt Kafka producers in the Spark or Kafka community. @gaborgsomogyi do you have any ticket related to this one?
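
To make the objection above concrete, here is a minimal sketch of a shared cache with the invalidate-on-reattempt rule from the diff. `FakeProducer` and `RetryInvalidatingCache` are hypothetical stand-ins written for this illustration (no Spark, Kafka, or Guava involved), and the attempt number is passed explicitly instead of being read from `TaskContext`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a producer; this is not the Kafka client API.
final class FakeProducer(val id: Int) {
  private var closed = false
  def close(): Unit = synchronized { closed = true }
  def isClosed: Boolean = synchronized { closed }
}

// Hypothetical cache holding one shared instance per parameter set, plus the
// invalidate-on-reattempt rule from the diff under review.
final class RetryInvalidatingCache {
  private val cache = mutable.HashMap.empty[Map[String, String], FakeProducer]
  private var nextId = 0

  def getOrCreate(params: Map[String, String], attemptNumber: Int): FakeProducer =
    synchronized {
      if (attemptNumber >= 1) {
        // Closes whatever is cached, regardless of who else still holds it.
        cache.remove(params).foreach(_.close())
      }
      cache.getOrElseUpdate(params, { nextId += 1; new FakeProducer(nextId) })
    }
}

object RetryRaceDemo {
  def main(args: Array[String]): Unit = {
    val cache = new RetryInvalidatingCache
    val params = Map("bootstrap.servers" -> "127.0.0.1:9022", "acks" -> "0")

    // Task A (first attempt) obtains the shared producer and keeps using it.
    val taskA = cache.getOrCreate(params, attemptNumber = 0)
    // Task B is a reattempt: it closes the producer that task A still holds.
    val taskB = cache.getOrCreate(params, attemptNumber = 1)
    // Task C is another reattempt: it closes the producer task B just created.
    val taskC = cache.getOrCreate(params, attemptNumber = 1)

    // Prints: A closed: true, B closed: true, C closed: false
    println(s"A closed: ${taskA.isClosed}, B closed: ${taskB.isClosed}, C closed: ${taskC.isClosed}")
  }
}
```

In this toy model only the most recent reattempt ends up with an open producer; every earlier holder is left with a closed one, which is the failure mode described in the comment.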

Contributor

> The newly created producer can be closed at once by an attempt of a different task.

Good point. Seems hard to solve without keeping more state about the producer... :-/

Contributor

Agree with @zsxwing. Given that the Kafka producer is meant to be thread-safe, it should have its own self-healing mechanism to prevent one broken request-response from breaking others.

Contributor Author

> The newly created producer can be closed at once by an attempt of a different task.

I also think it's a good point. Some sort of `if (!inUse) close()` mechanism would be correct.

@zsxwing Just for the sake of my deeper understanding: in which scenario can two tasks in the same executor write to the same topic partition?

@ScrapCodes are you proceeding with SPARK-21869? This PR needs the inUse flag that you showed in #19096. Happy to help in any way.
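
One way to read the `if (!inUse) close()` idea is as reference counting. The sketch below is a guess at the shape of such a mechanism, not the code from #19096: `RefCountedCache`, `acquire`, `release`, and `invalidate` are hypothetical names, and `FakeProducer` is a stand-in rather than the Kafka client.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a producer; this is not the Kafka client API.
final class FakeProducer(val id: Int) {
  private var closed = false
  def close(): Unit = synchronized { closed = true }
  def isClosed: Boolean = synchronized { closed }
}

// Hypothetical reference-counted cache: invalidation removes the entry so that
// new callers get a fresh producer, but the old producer is closed only after
// its last user has released it.
final class RefCountedCache {
  private final class Entry(val producer: FakeProducer) {
    var inUse = 0
    var evicted = false
  }

  private val live = mutable.HashMap.empty[Map[String, String], Entry]
  private val byProducer = mutable.HashMap.empty[FakeProducer, Entry]
  private var nextId = 0

  def acquire(params: Map[String, String]): FakeProducer = synchronized {
    val entry = live.getOrElseUpdate(params, {
      nextId += 1
      val created = new Entry(new FakeProducer(nextId))
      byProducer(created.producer) = created
      created
    })
    entry.inUse += 1
    entry.producer
  }

  def release(producer: FakeProducer): Unit = synchronized {
    byProducer.get(producer).foreach { entry =>
      entry.inUse -= 1
      closeIfIdle(entry)
    }
  }

  def invalidate(params: Map[String, String]): Unit = synchronized {
    live.remove(params).foreach { entry =>
      entry.evicted = true
      closeIfIdle(entry)
    }
  }

  // The `if (!inUse) close()` check from the discussion.
  private def closeIfIdle(entry: Entry): Unit =
    if (entry.evicted && entry.inUse == 0) {
      entry.producer.close()
      byProducer.remove(entry.producer)
    }
}

object RefCountDemo {
  def main(args: Array[String]): Unit = {
    val cache = new RefCountedCache
    val params = Map("acks" -> "0")
    val taskA = cache.acquire(params)
    cache.invalidate(params)          // a reattempt asks for a fresh producer
    val taskB = cache.acquire(params) // receives a new instance
    println(s"A open while held: ${!taskA.isClosed}")
    cache.release(taskA)
    println(s"A closed after release: ${taskA.isClosed}, B open: ${!taskB.isClosed}")
  }
}
```

The cost of this design is that every caller must pair `acquire` with `release`, typically in a `try`/`finally`, which is the extra producer state mentioned earlier in the thread.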

Member

@gaborgsomogyi, I wanted to revive it soon. Sorry for the delay; I am on it now. I will certainly need your help to discuss possible approaches.

Contributor Author

Cool, ping me and I'll join...

+    }
     val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
     try {
       guavaCache.get(paramsSeq)
@@ -20,10 +20,13 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 import java.util.concurrent.ConcurrentMap

+import scala.collection.JavaConverters._
+
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.PrivateMethodTester

+import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.sql.test.SharedSQLContext

 class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest {
@@ -35,43 +38,68 @@ class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester
     CachedKafkaProducer.clear()
   }

+  test("Should not throw exception on calling close with non-existing key.") {
+    val kafkaParams = getKafkaParams()
+    CachedKafkaProducer.close(kafkaParams)
+    assert(getCacheMap().size === 0)
+  }
+
   test("Should return the cached instance on calling getOrCreate with same params.") {
-    val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
-    // Here only host should be resolvable, it does not need a running instance of kafka server.
-    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
-    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
-    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
+    val kafkaParams = getKafkaParams()
     val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
     val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
-    assert(producer == producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
-    assert(map.size == 1)
+    assert(producer === producer2)
+    val cacheMap = getCacheMap()
+    assert(cacheMap.size === 1)
   }

   test("Should close the correct kafka producer for the given kafkaPrams.") {
-    val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
-    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
-    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
-    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
+    val kafkaParams = getKafkaParams()
     val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
     kafkaParams.put("acks", "1")
     val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
     // With updated conf, a new producer instance should be created.
     assert(producer != producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
-    assert(map.size == 2)
+    val cacheMap = getCacheMap()
+    assert(cacheMap.size === 2)

     CachedKafkaProducer.close(kafkaParams)
-    val map2 = CachedKafkaProducer.invokePrivate(cacheMap())
-    assert(map2.size == 1)
-    import scala.collection.JavaConverters._
-    val (seq: Seq[(String, Object)], _producer: KP) = map2.asScala.toArray.apply(0)
-    assert(_producer == producer)
+    val cacheMap2 = getCacheMap()
+    assert(cacheMap2.size === 1)
+    assert(getCacheMapItem(cacheMap2, 0) === producer)
   }
+
+  test("Should return new instance on calling getOrCreate with same params but task retry.") {
+    val kafkaParams = getKafkaParams()
+    val taskContext = new TaskContextImpl(0, 0, 0, 0, attemptNumber = 0, null, null, null)
+    TaskContext.setTaskContext(taskContext)
+    val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
+    val retryTaskContext = new TaskContextImpl(0, 0, 0, 0, attemptNumber = 1, null, null, null)
+    TaskContext.setTaskContext(retryTaskContext)
+    val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
+    assert(producer != producer2)
+    val cacheMap = getCacheMap()
+    assert(cacheMap.size === 1)
+    assert(getCacheMapItem(cacheMap, 0) === producer2)
+  }
+
+  private def getKafkaParams(): ju.HashMap[String, Object] = {
+    val kafkaParams = new ju.HashMap[String, Object]()
+    kafkaParams.put("acks", "0")
+    // Here only host should be resolvable, it does not need a running instance of kafka server.
+    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
+    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
+    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
+    kafkaParams
+  }
+
+  private def getCacheMap(): ConcurrentMap[Seq[(String, Object)], KP] = {
+    val getAsMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
+    CachedKafkaProducer.invokePrivate(getAsMap())
+  }
+
+  private def getCacheMapItem(map: ConcurrentMap[Seq[(String, Object)], KP], offset: Int): KP = {
Contributor

Hmm... maps don't necessarily have deterministic iteration order, so this method only really makes sense if the map has a single item. Since you always call it with 0 as the offset anyway, it'd be better to just simplify it (map.values().iterator().next()).

Or maybe explicitly using map.get(kafkaParams) in the tests.

+    val (_: Seq[(String, Object)], _producer: KP) = map.asScala.toArray.apply(0)
+    _producer
+  }
 }
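
The two alternatives suggested in the review comment on `getCacheMapItem` could look roughly like the sketch below. It uses plain `String` values instead of producers, and `onlyValue` and `toKey` are hypothetical helpers. Judging by the `ConcurrentMap[Seq[(String, Object)], KP]` type in the diff, the cache is keyed by a sequence rather than by the params map itself, so a direct lookup would need the same key conversion the cache uses; `toKey` here only assumes what that conversion might look like.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

object CacheLookupSketch {
  type Key = Seq[(String, String)]

  // Option 1: take the single value, and fail loudly if the assumption of
  // "exactly one entry" does not hold, since iteration order is unspecified.
  def onlyValue[V](map: ConcurrentMap[Key, V]): V = {
    require(map.size() == 1, s"expected exactly one entry, found ${map.size()}")
    map.values().iterator().next()
  }

  // Hypothetical key normalisation (an assumption for this sketch): sort by
  // name so that equal parameter sets always produce equal keys.
  def toKey(params: Map[String, String]): Key = params.toSeq.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val cache = new ConcurrentHashMap[Key, String]()
    val acks0 = Map("bootstrap.servers" -> "127.0.0.1:9022", "acks" -> "0")
    val acks1 = acks0.updated("acks", "1")

    cache.put(toKey(acks0), "producer-for-acks-0")
    println(onlyValue(cache)) // safe: the map holds a single entry

    // Option 2: look the entry up by key, which stays correct with many entries.
    cache.put(toKey(acks1), "producer-for-acks-1")
    println(cache.get(toKey(acks1)))
  }
}
```

The keyed lookup makes each assertion independent of how many entries the cache holds, at the price of exposing or duplicating the key conversion in the test.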