From fb9bb1330aa68c2a4e1961bbae426a30dde377c6 Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Wed, 8 Jan 2025 18:33:40 -0800 Subject: [PATCH] [Kernel][Metrics][PR#3] Metrics report JSON serializer and LoggingMetricsReporter for the default engine (#3904) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description This PR is based off of https://github.com/delta-io/delta/pull/3903 See the diff for just this PR [here](https://github.com/delta-io/delta/pull/3904/files/aec95cf3dc0086c37f4c45e2b3e192b7b881768c..678ac473f4de65a8f7fd770696aad2d31a15aef7) Adds a JSON serializer for metrics reports with serialization logic for SnapshotReport. Also adds a `LoggingMetricsReporter` to the default implementation which simply logs the JSON serialized reports using Log4J. ## How was this patch tested? Adds a test suite. ## Does this PR introduce _any_ user-facing changes? No. --- build.sbt | 1 + .../metrics/MetricsReportSerializers.java | 54 +++++++++++ .../internal/metrics/SnapshotMetrics.java | 4 +- .../internal/metrics/SnapshotReportImpl.java | 12 +-- .../kernel/metrics/DeltaOperationReport.java | 12 +-- .../kernel/metrics/SnapshotMetricsResult.java | 4 +- .../delta/kernel/metrics/SnapshotReport.java | 20 +++- .../MetricsReportSerializerSuite.scala | 97 +++++++++++++++++++ .../kernel/defaults/engine/DefaultEngine.java | 7 ++ .../engine/LoggingMetricsReporter.java | 51 ++++++++++ .../kernel/defaults/MetricsReportSuite.scala | 39 ++++---- 11 files changed, 264 insertions(+), 37 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java create mode 100644 kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java diff --git a/build.sbt b/build.sbt index 08f524b0f02..a3944719b2e 100644 --- a/build.sbt +++ b/build.sbt @@ -658,6 +658,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion, "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5", + "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5", "org.apache.parquet" % "parquet-hadoop" % "1.12.3", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java new file mode 100644 index 00000000000..83aa2b2771f --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java @@ -0,0 +1,54 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.metrics; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import io.delta.kernel.metrics.MetricsReport; +import io.delta.kernel.metrics.SnapshotReport; + +/** Defines JSON serializers for {@link MetricsReport} types */ +public final class MetricsReportSerializers { + + ///////////////// + // Public APIs // + ///////////////// + + /** + * Serializes a {@link SnapshotReport} to a JSON string + * + * @throws JsonProcessingException + */ + public static String serializeSnapshotReport(SnapshotReport snapshotReport) + throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsString(snapshotReport); + } + + ///////////////////////////////// + // Private fields and methods // + //////////////////////////////// + + private static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper() + .registerModule(new Jdk8Module()) // To support Optional + .registerModule( // Serialize Exception using toString() + new SimpleModule().addSerializer(Exception.class, new ToStringSerializer())); + + private MetricsReportSerializers() {} +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java index 2a03ac3d78c..bd810ad2f2d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java @@ -41,12 +41,12 @@ public SnapshotMetricsResult captureSnapshotMetricsResult() { loadInitialDeltaActionsTimer.totalDurationNs(); @Override - public Optional timestampToVersionResolutionDurationNs() { + public Optional getTimestampToVersionResolutionDurationNs() { return timestampToVersionResolutionDurationResult; } @Override - public long loadInitialDeltaActionsDurationNs() { + public long getLoadInitialDeltaActionsDurationNs() { return loadInitialDeltaActionsDurationResult; } }; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java index ef05fdbc06a..b7de2067b99 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java @@ -76,32 +76,32 @@ private SnapshotReportImpl( } @Override - public String tablePath() { + public String getTablePath() { return tablePath; } @Override - public UUID reportUUID() { + public UUID getReportUUID() { return reportUUID; } @Override - public SnapshotMetricsResult snapshotMetrics() { + public SnapshotMetricsResult getSnapshotMetrics() { return snapshotMetrics; } @Override - public Optional version() { + public Optional getVersion() { return version; } @Override - public Optional providedTimestamp() { + public Optional getProvidedTimestamp() { return providedTimestamp; } @Override - public Optional exception() { + public Optional getException() { return exception; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/DeltaOperationReport.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/DeltaOperationReport.java index 5d8dfe7fbb6..9deca7535d2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/DeltaOperationReport.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/DeltaOperationReport.java @@ -22,14 +22,14 @@ public interface DeltaOperationReport extends MetricsReport { /** @return the path of the table */ - String tablePath(); + String getTablePath(); /** @return a string representation of the operation this report is for */ - String operationType(); - - /** @return the exception thrown if this report is for a failed operation, otherwise empty */ - Optional exception(); + String getOperationType(); /** @return a unique ID for this report */ - UUID reportUUID(); + UUID getReportUUID(); + + /** @return the exception thrown if this report is for a failed operation, otherwise empty */ + Optional getException(); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java index 7ec93e22c69..c129136b4b6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java @@ -24,11 +24,11 @@ public interface SnapshotMetricsResult { * @return the duration (ns) to resolve the provided timestamp to a table version for timestamp * time-travel queries. Empty for time-travel by version or non-time-travel queries. */ - Optional timestampToVersionResolutionDurationNs(); + Optional getTimestampToVersionResolutionDurationNs(); /** * @return the duration (ns) to load the initial delta actions for the snapshot (such as the table * protocol and metadata). 0 if snapshot construction fails before log replay. */ - long loadInitialDeltaActionsDurationNs(); + long getLoadInitialDeltaActionsDurationNs(); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java index 3458bd8180c..c687c7e7f9c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java @@ -15,9 +15,21 @@ */ package io.delta.kernel.metrics; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.Optional; /** Defines the metadata and metrics for a snapshot construction {@link MetricsReport} */ +@JsonSerialize(as = SnapshotReport.class) +@JsonPropertyOrder({ + "tablePath", + "operationType", + "reportUUID", + "exception", + "version", + "providedTimestamp", + "snapshotMetrics" +}) public interface SnapshotReport extends DeltaOperationReport { /** @@ -30,19 +42,19 @@ public interface SnapshotReport extends DeltaOperationReport { * * @return the version of the snapshot */ - Optional version(); + Optional getVersion(); /** * @return the timestamp provided for time-travel, empty if this is not a timestamp-based * time-travel query */ - Optional providedTimestamp(); + Optional getProvidedTimestamp(); /** @return the metrics for this snapshot construction */ - SnapshotMetricsResult snapshotMetrics(); + SnapshotMetricsResult getSnapshotMetrics(); @Override - default String operationType() { + default String getOperationType() { return "Snapshot"; } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala new file mode 100644 index 00000000000..9e94ef11f76 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala @@ -0,0 +1,97 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.metrics + +import java.util.Optional + +import io.delta.kernel.metrics.SnapshotReport +import org.scalatest.funsuite.AnyFunSuite + +class MetricsReportSerializerSuite extends AnyFunSuite { + + private def optionToString[T](option: Optional[T]): String = { + if (option.isPresent) { + if (option.get().isInstanceOf[String]) { + s""""${option.get()}"""" // For string objects wrap with quotes + } else { + option.get().toString + } + } else { + "null" + } + } + + private def testSnapshotReport(snapshotReport: SnapshotReport): Unit = { + val timestampToVersionResolutionDuration = optionToString( + snapshotReport.getSnapshotMetrics().getTimestampToVersionResolutionDurationNs()) + val loadProtocolAndMetadataDuration = + snapshotReport.getSnapshotMetrics().getLoadInitialDeltaActionsDurationNs() + val exception: Optional[String] = snapshotReport.getException().map(_.toString) + val expectedJson = + s""" + |{"tablePath":"${snapshotReport.getTablePath()}", + |"operationType":"Snapshot", + |"reportUUID":"${snapshotReport.getReportUUID()}", + |"exception":${optionToString(exception)}, + |"version":${optionToString(snapshotReport.getVersion())}, + |"providedTimestamp":${optionToString(snapshotReport.getProvidedTimestamp())}, + |"snapshotMetrics":{ + |"timestampToVersionResolutionDurationNs":${timestampToVersionResolutionDuration}, + |"loadInitialDeltaActionsDurationNs":${loadProtocolAndMetadataDuration} + |} + |} + |""".stripMargin.replaceAll("\n", "") + assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport)) + } + + test("SnapshotReport serializer") { + val snapshotContext1 = SnapshotQueryContext.forTimestampSnapshot("/table/path", 0) + snapshotContext1.getSnapshotMetrics.timestampToVersionResolutionTimer.record(10) + snapshotContext1.getSnapshotMetrics.loadInitialDeltaActionsTimer.record(1000) + snapshotContext1.setVersion(1) + val exception = new RuntimeException("something something failed") + + val snapshotReport1 = SnapshotReportImpl.forError( + snapshotContext1, + exception + ) + + // Manually check expected JSON + val expectedJson = + s""" + |{"tablePath":"/table/path", + |"operationType":"Snapshot", + |"reportUUID":"${snapshotReport1.getReportUUID()}", + |"exception":"$exception", + |"version":1, + |"providedTimestamp":0, + |"snapshotMetrics":{ + |"timestampToVersionResolutionDurationNs":10, + |"loadInitialDeltaActionsDurationNs":1000 + |} + |} + |""".stripMargin.replaceAll("\n", "") + assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1)) + + // Check with test function + testSnapshotReport(snapshotReport1) + + // Empty options for all possible fields (version, providedTimestamp and exception) + val snapshotContext2 = SnapshotQueryContext.forLatestSnapshot("/table/path") + val snapshotReport2 = SnapshotReportImpl.forSuccess(snapshotContext2) + testSnapshotReport(snapshotReport2) + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java index 7b9172c19f1..bc6a7a24756 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java @@ -16,6 +16,8 @@ package io.delta.kernel.defaults.engine; import io.delta.kernel.engine.*; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.conf.Configuration; /** Default implementation of {@link Engine} based on Hadoop APIs. */ @@ -46,6 +48,11 @@ public ParquetHandler getParquetHandler() { return new DefaultParquetHandler(hadoopConf); } + @Override + public List getMetricsReporters() { + return Collections.singletonList(new LoggingMetricsReporter()); + }; + /** * Create an instance of {@link DefaultEngine}. * diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java new file mode 100644 index 00000000000..df71189d895 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java @@ -0,0 +1,51 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.engine; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.delta.kernel.engine.MetricsReporter; +import io.delta.kernel.internal.metrics.MetricsReportSerializers; +import io.delta.kernel.metrics.MetricsReport; +import io.delta.kernel.metrics.SnapshotReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link MetricsReporter} that logs the reports (as JSON) to Log4J at the info + * level. + */ +public class LoggingMetricsReporter implements MetricsReporter { + + private static final Logger logger = LoggerFactory.getLogger(LoggingMetricsReporter.class); + + @Override + public void report(MetricsReport report) { + try { + if (report instanceof SnapshotReport) { + logger.info( + "SnapshotReport = {}", + MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report)); + } else { + logger.info( + "{} = [{} does not support serializing this type of MetricReport]", + report.getClass(), + this.getClass()); + } + } catch (JsonProcessingException e) { + logger.info("Encountered exception while serializing report {}: {}", report, e); + } + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala index 217d7136117..33c6a3f7296 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala @@ -110,34 +110,34 @@ class MetricsReportSuite extends AnyFunSuite with TestUtils { val (snapshotReport, duration, exception) = getSnapshotReport(f, path, expectException) // Verify contents - assert(snapshotReport.tablePath == resolvePath(path)) - assert(snapshotReport.operationType == "Snapshot") + assert(snapshotReport.getTablePath == resolvePath(path)) + assert(snapshotReport.getOperationType == "Snapshot") exception match { case Some(e) => - assert(snapshotReport.exception().isPresent && - Objects.equals(snapshotReport.exception().get(), e)) - case None => assert(!snapshotReport.exception().isPresent) + assert(snapshotReport.getException().isPresent && + Objects.equals(snapshotReport.getException().get(), e)) + case None => assert(!snapshotReport.getException().isPresent) } - assert(snapshotReport.reportUUID != null) - assert(Objects.equals(snapshotReport.version, expectedVersion), - s"Expected version $expectedVersion found ${snapshotReport.version}") - assert(Objects.equals(snapshotReport.providedTimestamp, expectedProvidedTimestamp)) + assert(snapshotReport.getReportUUID != null) + assert(Objects.equals(snapshotReport.getVersion, expectedVersion), + s"Expected version $expectedVersion found ${snapshotReport.getVersion}") + assert(Objects.equals(snapshotReport.getProvidedTimestamp, expectedProvidedTimestamp)) // Since we cannot know the actual durations of these we sanity check that they are > 0 and // less than the total operation duration whenever they are expected to be non-zero/non-empty if (expectNonEmptyTimestampToVersionResolutionDuration) { - assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDurationNs.isPresent) - assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDurationNs.get > 0) - assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDurationNs.get < + assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDurationNs.isPresent) + assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDurationNs.get > 0) + assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDurationNs.get < duration) } else { - assert(!snapshotReport.snapshotMetrics.timestampToVersionResolutionDurationNs.isPresent) + assert(!snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDurationNs.isPresent) } if (expectNonZeroLoadProtocolAndMetadataDuration) { - assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDurationNs > 0) - assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDurationNs < duration) + assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDurationNs > 0) + assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDurationNs < duration) } else { - assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDurationNs == 0) + assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDurationNs == 0) } } @@ -406,6 +406,11 @@ class MetricsReportSuite extends AnyFunSuite with TestUtils { override def report(report: MetricsReport): Unit = buf.append(report) } + private val metricsReporters = new util.ArrayList[MetricsReporter]() {{ + addAll(baseEngine.getMetricsReporters) + add(metricsReporter) + }} + override def getExpressionHandler: ExpressionHandler = baseEngine.getExpressionHandler override def getJsonHandler: JsonHandler = baseEngine.getJsonHandler @@ -415,7 +420,7 @@ class MetricsReportSuite extends AnyFunSuite with TestUtils { override def getParquetHandler: ParquetHandler = baseEngine.getParquetHandler override def getMetricsReporters(): java.util.List[MetricsReporter] = { - java.util.Collections.singletonList(metricsReporter) + metricsReporters } } }