Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Metrics][PR#3] Metrics report JSON serializer and LoggingMetricsReporter for the default engine #3904

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {

/////////////////
// Public APIs //
/////////////////

/**
* Serializes a {@link SnapshotReport} to a JSON string
*
* @throws JsonProcessingException
*/
public static String serializeSnapshotReport(SnapshotReport snapshotReport)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we would have to do this for every type of report, right?

Any way to avoid that? (Since they would all just call OBJECT_MAPPER.writeValueAsString(inputVariable)

Could we just take in a DeltaOperationReport?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I wasn't sure about this because I also think there's a use-case where you are only serializing a specific report type. maybe we can have both serializeSnapshotReport and serializeDeltaOperationReport

But then I need to make DeltaOperationReport serializable (which currently I did not). What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every report we create will implement DeltaOperationReport, right?

And every report we create we will by default log in with our default engine, right?

Then it seems fair that DeltaOperationReport be serializable 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I have some thoughts that I haven't fully fleshed out.

Every operation-type report we create will implement DeltaOperationReport, possibly not every report. But yes we probably plan to log all report types in our default engine (I can't think of a reason not to currently). But that would argue that we should make MetricsReport serializable....

If we add a new report type i.e. XXReport extends DeltaOperationReport but don't make it serializable, it will still be serialized but will be missing additional information in XXReport.

Do we ever expect an report that only extends DeltaOperationReport? Maybe we just want to report that an operation occurred or success vs failure?

Need to verify that if both DeltaOperationReport and SnapshotReport are serializable jackson will use the lowest ancestor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is something we can flesh out later? These are all internal APIs and for now, I think, making sure the main report types are serializable is enough.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is something we can flesh out later?

Yup! SGTM!

throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(snapshotReport);
}

/////////////////////////////////
// Private fields and methods //
////////////////////////////////

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper()
.registerModule(new Jdk8Module()) // To support Optional
.registerModule( // Serialize Exception using toString()
new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()));

private MetricsReportSerializers() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public SnapshotMetricsResult captureSnapshotMetricsResult() {
loadInitialDeltaActionsTimer.totalDurationNs();

@Override
public Optional<Long> timestampToVersionResolutionDurationNs() {
public Optional<Long> getTimestampToVersionResolutionDurationNs() {
return timestampToVersionResolutionDurationResult;
}

@Override
public long loadInitialDeltaActionsDurationNs() {
public long getLoadInitialDeltaActionsDurationNs() {
return loadInitialDeltaActionsDurationResult;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,32 @@ private SnapshotReportImpl(
}

@Override
public String tablePath() {
public String getTablePath() {
return tablePath;
}

@Override
public UUID reportUUID() {
public UUID getReportUUID() {
return reportUUID;
}

@Override
public SnapshotMetricsResult snapshotMetrics() {
public SnapshotMetricsResult getSnapshotMetrics() {
return snapshotMetrics;
}

@Override
public Optional<Long> version() {
public Optional<Long> getVersion() {
return version;
}

@Override
public Optional<Long> providedTimestamp() {
public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

@Override
public Optional<Exception> exception() {
public Optional<Exception> getException() {
return exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
public interface DeltaOperationReport extends MetricsReport {

/** @return the path of the table */
String tablePath();
String getTablePath();

/** @return a string representation of the operation this report is for */
String operationType();

/** @return the exception thrown if this report is for a failed operation, otherwise empty */
Optional<Exception> exception();
String getOperationType();

/** @return a unique ID for this report */
UUID reportUUID();
UUID getReportUUID();

/** @return the exception thrown if this report is for a failed operation, otherwise empty */
Optional<Exception> getException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public interface SnapshotMetricsResult {
* @return the duration (ns) to resolve the provided timestamp to a table version for timestamp
* time-travel queries. Empty for time-travel by version or non-time-travel queries.
*/
Optional<Long> timestampToVersionResolutionDurationNs();
Optional<Long> getTimestampToVersionResolutionDurationNs();

/**
* @return the duration (ns) to load the initial delta actions for the snapshot (such as the table
* protocol and metadata). 0 if snapshot construction fails before log replay.
*/
long loadInitialDeltaActionsDurationNs();
long getLoadInitialDeltaActionsDurationNs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@
*/
package io.delta.kernel.metrics;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Optional;

/** Defines the metadata and metrics for a snapshot construction {@link MetricsReport} */
@JsonSerialize(as = SnapshotReport.class)
@JsonPropertyOrder({
"tablePath",
"operationType",
"reportUUID",
"exception",
"version",
"providedTimestamp",
"snapshotMetrics"
})
public interface SnapshotReport extends DeltaOperationReport {

/**
Expand All @@ -30,19 +42,19 @@ public interface SnapshotReport extends DeltaOperationReport {
*
* @return the version of the snapshot
*/
Optional<Long> version();
Optional<Long> getVersion();

/**
* @return the timestamp provided for time-travel, empty if this is not a timestamp-based
* time-travel query
*/
Optional<Long> providedTimestamp();
Optional<Long> getProvidedTimestamp();

/** @return the metrics for this snapshot construction */
SnapshotMetricsResult snapshotMetrics();
SnapshotMetricsResult getSnapshotMetrics();

@Override
default String operationType() {
default String getOperationType() {
return "Snapshot";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics

import java.util.Optional

import io.delta.kernel.metrics.SnapshotReport
import org.scalatest.funsuite.AnyFunSuite

class MetricsReportSerializerSuite extends AnyFunSuite {

private def optionToString[T](option: Optional[T]): String = {
if (option.isPresent) {
if (option.get().isInstanceOf[String]) {
s""""${option.get()}"""" // For string objects wrap with quotes
} else {
option.get().toString
}
} else {
"null"
}
}

private def testSnapshotReport(snapshotReport: SnapshotReport): Unit = {
val timestampToVersionResolutionDuration = optionToString(
snapshotReport.getSnapshotMetrics().getTimestampToVersionResolutionDurationNs())
val loadProtocolAndMetadataDuration =
snapshotReport.getSnapshotMetrics().getLoadInitialDeltaActionsDurationNs()
val exception: Optional[String] = snapshotReport.getException().map(_.toString)
val expectedJson =
s"""
|{"tablePath":"${snapshotReport.getTablePath()}",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport.getReportUUID()}",
|"exception":${optionToString(exception)},
|"version":${optionToString(snapshotReport.getVersion())},
|"providedTimestamp":${optionToString(snapshotReport.getProvidedTimestamp())},
|"snapshotMetrics":{
|"timestampToVersionResolutionDurationNs":${timestampToVersionResolutionDuration},
|"loadInitialDeltaActionsDurationNs":${loadProtocolAndMetadataDuration}
|}
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport))
}

test("SnapshotReport serializer") {
val snapshotContext1 = SnapshotQueryContext.forTimestampSnapshot("/table/path", 0)
snapshotContext1.getSnapshotMetrics.timestampToVersionResolutionTimer.record(10)
snapshotContext1.getSnapshotMetrics.loadInitialDeltaActionsTimer.record(1000)
snapshotContext1.setVersion(1)
val exception = new RuntimeException("something something failed")

val snapshotReport1 = SnapshotReportImpl.forError(
snapshotContext1,
exception
)

// Manually check expected JSON
val expectedJson =
s"""
|{"tablePath":"/table/path",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport1.getReportUUID()}",
|"exception":"$exception",
|"version":1,
|"providedTimestamp":0,
|"snapshotMetrics":{
|"timestampToVersionResolutionDurationNs":10,
|"loadInitialDeltaActionsDurationNs":1000
|}
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1))

// Check with test function
testSnapshotReport(snapshotReport1)

// Empty options for all possible fields (version, providedTimestamp and exception)
val snapshotContext2 = SnapshotQueryContext.forLatestSnapshot("/table/path")
val snapshotReport2 = SnapshotReportImpl.forSuccess(snapshotContext2)
testSnapshotReport(snapshotReport2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.delta.kernel.defaults.engine;

import io.delta.kernel.engine.*;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

/** Default implementation of {@link Engine} based on Hadoop APIs. */
Expand Down Expand Up @@ -46,6 +48,11 @@ public ParquetHandler getParquetHandler() {
return new DefaultParquetHandler(hadoopConf);
}

@Override
public List<MetricsReporter> getMetricsReporters() {
return Collections.singletonList(new LoggingMetricsReporter());
};

/**
* Create an instance of {@link DefaultEngine}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.delta.kernel.engine.MetricsReporter;
import io.delta.kernel.internal.metrics.MetricsReportSerializers;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link MetricsReporter} that logs the reports (as JSON) to Log4J at the info
* level.
*/
public class LoggingMetricsReporter implements MetricsReporter {

private static final Logger logger = LoggerFactory.getLogger(LoggingMetricsReporter.class);

@Override
public void report(MetricsReport report) {
try {
if (report instanceof SnapshotReport) {
logger.info(
"SnapshotReport = {}",
MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report));
} else {
logger.info(
"{} = [{} does not support serializing this type of MetricReport]",
report.getClass(),
this.getClass());
}
} catch (JsonProcessingException e) {
logger.info("Encountered exception while serializing report {}: {}", report, e);
}
}
}
Loading
Loading