diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml
index 4cca953..6954594 100644
--- a/.github/workflows/publish-release.yml
+++ b/.github/workflows/publish-release.yml
@@ -17,7 +17,7 @@ jobs:
       - name: Build Image
         run: docker build -t ${DOCKER_REGISTRY}:${GITHUB_REF_NAME} .
       - name: Log Into Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
         with:
           username: _json_key
           password: ${{ secrets.GCR_JSON_KEY }}
diff --git a/prometheus-export/src/main/scala/StreamingQuerySource.scala b/prometheus-export/src/main/scala/StreamingQuerySource.scala
index 1de54e0..2bd3723 100644
--- a/prometheus-export/src/main/scala/StreamingQuerySource.scala
+++ b/prometheus-export/src/main/scala/StreamingQuerySource.scala
@@ -35,9 +35,19 @@ object StreamingQuerySource extends Logging {
   /** extract the source name from the description for example
     * "KafkaV2[Subscribe[mpathic-event]]" becomes "KafkaV2"
+    * "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681" becomes
+    * "IcebergMicroBatchStream"
     */
   def extractSourceName(value: String): String = {
-    value.split("\\[").head
+    // iceberg descriptions are the stream class's toString, so match on the prefix
+    if (
+      value.startsWith("org.apache.iceberg.spark.source.SparkMicroBatchStream")
+    ) {
+      "IcebergMicroBatchStream"
+    } else {
+      // otherwise split on [ and take the first value
+      value.split("\\[").head
+    }
   }
 
   // An offset object is a json field with a map of topics to a map of partitions to offsets
 
@@ -80,6 +90,41 @@ object StreamingQuerySource extends Logging {
     }
   }
 
+  // An iceberg structured streaming offset looks like this
+  // {
+  //   "description" : "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681",
+  //   "startOffset" : {
+  //     "version" : 1,
+  //     "snapshot_id" : 3202691127021887425,
+  //     "position" : 1,
+  //     "scan_all_files" : false
+  //   },
+  //   "endOffset" : {
+  //     "version" : 1,
+  //     "snapshot_id" : 1926525026331941549,
+  //     "position" : 1,
+  //     "scan_all_files" : false
+  //   },
+  //   "latestOffset" : null,
+  //   "numInputRows" : 15,
+  //   "inputRowsPerSecond" : 0.25,
+  //   "processedRowsPerSecond" : 5.210142410559222
+  // }
+  def icebergOffset(offsetJSON: String): Map[String, Either[Long, Boolean]] = {
+    if (offsetJSON != null && offsetJSON.nonEmpty) {
+      val trimmedOffsetJSON = offsetJSON.trim
+      trimmedOffsetJSON.headOption match {
+        case Some('{') if trimmedOffsetJSON.lastOption.contains('}') =>
+          implicit val formats = DefaultFormats
+          parse(offsetJSON).extract[Map[String, Either[Long, Boolean]]]
+        case _ =>
+          log.trace(s"Streaming offset data is not processable: $offsetJSON")
+          Map()
+      }
+    } else {
+      Map()
+    }
+  }
 }
 
 class StreamingQuerySource() extends Source with Logging {
@@ -101,6 +146,14 @@ class StreamingQuerySource() extends Source with Logging {
     m.setValue(value)
   }
 
+  def reportGauge(
+      name: String,
+      labels: java.util.HashMap[String, String],
+      value: Long
+  ): Unit = {
+    reportGauge(name, labels.asScala.toMap, value)
+  }
+
   def reportHistogram(
       name: String,
       labels: Map[String, String],
@@ -111,6 +164,14 @@ class StreamingQuerySource() extends Source with Logging {
     meter.update(value)
   }
 
+  def reportHistogram(
+      name: String,
+      labels: java.util.HashMap[String, String],
+      value: Long
+  ): Unit = {
+    reportHistogram(name, labels.asScala.toMap, value)
+  }
+
   /** Process the progress object and report metrics to the registry
     *
     * We want to use the `name` as a label in each metric we create
@@ -159,6 +220,7 @@ class StreamingQuerySource() extends Source with Logging {
     }
 
     for (source <- progress.sources) {
+      log.info(s"Reporting metrics for source ${source}")
       val name = extractSourceName(source.description)
       name match {
         case "KafkaV2" => {
@@ -202,6 +264,33 @@ class StreamingQuerySource() extends Source with Logging {
             }
           }
         }
+
+        case "IcebergMicroBatchStream" => {
+          reportHistogram(
+            "streaming.source.iceberg.rows",
+            commonLabels + ("source" -> name),
+            source.numInputRows
+          )
+
+          // iceberg progress reports "latestOffset" as null, so endOffset is
+          // the most recent offset we can report under that label
+          val latestOffset = icebergOffset(source.endOffset)
+          latestOffset.get("snapshot_id").foreach {
+            case Left(snapshotId) =>
+              reportGauge(
+                "streaming.source.iceberg.partition.offset",
+                commonLabels + ("offset_type" -> "latestOffset"),
+                snapshotId
+              )
+            case Right(_) => // snapshot_id should always be numeric; skip
+          }
+
+          val startOffset = icebergOffset(source.startOffset)
+          startOffset.get("snapshot_id").foreach {
+            case Left(snapshotId) =>
+              reportGauge(
+                "streaming.source.iceberg.partition.offset",
+                commonLabels + ("offset_type" -> "startOffset"),
+                snapshotId
+              )
+            case Right(_) => // snapshot_id should always be numeric; skip
+          }
+        }
+
         // TODO: support other sources
         case _ => None
       }
@@ -217,7 +306,7 @@ class StreamingQuerySource() extends Source with Logging {
   ): Unit = {
     reportGauge(
       "streaming.active_streams",
-      Map.empty,
+      Map.empty[String, String],
       spark.streams.active.length
     )
   }
@@ -227,7 +316,7 @@ class StreamingQuerySource() extends Source with Logging {
   ): Unit = {
     reportGauge(
       "streaming.active_streams",
-      Map.empty,
+      Map.empty[String, String],
      spark.streams.active.length
     )
   }
diff --git a/prometheus-export/src/test/scala/StreamingQuerySourceTest.scala b/prometheus-export/src/test/scala/StreamingQuerySourceTest.scala
index 15df000..2adef5e 100644
--- a/prometheus-export/src/test/scala/StreamingQuerySourceTest.scala
+++ b/prometheus-export/src/test/scala/StreamingQuerySourceTest.scala
@@ -19,13 +19,20 @@ class StreamingQuerySourceSpec extends AnyFlatSpec with Matchers {
     actualOutput shouldEqual expectedOutput
   }
 
-  "extractSourceName" should "return the source name from a string" in {
+  "extractSourceName" should "return the Kafka source name from a string" in {
     val input = "KafkaV2[Subscribe[mpathic-event]]"
     val expectedOutput = "KafkaV2"
     val actualOutput = StreamingQuerySource.extractSourceName(input)
     actualOutput shouldEqual expectedOutput
   }
 
+  "extractSourceName" should "return the Iceberg source name from a string" in {
+    val input = "org.apache.iceberg.spark.source.SparkMicroBatchStream@32c41681"
+    val expectedOutput = "IcebergMicroBatchStream"
+    val actualOutput = StreamingQuerySource.extractSourceName(input)
+    actualOutput shouldEqual expectedOutput
+  }
+
   "extractSubscriptionName" should "return the kafka topic from the description" in {
     val input = "KafkaV2[Subscribe[mpathic-event]]"
     val expectedOutput = "mpathic-event"