From 5f2660e4facaa6dd1fbc69be3b1c86c8ec2d8bbc Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Thu, 15 Dec 2022 01:59:00 +0800 Subject: [PATCH] feat: hang on non-existed topics --- .../sql/pulsar/PulsarMetadataReader.scala | 28 +++++++++++++++++++ .../spark/sql/pulsar/PulsarOptions.scala | 1 + 2 files changed, 29 insertions(+) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala index 33bbb822..30f5b23c 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala @@ -19,11 +19,17 @@ import java.util.Optional import java.util.concurrent.TimeUnit import java.util.regex.Pattern +import scala.annotation.tailrec +import scala.collection.mutable +import scala.language.postfixOps + import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException} +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient} import org.apache.pulsar.client.impl.schema.BytesSchema import org.apache.pulsar.common.naming.TopicName import org.apache.pulsar.common.schema.SchemaInfo +import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles import org.apache.spark.internal.Logging import org.apache.spark.sql.pulsar.PulsarOptions._ @@ -295,6 +301,8 @@ private[pulsar] case class PulsarMetadataReader( case None => throw new RuntimeException("Failed to get topics from configurations") } + + topicQuery() } private def getTopicPartitions(): Seq[String] = { @@ -336,6 +344,25 @@ private[pulsar] case class PulsarMetadataReader( .filter(tp => shortenedTopicsPattern.matcher(tp.split("\\:\\/\\/")(1)).matches()) } + private def topicQuery(): Unit = { + if (caseInsensitiveParameters.getOrElse(WaitingForNonExistedTopic, "false").toBoolean) { + // This method will wait the desired topics until it's created. + + val waitList = mutable.ListBuffer(topics: _*) + while (waitList.nonEmpty) { + val topic = waitList.head + try { + admin.topics().getPartitionedTopicMetadata(topic) + waitList -= topic + } catch { + case _: NotFoundException => + logInfo(s"The desired $topic doesn't existed, wait for 5 seconds.") + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS) + } + } + } + } + def offsetForEachTopic( params: Map[String, String], defaultOffsets: PulsarOffset, @@ -494,6 +521,7 @@ private[pulsar] case class PulsarMetadataReader( } } + @tailrec private def fetchOffsetForTopic( poolTimeoutMs: Int, reportDataLoss: String => Unit, diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala index db1a06ba..3d89fb83 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala @@ -58,6 +58,7 @@ private[pulsar] object PulsarOptions { val TlsHostnameVerificationEnable: String = "tlsHostnameVerificationEnable" val AllowDifferentTopicSchemas: String = "allowDifferentTopicSchemas".toLowerCase(Locale.ROOT) + val WaitingForNonExistedTopic: String = "waitingForNonExistedTopic".toLowerCase(Locale.ROOT) val InstructionForFailOnDataLossFalse: String = """