feat: support iceberg offset parsing (#9)
Add the ability to parse snapshot_id offsets from Iceberg streams.
Also add reportGauge and reportHistogram overloads so that they can be
called from PySpark.


Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
Co-authored-by: Florian Sachse <florian@contiamo.com>
LucasRoesler and megaflo authored Jun 19, 2023
1 parent 14e5287 commit 41510fd
Showing 3 changed files with 101 additions and 5 deletions.
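
For context on the PySpark-facing change: py4j (as configured by PySpark) passes a Python dict to the JVM as a java.util.HashMap, which does not match the existing Map[String, String] signatures, hence the new overloads. A minimal JVM-side sketch of the call a PySpark user would end up making (the metrics instance and label values here are illustrative assumptions, not part of the commit):

    import java.util.{HashMap => JHashMap}

    val metrics = new StreamingQuerySource() // assumed already registered with Spark's metrics system
    val labels = new JHashMap[String, String]()
    labels.put("source", "IcebergMicroBatchStream")
    labels.put("offset_type", "startOffset")
    // Dispatches to the new HashMap overload, which converts to an immutable Map internally.
    metrics.reportGauge("streaming.source.iceberg.partition.offset", labels, 42L)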
2 changes: 1 addition & 1 deletion .github/workflows/publish-release.yml
@@ -17,7 +17,7 @@ jobs:
      - name: Build Image
        run: docker build -t ${DOCKER_REGISTRY}:${GITHUB_REF_NAME} .
      - name: Log Into Registry
-       uses: docker/login-action@v1
+       uses: docker/login-action@v2
        with:
          username: _json_key
          password: ${{ secrets.GCR_JSON_KEY }}
95 changes: 92 additions & 3 deletions prometheus-export/src/main/scala/StreamingQuerySource.scala
@@ -35,9 +35,19 @@ object StreamingQuerySource extends Logging {

/** Extract the source name from the description. For example,
* "KafkaV2[Subscribe[mpathic-event]]" becomes "KafkaV2", and
* "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681" becomes
* "IcebergMicroBatchStream".
*/
def extractSourceName(value: String): String = {
-  value.split("\\[").head
+  // If the value starts with org.apache.iceberg.spark.source.SparkMicroBatchStream,
+  // return "IcebergMicroBatchStream".
+  if (
+    value.startsWith("org.apache.iceberg.spark.source.SparkMicroBatchStream")
+  ) {
+    "IcebergMicroBatchStream"
+  } else {
+    // Otherwise split on "[" and take the first segment.
+    value.split("\\[").head
+  }
}

// An offset object is a json field with a map of topics to a map of partitions to offsets
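// e.g. {"mpathic-event": {"0": 100, "1": 98}} for a two-partition topic (illustrative values)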
@@ -80,6 +90,41 @@
}
}

// An iceberg structured streaming offset looks like this
// {
// "description" : "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681",
// "startOffset" : {
// "version" : 1,
// "snapshot_id" : 3202691127021887425,
// "position" : 1,
// "scan_all_files" : false
// },
// "endOffset" : {
// "version" : 1,
// "snapshot_id" : 1926525026331941549,
// "position" : 1,
// "scan_all_files" : false
// },
// "latestOffset" : null,
// "numInputRows" : 15,
// "inputRowsPerSecond" : 0.25,
// "processedRowsPerSecond" : 5.210142410559222
// }
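// icebergOffset parses one of these offset objects (startOffset or endOffset)
// into a map; e.g. the endOffset above should yield, assuming the Either
// extraction succeeds:
//   Map("version" -> Left(1L), "snapshot_id" -> Left(1926525026331941549L),
//       "position" -> Left(1L), "scan_all_files" -> Right(false))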
def icebergOffset(offsetJSON: String): Map[String, Either[Long, Boolean]] = {
if (offsetJSON != null && offsetJSON.nonEmpty) {
val trimmedOffsetJSON = offsetJSON.trim
trimmedOffsetJSON.headOption match {
case Some('{') if trimmedOffsetJSON.lastOption.contains('}') =>
implicit val formats = DefaultFormats
parse(offsetJSON).extract[Map[String, Either[Long, Boolean]]]
case _ =>
log.trace(s"Streaming offset data is not processable: $offsetJSON")
Map()
}
} else {
Map()
}
}
}

class StreamingQuerySource() extends Source with Logging {
@@ -101,6 +146,14 @@
m.setValue(value)
}

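// Overload taking java.util.HashMap so the method is reachable from PySpark:
// py4j (as configured by PySpark) passes a Python dict to the JVM as a
// java.util.HashMap, which does not match the Map[String, String] signature above.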
def reportGauge(
name: String,
labels: java.util.HashMap[String, String],
value: Long
): Unit = {
reportGauge(name, labels.asScala.toMap, value)
}

def reportHistogram(
name: String,
labels: Map[String, String],
@@ -111,6 +164,14 @@
meter.update(value)
}

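// The matching PySpark-friendly java.util.HashMap overload for histograms.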
def reportHistogram(
name: String,
labels: java.util.HashMap[String, String],
value: Long
): Unit = {
reportHistogram(name, labels.asScala.toMap, value)
}

/** Process the progress object and report metrics to the registry
*
* We want to use the `name` as a label in each metric we create
@@ -159,6 +220,7 @@
}

for (source <- progress.sources) {
log.info(s"Reporting metrics for source ${source}")
val name = extractSourceName(source.description)
name match {
case "KafkaV2" => {
@@ -202,6 +264,33 @@
}
}
}

case "IcebergMicroBatchStream" => {
reportHistogram(
"streaming.source.iceberg.rows",
commonLabels + ("source" -> name),
source.numInputRows
)

val latestOffset = icebergOffset(source.endOffset)
if (latestOffset.nonEmpty) {
reportGauge(
s"streaming.source.icberg.partition.offset",
commonLabels + ("offset_type" -> "latestOffset"),
latestOffset("snapshot_id").left.get
)
}

val startOffset = icebergOffset(source.startOffset)
if (startOffset.nonEmpty) {
reportGauge(
s"streaming.source.icberg.partition.offset",
commonLabels + ("offset_type" -> "startOffset"),
startOffset("snapshot_id").left.get
)
}
}

// TODO: support other sources
case _ => None
}
@@ -217,7 +306,7 @@
): Unit = {
reportGauge(
"streaming.active_streams",
-  Map.empty,
+  Map.empty[String, String],
spark.streams.active.length
)
}
@@ -227,7 +316,7 @@
): Unit = {
reportGauge(
"streaming.active_streams",
-  Map.empty,
+  Map.empty[String, String],
spark.streams.active.length
)
}
9 changes: 8 additions & 1 deletion prometheus-export/src/test/scala/StreamingQuerySourceSpec.scala
@@ -19,13 +19,20 @@ class StreamingQuerySourceSpec extends AnyFlatSpec with Matchers {
actualOutput shouldEqual expectedOutput
}

"extractSourceName" should "return the source name from a string" in {
"extractSourceName" should "return the Kafka source name from a string" in {
val input = "KafkaV2[Subscribe[mpathic-event]]"
val expectedOutput = "KafkaV2"
val actualOutput = StreamingQuerySource.extractSourceName(input)
actualOutput shouldEqual expectedOutput
}

"extractSourceName" should "return the Iceberg source name from a string" in {
val input = "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681"
val expectedOutput = "IcebergMicroBatchStream"
val actualOutput = StreamingQuerySource.extractSourceName(input)
actualOutput shouldEqual expectedOutput
}

"extractSubscriptionName" should "return the kafka topic from the description" in {
val input = "KafkaV2[Subscribe[mpathic-event]]"
val expectedOutput = "mpathic-event"
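
For reference, a sketch of how the new offset parser could be exercised in the same spec style (illustrative only, not part of this commit; the JSON mirrors the sample offset in the source comments, and the assertion assumes the Either extraction succeeds):

    "icebergOffset" should "parse the snapshot_id from an iceberg offset" in {
      val input =
        """{"version": 1, "snapshot_id": 1926525026331941549, "position": 1, "scan_all_files": false}"""
      val actualOutput = StreamingQuerySource.icebergOffset(input)
      actualOutput("snapshot_id") shouldEqual Left(1926525026331941549L)
    }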
