Skip to content

Commit

Permalink
feat: hang on non-existed topics
Browse files Browse the repository at this point in the history
  • Loading branch information
syhily committed Feb 1, 2023
1 parent 820043e commit 5f2660e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import scala.annotation.tailrec
import scala.collection.mutable
import scala.language.postfixOps

import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles

import org.apache.spark.internal.Logging
import org.apache.spark.sql.pulsar.PulsarOptions._
Expand Down Expand Up @@ -295,6 +301,8 @@ private[pulsar] case class PulsarMetadataReader(
case None =>
throw new RuntimeException("Failed to get topics from configurations")
}

topicQuery()
}

private def getTopicPartitions(): Seq[String] = {
Expand Down Expand Up @@ -336,6 +344,25 @@ private[pulsar] case class PulsarMetadataReader(
.filter(tp => shortenedTopicsPattern.matcher(tp.split("\\:\\/\\/")(1)).matches())
}

private def topicQuery(): Unit = {
if (caseInsensitiveParameters.getOrElse(WaitingForNonExistedTopic, "false").toBoolean) {
// This method will wait the desired topics until it's created.

val waitList = mutable.ListBuffer(topics: _*)
while (waitList.nonEmpty) {
val topic = waitList.head
try {
admin.topics().getPartitionedTopicMetadata(topic)
waitList -= topic
} catch {
case _: NotFoundException =>
logInfo(s"The desired $topic doesn't existed, wait for 5 seconds.")
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS)
}
}
}
}

def offsetForEachTopic(
params: Map[String, String],
defaultOffsets: PulsarOffset,
Expand Down Expand Up @@ -494,6 +521,7 @@ private[pulsar] case class PulsarMetadataReader(
}
}

@tailrec
private def fetchOffsetForTopic(
poolTimeoutMs: Int,
reportDataLoss: String => Unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ private[pulsar] object PulsarOptions {
val TlsHostnameVerificationEnable: String = "tlsHostnameVerificationEnable"

val AllowDifferentTopicSchemas: String = "allowDifferentTopicSchemas".toLowerCase(Locale.ROOT)
val WaitingForNonExistedTopic: String = "waitingForNonExistedTopic".toLowerCase(Locale.ROOT)

val InstructionForFailOnDataLossFalse: String =
"""
Expand Down

0 comments on commit 5f2660e

Please sign in to comment.