Skip to content

Commit

Permalink
Add failure origins to APM trace (#19665)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Nov 28, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 373ba6a commit 2423c7d
Showing 7 changed files with 73 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -173,6 +173,7 @@ public static FailureReason dbtFailure(final Throwable t, final Long jobId, fina

/**
 * Builds a {@link FailureReason} for a failure whose origin could not be determined.
 *
 * <p>Starts from the result of {@code genericFailure(t, jobId, attemptNumber)}, then sets the
 * failure origin to {@link FailureOrigin#UNKNOWN} and the external message to a fixed, generic
 * message.
 *
 * @param t the throwable, passed through to {@code genericFailure}
 * @param jobId the job ID, passed through to {@code genericFailure}
 * @param attemptNumber the attempt number, passed through to {@code genericFailure}
 * @return the failure reason marked with an unknown origin
 */
public static FailureReason unknownOriginFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.UNKNOWN)
.withExternalMessage("An unknown failure occurred");
}

Original file line number Diff line number Diff line change
@@ -179,4 +179,14 @@ void testOrderedFailures() throws Exception {
assertEquals(failureReasonList.get(0), TRACE_FAILURE_REASON);
}

/**
 * Verifies that {@code FailureHelper.unknownOriginFailure} produces a failure reason carrying the
 * {@code UNKNOWN} origin and the generic external message.
 */
@Test
void testUnknownOriginFailure() {
  final FailureReason reason = FailureHelper.unknownOriginFailure(new RuntimeException(), 12345L, 1);

  assertEquals(FailureOrigin.UNKNOWN, reason.getFailureOrigin());
  assertEquals("An unknown failure occurred", reason.getExternalMessage());
}

}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ properties:
- normalization
- dbt
- airbyte_platform
- unknown
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
Original file line number Diff line number Diff line change
@@ -63,6 +63,11 @@ public static final class Tags {
*/
public static final String DOCKER_IMAGE_KEY = "docker_image";

/**
 * Name of the APM trace tag that holds the failure origin(s) associated with the trace. When an
 * attempt reports several failure reasons, their origins are written as a single comma-separated
 * value under this tag.
 */
public static final String FAILURE_ORIGINS_KEY = "failure_origins";

/**
* Name of the APM trace tag that holds the job ID value associated with the trace.
*/
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ public static String getReleaseStage(final ReleaseStage stage) {
}

/**
 * Converts a failure origin into the string used as a metric tag value.
 *
 * @param origin the failure origin; may be {@code null}
 * @return {@code origin.value()} when an origin is present, otherwise the value of
 *         {@link FailureOrigin#UNKNOWN}
 */
public static String getFailureOrigin(final FailureOrigin origin) {
  // A missing origin is reported through the UNKNOWN enum member so that the fallback tag value
  // always matches the value declared by the FailureOrigin enum itself.
  return origin != null ? origin.value() : FailureOrigin.UNKNOWN.value();
}

public static String getJobStatus(final JobStatus status) {
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;

@@ -22,6 +23,7 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
@@ -56,6 +58,7 @@
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.run.WorkerRun;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.nio.file.Path;
@@ -67,6 +70,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@@ -179,9 +183,8 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
@Override
public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job createdJob = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -200,9 +203,8 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
@Override
public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job createdJob = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -221,10 +223,9 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
@Override
public void jobSuccess(final JobSuccessInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
@@ -287,12 +288,13 @@ public void jobFailure(final JobFailureInput input) {
@Override
public void attemptFailure(final AttemptFailureInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final int attemptId = input.getAttemptId();
final long jobId = input.getJobId();
final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();

ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
traceFailures(failureSummary);

jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);

@@ -302,11 +304,7 @@ public void attemptFailure(final AttemptFailureInput input) {
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
for (final FailureReason reason : failureSummary.getFailures()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
}

trackFailures(failureSummary);
} catch (final IOException e) {
throw new RetryableException(e);
}
@@ -329,10 +327,9 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
@Override
public void jobCancelled(final JobCancelledInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
jobPersistence.cancelJob(jobId);
@@ -487,4 +484,37 @@ private void trackCompletionForInternalFailure(final Long jobId,
jobTracker.trackSyncForInternalFailure(jobId, connectionId, attemptId, Enums.convertTo(status, JobState.class), e);
}

/**
 * Adds the failure origins to the APM trace.
 *
 * <p>When the summary is {@code null}, the trace is tagged with the value of
 * {@link FailureOrigin#UNKNOWN}. When the summary has at least one failure reason, the trace is
 * tagged with the comma-separated enum names of each reason's origin; a reason without an origin
 * contributes {@code UNKNOWN}. When the summary has no failure reasons, no tag is added.
 *
 * @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s). May be
 *        {@code null}.
 */
private void traceFailures(final AttemptFailureSummary failureSummary) {
  if (failureSummary == null) {
    ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, FailureOrigin.UNKNOWN.value()));
    return;
  }

  if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
    // NOTE(review): this branch emits enum names (name()) while the null-summary branch above emits
    // value(); both are kept as they were so the emitted tag values do not change.
    final String origins = failureSummary.getFailures().stream()
        .map(FailureReason::getFailureOrigin)
        // A failure reason may carry no origin; substitute UNKNOWN instead of failing on name().
        .map(origin -> origin != null ? origin : FailureOrigin.UNKNOWN)
        .map(FailureOrigin::name)
        .collect(Collectors.joining(","));
    ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, origins));
  }
}

/**
 * Records a metric for each failure reason.
 *
 * <p>When the summary is {@code null}, a single count is recorded with the value of
 * {@link FailureOrigin#UNKNOWN} as the failure origin. When the summary has no failure reasons
 * (a {@code null} or empty list), nothing is recorded.
 *
 * @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s). May be
 *        {@code null}.
 */
private void trackFailures(final AttemptFailureSummary failureSummary) {
  if (failureSummary == null) {
    MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
        new MetricAttribute(MetricTags.FAILURE_ORIGIN, FailureOrigin.UNKNOWN.value()));
    return;
  }

  // Same emptiness guard as traceFailures, so a summary without a failures list is skipped rather
  // than failing the activity while iterating.
  if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
    for (final FailureReason reason : failureSummary.getFailures()) {
      MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
          new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
    }
  }
}

}
Original file line number Diff line number Diff line change
@@ -448,6 +448,16 @@ void setAttemptFailure() throws IOException {
verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
}

/**
 * Verifies that reporting an attempt failure with a {@code null} failure summary still fails the
 * attempt, writes the job output, and writes the (null) failure summary to persistence.
 */
@Test
void setAttemptFailureManuallyTerminated() throws IOException {
  // The last constructor argument is the failure summary, deliberately absent here.
  final AttemptFailureInput input = new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput, null);

  jobCreationAndStatusUpdateActivity.attemptFailure(input);

  verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
  verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
  verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, null);
}

@Test
void setAttemptFailureWrapException() throws IOException {
final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE);

0 comments on commit 2423c7d

Please sign in to comment.