Skip to content

Commit

Permalink
[Kernel][Metrics][PR#3] Metrics report JSON serializer and LoggingMet…
Browse files Browse the repository at this point in the history
…ricsReporter for the default engine (delta-io#3904)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

This PR is based on delta-io#3903.
See the diff for just this PR
[here](https://github.com/delta-io/delta/pull/3904/files/aec95cf3dc0086c37f4c45e2b3e192b7b881768c..678ac473f4de65a8f7fd770696aad2d31a15aef7).

Adds a JSON serializer for metrics reports with serialization logic for
`SnapshotReport`. Also adds a `LoggingMetricsReporter` to the default
engine, which simply logs the JSON-serialized reports using
SLF4J.

## How was this patch tested?

Adds a test suite.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
allisonport-db authored and huan233usc committed Jan 17, 2025
1 parent dde5289 commit e1bdc33
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 31 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;

/**
 * Static holder for the JSON serializers of {@link MetricsReport} types.
 *
 * <p>This class cannot be instantiated; everything is exposed through static methods.
 */
public final class MetricsReportSerializers {

  ////////////////////
  // Private state  //
  ////////////////////

  /** Mapper shared by all serialization methods of this class. */
  private static final ObjectMapper OBJECT_MAPPER = buildObjectMapper();

  private MetricsReportSerializers() {}

  /////////////////
  // Public APIs //
  /////////////////

  /**
   * Converts a {@link SnapshotReport} into its JSON string form.
   *
   * @param snapshotReport the report to serialize
   * @return the JSON string produced for {@code snapshotReport}
   * @throws JsonProcessingException if the underlying mapper fails to write the report
   */
  public static String serializeSnapshotReport(SnapshotReport snapshotReport)
      throws JsonProcessingException {
    final String json = OBJECT_MAPPER.writeValueAsString(snapshotReport);
    return json;
  }

  /////////////////////
  // Private helpers //
  /////////////////////

  /** Creates the mapper with the modules required to serialize metrics reports. */
  private static ObjectMapper buildObjectMapper() {
    // Exceptions are written out as their toString() value
    final SimpleModule exceptionModule = new SimpleModule();
    exceptionModule.addSerializer(Exception.class, new ToStringSerializer());

    final ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new Jdk8Module()); // Adds support for Optional
    mapper.registerModule(exceptionModule);
    return mapper;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public SnapshotMetricsResult captureSnapshotMetricsResult() {
loadInitialDeltaActionsTimer.totalDurationNs();

@Override
public Optional<Long> timestampToVersionResolutionDurationNs() {
public Optional<Long> getTimestampToVersionResolutionDurationNs() {
return timestampToVersionResolutionDurationResult;
}

@Override
public long loadInitialDeltaActionsDurationNs() {
public long getLoadInitialDeltaActionsDurationNs() {
return loadInitialDeltaActionsDurationResult;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,32 @@ private SnapshotReportImpl(
}

@Override
public String tablePath() {
public String getTablePath() {
return tablePath;
}

@Override
public UUID reportUUID() {
public UUID getReportUUID() {
return reportUUID;
}

@Override
public SnapshotMetricsResult snapshotMetrics() {
public SnapshotMetricsResult getSnapshotMetrics() {
return snapshotMetrics;
}

@Override
public Optional<Long> version() {
public Optional<Long> getVersion() {
return version;
}

@Override
public Optional<Long> providedTimestamp() {
public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

@Override
public Optional<Exception> exception() {
public Optional<Exception> getException() {
return exception;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.metrics;

import java.util.Optional;
import java.util.UUID;

/**
 * A {@link MetricsReport} for a Delta operation. Declares the fields that reports for Delta
 * operations have in common.
 */
public interface DeltaOperationReport extends MetricsReport {

  /**
   * Gets the table path.
   *
   * @return the path of the table
   */
  String getTablePath();

  /**
   * Gets the type of operation being reported.
   *
   * @return a string representation of the operation this report is for
   */
  String getOperationType();

  /**
   * Gets the identifier of this report.
   *
   * @return a unique ID for this report
   */
  UUID getReportUUID();

  /**
   * Gets the failure associated with the operation, if any.
   *
   * @return the exception thrown if this report is for a failed operation, otherwise empty
   */
  Optional<Exception> getException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public interface SnapshotMetricsResult {
* @return the duration (ns) to resolve the provided timestamp to a table version for timestamp
* time-travel queries. Empty for time-travel by version or non-time-travel queries.
*/
Optional<Long> timestampToVersionResolutionDurationNs();
Optional<Long> getTimestampToVersionResolutionDurationNs();

/**
* @return the duration (ns) to load the initial delta actions for the snapshot (such as the table
* protocol and metadata). 0 if snapshot construction fails before log replay.
*/
long loadInitialDeltaActionsDurationNs();
long getLoadInitialDeltaActionsDurationNs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@
*/
package io.delta.kernel.metrics;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Optional;

/** Defines the metadata and metrics for a snapshot construction {@link MetricsReport} */
@JsonSerialize(as = SnapshotReport.class)
@JsonPropertyOrder({
"tablePath",
"operationType",
"reportUUID",
"exception",
"version",
"providedTimestamp",
"snapshotMetrics"
})
public interface SnapshotReport extends DeltaOperationReport {

/**
Expand All @@ -30,19 +42,19 @@ public interface SnapshotReport extends DeltaOperationReport {
*
* @return the version of the snapshot
*/
Optional<Long> version();
Optional<Long> getVersion();

/**
* @return the timestamp provided for time-travel, empty if this is not a timestamp-based
* time-travel query
*/
Optional<Long> providedTimestamp();
Optional<Long> getProvidedTimestamp();

/** @return the metrics for this snapshot construction */
SnapshotMetricsResult snapshotMetrics();
SnapshotMetricsResult getSnapshotMetrics();

@Override
default String operationType() {
default String getOperationType() {
return "Snapshot";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics

import java.util.Optional

import io.delta.kernel.metrics.SnapshotReport
import org.scalatest.funsuite.AnyFunSuite

/** Checks the JSON produced by `MetricsReportSerializers` for `SnapshotReport` instances. */
class MetricsReportSerializerSuite extends AnyFunSuite {

  /**
   * Renders an optional value as the text expected in the serialized JSON: `null` when empty,
   * the value wrapped in double quotes when it is a String, and its `toString` otherwise.
   */
  private def optionToString[T](option: Optional[T]): String = {
    if (!option.isPresent) {
      "null"
    } else {
      option.get() match {
        case str: String => s""""$str"""" // String values are wrapped in quotes
        case other => other.toString
      }
    }
  }

  /**
   * Builds the expected JSON from the getters of `snapshotReport` and asserts that it equals the
   * output of `MetricsReportSerializers.serializeSnapshotReport`.
   */
  private def testSnapshotReport(snapshotReport: SnapshotReport): Unit = {
    val metrics = snapshotReport.getSnapshotMetrics()
    val resolutionDurationJson =
      optionToString(metrics.getTimestampToVersionResolutionDurationNs())
    val loadDurationNs = metrics.getLoadInitialDeltaActionsDurationNs()
    val exceptionString: Optional[String] = snapshotReport.getException().map(_.toString)
    val expectedJson =
      s"""
         |{"tablePath":"${snapshotReport.getTablePath()}",
         |"operationType":"Snapshot",
         |"reportUUID":"${snapshotReport.getReportUUID()}",
         |"exception":${optionToString(exceptionString)},
         |"version":${optionToString(snapshotReport.getVersion())},
         |"providedTimestamp":${optionToString(snapshotReport.getProvidedTimestamp())},
         |"snapshotMetrics":{
         |"timestampToVersionResolutionDurationNs":${resolutionDurationJson},
         |"loadInitialDeltaActionsDurationNs":${loadDurationNs}
         |}
         |}
         |""".stripMargin.replaceAll("\n", "")
    val actualJson = MetricsReportSerializers.serializeSnapshotReport(snapshotReport)
    assert(expectedJson == actualJson)
  }

  test("SnapshotReport serializer") {
    // Report for a failed timestamp-based snapshot query with every optional field populated
    val timestampContext = SnapshotQueryContext.forTimestampSnapshot("/table/path", 0)
    timestampContext.getSnapshotMetrics.timestampToVersionResolutionTimer.record(10)
    timestampContext.getSnapshotMetrics.loadInitialDeltaActionsTimer.record(1000)
    timestampContext.setVersion(1)
    val failure = new RuntimeException("something something failed")

    val failedReport = SnapshotReportImpl.forError(timestampContext, failure)

    // Compare against a hand-written expected JSON
    val expectedJson =
      s"""
         |{"tablePath":"/table/path",
         |"operationType":"Snapshot",
         |"reportUUID":"${failedReport.getReportUUID()}",
         |"exception":"$failure",
         |"version":1,
         |"providedTimestamp":0,
         |"snapshotMetrics":{
         |"timestampToVersionResolutionDurationNs":10,
         |"loadInitialDeltaActionsDurationNs":1000
         |}
         |}
         |""".stripMargin.replaceAll("\n", "")
    val actualJson = MetricsReportSerializers.serializeSnapshotReport(failedReport)
    assert(expectedJson == actualJson)

    // Compare against the JSON derived from the report's getters
    testSnapshotReport(failedReport)

    // Empty options for all possible fields (version, providedTimestamp and exception)
    val latestContext = SnapshotQueryContext.forLatestSnapshot("/table/path")
    val successReport = SnapshotReportImpl.forSuccess(latestContext)
    testSnapshotReport(successReport)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.delta.kernel.defaults.engine;

import io.delta.kernel.engine.*;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

/** Default implementation of {@link Engine} based on Hadoop APIs. */
Expand Down Expand Up @@ -46,6 +48,11 @@ public ParquetHandler getParquetHandler() {
return new DefaultParquetHandler(hadoopConf);
}

/**
 * Returns the metrics reporters of this engine.
 *
 * @return a single-element list holding a newly created {@link LoggingMetricsReporter}
 */
@Override
public List<MetricsReporter> getMetricsReporters() {
  return Collections.singletonList(new LoggingMetricsReporter());
}

/**
* Create an instance of {@link DefaultEngine}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.delta.kernel.engine.MetricsReporter;
import io.delta.kernel.internal.metrics.MetricsReportSerializers;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of {@link MetricsReporter} that logs the reports (as JSON) through SLF4J at
 * the info level.
 *
 * <p>Only {@link SnapshotReport}s are serialized. For any other report type, a message stating
 * that the type is not supported is logged instead. A failure to serialize a report is logged at
 * the warn level and is not propagated to the caller.
 */
public class LoggingMetricsReporter implements MetricsReporter {

  private static final Logger logger = LoggerFactory.getLogger(LoggingMetricsReporter.class);

  /**
   * Logs the given report.
   *
   * @param report the report to log; serialized to JSON when it is a {@link SnapshotReport}
   */
  @Override
  public void report(MetricsReport report) {
    try {
      if (report instanceof SnapshotReport) {
        logger.info(
            "SnapshotReport = {}",
            MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report));
      } else {
        logger.info(
            "{} = [{} does not support serializing this type of MetricReport]",
            report.getClass(),
            this.getClass());
      }
    } catch (JsonProcessingException e) {
      // The exception is passed as the trailing argument WITHOUT a matching placeholder so that
      // SLF4J treats it as the Throwable of the log event and records its stack trace.
      logger.warn("Encountered exception while serializing report {}", report, e);
    }
  }
}
Loading

0 comments on commit e1bdc33

Please sign in to comment.