From 2423c7d91237c56b9d23ee3be338f53a52dec152 Mon Sep 17 00:00:00 2001
From: Jonathan Pearlin
Date: Mon, 28 Nov 2022 09:26:54 -0500
Subject: [PATCH] Add failure origins to APM trace (#19665)

Tag the attemptFailure activity trace with the origin of every failure reason
in the attempt failure summary, and introduce an explicit "unknown" failure
origin that is used when no origin (or no failure summary) is available.

---
Review notes (everything between the "---" marker and the first "diff --git"
line is commentary and is not part of the commit):

* traceFailures renders each origin through MetricTags.getFailureOrigin
  instead of FailureOrigin::name. A failure reason without an origin no longer
  raises a NullPointerException before the attempt is persisted as failed, and
  the trace tag uses the same values as the failure origin metric and as the
  null-summary branch.
* trackFailures guards the failure list with CollectionUtils.isNotEmpty, the
  same check traceFailures already performs.
* NOTE(review): this patch text had lost its line breaks. Indentation and the
  position of blank context lines were re-derived from the hunk headers;
  confirm against the target tree before applying.
* NOTE(review): the post-image blob id on the "index" line of
  JobCreationAndStatusUpdateActivityImpl.java predates the two revisions
  above.

 .../airbyte/workers/helper/FailureHelper.java |  1 +
 .../workers/helper/FailureHelperTest.java     | 10 ++++
 .../main/resources/types/FailureReason.yaml   |  1 +
 .../metrics/lib/ApmTraceConstants.java        |  5 ++
 .../io/airbyte/metrics/lib/MetricTags.java    |  2 +-
 ...obCreationAndStatusUpdateActivityImpl.java | 71 ++++++++++++++-----
 ...obCreationAndStatusUpdateActivityTest.java | 10 ++++
 7 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java
index d205894479c6..fff282e412fc 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java
@@ -173,6 +173,7 @@ public static FailureReason dbtFailure(final Throwable t, final Long jobId, fina
 
   public static FailureReason unknownOriginFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
     return genericFailure(t, jobId, attemptNumber)
+        .withFailureOrigin(FailureOrigin.UNKNOWN)
         .withExternalMessage("An unknown failure occurred");
   }
 
diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/FailureHelperTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/FailureHelperTest.java
index edbad7d065f8..263396f1e1eb 100644
--- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/FailureHelperTest.java
+++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/FailureHelperTest.java
@@ -179,4 +179,14 @@ void testOrderedFailures() throws Exception {
     assertEquals(failureReasonList.get(0), TRACE_FAILURE_REASON);
   }
 
+  @Test
+  void testUnknownOriginFailure() {
+    final Throwable t = new RuntimeException();
+    final Long jobId = 12345L;
+    final Integer attemptNumber = 1;
+    final FailureReason failureReason = FailureHelper.unknownOriginFailure(t, jobId, attemptNumber);
+    assertEquals(FailureOrigin.UNKNOWN, failureReason.getFailureOrigin());
+    assertEquals("An unknown failure occurred", failureReason.getExternalMessage());
+  }
+
 }
diff --git a/airbyte-config/config-models/src/main/resources/types/FailureReason.yaml b/airbyte-config/config-models/src/main/resources/types/FailureReason.yaml
index 72dced892a78..3d10988d4f01 100644
--- a/airbyte-config/config-models/src/main/resources/types/FailureReason.yaml
+++ b/airbyte-config/config-models/src/main/resources/types/FailureReason.yaml
@@ -18,6 +18,7 @@ properties:
       - normalization
       - dbt
       - airbyte_platform
+      - unknown
   failureType:
     description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
     type: string
diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java
index 23ae35754c7f..41abede3e79b 100644
--- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java
+++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java
@@ -63,6 +63,11 @@ public static final class Tags {
      */
     public static final String DOCKER_IMAGE_KEY = "docker_image";
 
+    /**
+     * Name of the APM trace tag that holds the failure origin(s) associated with the trace.
+     */
+    public static final String FAILURE_ORIGINS_KEY = "failure_origins";
+
     /**
      * Name of the APM trace tag that holds the job ID value associated with the trace.
      */
diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java
index 6378b0c12fcd..99e6fbe60469 100644
--- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java
+++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java
@@ -29,7 +29,7 @@ public static String getReleaseStage(final ReleaseStage stage) {
   }
 
   public static String getFailureOrigin(final FailureOrigin origin) {
-    return origin != null ? origin.value() : UNKNOWN;
+    return origin != null ? origin.value() : FailureOrigin.UNKNOWN.value();
   }
 
   public static String getJobStatus(final JobStatus status) {
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
index acadfb2bc98a..cb3deff0c47a 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
@@ -8,6 +8,7 @@
 import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
 import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
 import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
+import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY;
 import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
 import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
 
@@ -22,6 +23,7 @@
 import io.airbyte.config.Configs.WorkerEnvironment;
 import io.airbyte.config.DestinationConnection;
 import io.airbyte.config.FailureReason;
+import io.airbyte.config.FailureReason.FailureOrigin;
 import io.airbyte.config.JobConfig;
 import io.airbyte.config.JobOutput;
 import io.airbyte.config.JobSyncConfig;
@@ -56,6 +58,7 @@
 import io.airbyte.workers.run.TemporalWorkerRunFactory;
 import io.airbyte.workers.run.WorkerRun;
 import io.micronaut.context.annotation.Requires;
+import io.micronaut.core.util.CollectionUtils;
 import jakarta.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.Path;
@@ -67,6 +70,7 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -179,9 +183,8 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
   @Override
   public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
     try {
-      ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
-
       final long jobId = input.getJobId();
+      ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
       final Job createdJob = jobPersistence.getJob(jobId);
 
       final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -200,9 +203,8 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
   @Override
   public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
     try {
-      ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
-
       final long jobId = input.getJobId();
+      ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
       final Job createdJob = jobPersistence.getJob(jobId);
 
       final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -221,10 +223,9 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
   @Override
   public void jobSuccess(final JobSuccessInput input) {
     try {
-      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
-
       final long jobId = input.getJobId();
       final int attemptId = input.getAttemptId();
+      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
 
       if (input.getStandardSyncOutput() != null) {
         final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
@@ -287,12 +288,13 @@ public void jobFailure(final JobFailureInput input) {
   @Override
   public void attemptFailure(final AttemptFailureInput input) {
     try {
-      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
-
       final int attemptId = input.getAttemptId();
       final long jobId = input.getJobId();
       final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();
 
+      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
+      traceFailures(failureSummary);
+
       jobPersistence.failAttempt(jobId, attemptId);
       jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);
 
@@ -302,11 +304,7 @@ public void attemptFailure(final AttemptFailureInput input) {
       }
 
       emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
-      for (final FailureReason reason : failureSummary.getFailures()) {
-        MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
-            new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
-      }
-
+      trackFailures(failureSummary);
     } catch (final IOException e) {
       throw new RetryableException(e);
     }
@@ -329,10 +327,9 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
   @Override
   public void jobCancelled(final JobCancelledInput input) {
     try {
-      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
-
       final long jobId = input.getJobId();
       final int attemptId = input.getAttemptId();
+      ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
       jobPersistence.failAttempt(jobId, attemptId);
       jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
       jobPersistence.cancelJob(jobId);
@@ -487,4 +484,48 @@ private void trackCompletionForInternalFailure(final Long jobId,
     jobTracker.trackSyncForInternalFailure(jobId, connectionId, attemptId, Enums.convertTo(status, JobState.class), e);
   }
 
+  /**
+   * Adds the failure origins to the APM trace.
+   * <p>
+   * Each origin is rendered via {@link MetricTags#getFailureOrigin(FailureOrigin)}, so a failure reason
+   * without an origin is reported as unknown instead of raising a {@link NullPointerException}, and the
+   * tag uses the same values as the failure origin metric.
+   *
+   * @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s). If
+   *        {@code null}, the unknown origin is recorded.
+   */
+  private void traceFailures(final AttemptFailureSummary failureSummary) {
+    if (failureSummary != null) {
+      if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
+        ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, failureSummary.getFailures().stream()
+            .map(FailureReason::getFailureOrigin)
+            .map(MetricTags::getFailureOrigin)
+            .collect(Collectors.joining(","))));
+      }
+    } else {
+      ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, FailureOrigin.UNKNOWN.value()));
+    }
+  }
+
+  /**
+   * Records a metric for each failure reason.
+   *
+   * @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s). If
+   *        {@code null}, a single failure with an unknown origin is counted.
+   */
+  private void trackFailures(final AttemptFailureSummary failureSummary) {
+    if (failureSummary != null) {
+      // A summary may carry no failure list; skip it rather than fail while iterating over null.
+      if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
+        for (final FailureReason reason : failureSummary.getFailures()) {
+          MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
+              new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
+        }
+      }
+    } else {
+      MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
+          new MetricAttribute(MetricTags.FAILURE_ORIGIN, FailureOrigin.UNKNOWN.value()));
+    }
+  }
+
 }
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
index a4752364d409..65cc9a3a22ff 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
@@ -448,6 +448,16 @@ void setAttemptFailure() throws IOException {
       verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
     }
 
+    @Test
+    void setAttemptFailureManuallyTerminated() throws IOException {
+      jobCreationAndStatusUpdateActivity
+          .attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput, null));
+
+      verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
+      verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
+      verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, null);
+    }
+
     @Test
     void setAttemptFailureWrapException() throws IOException {
       final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE);