Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

push failures to segment #10715

Merged
merged 6 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -91,6 +92,10 @@ public static JsonNode emptyObject() {
return jsonNode(Collections.emptyMap());
}

/**
 * Creates a new JSON array node through this class's {@code OBJECT_MAPPER}.
 *
 * @return the node returned by {@code OBJECT_MAPPER.createArrayNode()}
 */
public static ArrayNode arrayNode() {
  final ArrayNode freshArray = OBJECT_MAPPER.createArrayNode();
  return freshArray;
}

/**
 * Converts a JSON tree into an instance of the requested class.
 *
 * @param jsonNode the JSON tree to convert
 * @param klass the class of the object to produce
 * @param <T> the type of the returned object
 * @return the result of {@code OBJECT_MAPPER.convertValue(jsonNode, klass)}
 */
public static <T> T object(final JsonNode jsonNode, final Class<T> klass) {
  final T converted = OBJECT_MAPPER.convertValue(jsonNode, klass);
  return converted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ void testEmptyObject() {
assertEquals(Jsons.deserialize("{}"), Jsons.emptyObject());
}

@Test
void testArrayNode() {
  // The node produced by Jsons.arrayNode() must equal the tree parsed from the literal "[]".
  final var parsedEmptyArray = Jsons.deserialize("[]");
  final var createdArray = Jsons.arrayNode();
  assertEquals(parsedEmptyArray, createdArray);
}

@Test
void testToObject() {
final ToClass expected = new ToClass("abc", 999, 888L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

package io.airbyte.scheduler.persistence.job_tracker;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobOutput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardDestinationDefinition;
Expand All @@ -15,7 +20,11 @@
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.Job;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.util.Strings;

Expand Down Expand Up @@ -102,9 +111,50 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
metadata.put("volume_rows", syncSummary.getRecordsSynced());
}
}

final List<FailureReason> failureReasons = failureReasonsList(attempts);
if (!failureReasons.isEmpty()) {
metadata.put("failure_reasons", failureReasonsListAsJson(failureReasons).toString());
metadata.put("main_failure_reason", failureReasonAsJson(failureReasons.get(0)).toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sort failureReasons chronologically in the FailureHelper class, so getting index 0 should be pretty reliable here. Still, it might make sense to sort explicitly here so that we're not relying on the assumption that these are already sorted the way the job tracker expects.

}
}
}
return metadata.build();
}

private static List<FailureReason> failureReasonsList(final List<Attempt> attempts) {
return attempts
.stream()
.map(Attempt::getFailureSummary)
.flatMap(Optional::stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ended up being pretty clean, nice!

.map(AttemptFailureSummary::getFailures)
.flatMap(Collection::stream)
.sorted(Comparator.comparing(FailureReason::getTimestamp))
.toList();
}

/**
 * Converts each failure reason to JSON and collects the results, in list order, into a single
 * JSON array.
 *
 * @param failureReasons the failure reasons to convert
 * @return an array node holding one element per failure reason
 */
private static ArrayNode failureReasonsListAsJson(final List<FailureReason> failureReasons) {
  final ArrayNode reasonsJson = Jsons.arrayNode();
  for (final FailureReason reason : failureReasons) {
    reasonsJson.add(TrackingMetadata.failureReasonAsJson(reason));
  }
  return reasonsJson;
}

/**
 * Builds the JSON representation of a single failure reason.
 *
 * <p>Only the seven properties put into the map below are emitted, in that order. Any other
 * property of the failure reason (for example its stacktrace) is left out.
 *
 * @param failureReason the failure reason to convert
 * @return the result of {@code Jsons.jsonNode} applied to the ordered field map
 */
private static JsonNode failureReasonAsJson(final FailureReason failureReason) {
  // A plain LinkedHashMap replaces the former double-brace initializer: it keeps insertion
  // order and accepts null values without declaring an anonymous subclass.
  // we want the json to always include failureOrigin and failureType, even when they are null
  final LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
  fields.put("failureOrigin", failureReason.getFailureOrigin());
  fields.put("failureType", failureReason.getFailureType());
  fields.put("internalMessage", failureReason.getInternalMessage());
  fields.put("externalMessage", failureReason.getExternalMessage());
  fields.put("metadata", failureReason.getMetadata());
  fields.put("retryable", failureReason.getRetryable());
  fields.put("timestamp", failureReason.getTimestamp());
  return Jsons.jsonNode(fields);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Metadata;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.StandardCheckConnectionOutput;
Expand All @@ -48,10 +51,13 @@
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -77,6 +83,7 @@ class JobTrackerTest {
private static final long SYNC_DURATION = 9L; // in sync between end and start time
private static final long SYNC_BYTES_SYNC = 42L;
private static final long SYNC_RECORDS_SYNC = 4L;
private static final long LONG_JOB_ID = 10L; // for sync the job id is a long not a uuid.

private static final ImmutableMap<String, Object> STARTED_STATE_METADATA = ImmutableMap.<String, Object>builder()
.put("attempt_stage", "STARTED")
Expand Down Expand Up @@ -291,6 +298,11 @@ void testTrackResetAttempt() throws ConfigNotFoundException, IOException, JsonVa
testAsynchronousAttempt(ConfigType.RESET_CONNECTION);
}

// Runs the failure-tracking scenario for SYNC jobs, expecting the sync config metadata as well.
@Test
void testTrackSyncAttemptWithFailures() throws ConfigNotFoundException, IOException, JsonValidationException {
  final ConfigType syncType = ConfigType.SYNC;
  testAsynchronousAttemptWithFailures(syncType, SYNC_CONFIG_METADATA);
}

@Test
void testConfigToMetadata() throws IOException {
final String configJson = MoreResources.readResource("example_config.json");
Expand Down Expand Up @@ -320,19 +332,72 @@ void testConfigToMetadata() throws IOException {
}

void testAsynchronousAttempt(final ConfigType configType) throws ConfigNotFoundException, IOException, JsonValidationException {
testAsynchronousAttempt(configType, Collections.emptyMap());
testAsynchronousAttempt(configType, getJobWithAttemptsMock(configType, LONG_JOB_ID), Collections.emptyMap());
}

void testAsynchronousAttempt(final ConfigType configType, final Map<String, Object> additionalExpectedMetadata)
throws ConfigNotFoundException, IOException, JsonValidationException {
// for sync the job id is a long not a uuid.
final long jobId = 10L;
testAsynchronousAttempt(configType, getJobWithAttemptsMock(configType, LONG_JOB_ID), additionalExpectedMetadata);
}

final ImmutableMap<String, Object> metadata = getJobMetadata(configType, jobId);
final Job job = getJobWithAttemptsMock(configType, jobId);
/**
 * Runs the asynchronous-attempt scenario against a job whose attempts carry failure summaries.
 *
 * <p>The expected metadata is {@code additionalExpectedMetadata} merged with two extra entries:
 * {@code failure_reasons} (config, system and unknown failure, in that order) and
 * {@code main_failure_reason} (the config failure), both as JSON strings.
 *
 * @param configType the job config type under test
 * @param additionalExpectedMetadata metadata expected in addition to the failure entries
 */
void testAsynchronousAttemptWithFailures(final ConfigType configType, final Map<String, Object> additionalExpectedMetadata)
    throws ConfigNotFoundException, IOException, JsonValidationException {
  final JsonNode configFailureJson = expectedFailureReasonJson(
      "source", "configError", "Internal config error error msg", "Config error related msg", 1010);
  final JsonNode systemFailureJson = expectedFailureReasonJson(
      "replication", "systemError", "Internal system error error msg", "System error related msg", 1100);
  // origin and type are null here; the keys must still be present in the expected JSON
  final JsonNode unknownFailureJson = expectedFailureReasonJson(
      null, null, "Internal unknown error error msg", "Unknown error related msg", 1110);

  final Map<String, Object> failureMetadata = ImmutableMap.of(
      "failure_reasons", Jsons.arrayNode().addAll(Arrays.asList(configFailureJson, systemFailureJson, unknownFailureJson)).toString(),
      "main_failure_reason", configFailureJson.toString());
  testAsynchronousAttempt(configType, getJobWithFailuresMock(configType, LONG_JOB_ID),
      MoreMaps.merge(additionalExpectedMetadata, failureMetadata));
}

/**
 * Builds the expected JSON for one tracked failure reason.
 *
 * <p>A plain {@link LinkedHashMap} is used so key order is fixed and null values are allowed,
 * without the anonymous subclass that double-brace initialization would create. The
 * {@code metadata} and {@code retryable} entries are the same for every expected failure.
 *
 * @param failureOrigin expected origin value, may be null
 * @param failureType expected type value, may be null
 * @param internalMessage expected internal message
 * @param externalMessage expected external message
 * @param timestamp expected timestamp
 * @return the JSON tree for the expected failure reason
 */
private static JsonNode expectedFailureReasonJson(final String failureOrigin,
                                                  final String failureType,
                                                  final String internalMessage,
                                                  final String externalMessage,
                                                  final int timestamp) {
  final Map<String, Object> fields = new LinkedHashMap<>();
  fields.put("failureOrigin", failureOrigin);
  fields.put("failureType", failureType);
  fields.put("internalMessage", internalMessage);
  fields.put("externalMessage", externalMessage);
  fields.put("metadata", ImmutableMap.of("some", "metadata"));
  fields.put("retryable", true);
  fields.put("timestamp", timestamp);
  return Jsons.jsonNode(fields);
}

void testAsynchronousAttempt(final ConfigType configType, final Job job, final Map<String, Object> additionalExpectedMetadata)
throws ConfigNotFoundException, IOException, JsonValidationException {

final ImmutableMap<String, Object> metadata = getJobMetadata(configType, LONG_JOB_ID);
// test when frequency is manual.
when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true));
when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(jobId)).thenReturn(WORKSPACE_ID);
when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(LONG_JOB_ID)).thenReturn(WORKSPACE_ID);
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME));
final Map<String, Object> manualMetadata = MoreMaps.merge(
Expand Down Expand Up @@ -405,9 +470,7 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con
return job;
}

private Job getJobWithAttemptsMock(final ConfigType configType, final long jobId)
throws ConfigNotFoundException, IOException, JsonValidationException {
final Job job = getJobMock(configType, jobId);
private Attempt getAttemptMock() {
final Attempt attempt = mock(Attempt.class);
final JobOutput jobOutput = mock(JobOutput.class);
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
Expand All @@ -420,11 +483,71 @@ private Job getJobWithAttemptsMock(final ConfigType configType, final long jobId
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
when(jobOutput.getSync()).thenReturn(syncOutput);
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
when(job.getAttempts()).thenReturn(List.of(attempt));
return attempt;
}

/**
 * Returns a mocked job for {@code jobId} that reports exactly one attempt, the one produced by
 * {@code getAttemptMock()}.
 */
private Job getJobWithAttemptsMock(final ConfigType configType, final long jobId)
    throws ConfigNotFoundException, IOException, JsonValidationException {
  final List<Attempt> singleAttempt = List.of(getAttemptMock());
  return getJobWithAttemptsMock(configType, jobId, singleAttempt);
}

/**
 * Returns the job from {@code getJobMock}, stubbed to report the given attempts, and registers
 * it with {@code jobPersistence} under {@code jobId}.
 */
private Job getJobWithAttemptsMock(final ConfigType configType, final long jobId, final List<Attempt> attempts)
    throws ConfigNotFoundException, IOException, JsonValidationException {
  final Job mockedJob = getJobMock(configType, jobId);

  // make the job discoverable through the persistence layer
  when(jobPersistence.getJob(jobId)).thenReturn(mockedJob);
  // and have it report the supplied attempts
  when(mockedJob.getAttempts()).thenReturn(attempts);

  return mockedJob;
}

/**
 * Builds three mocked attempts: one whose failure summary has two failures, one whose summary
 * has a single failure, and one with no failure summary. They are returned in that order.
 */
private List<Attempt> getAttemptsWithFailuresMock() {
  // Failure reasons, declared from earliest to latest timestamp.
  final FailureReason sourceConfigError = new FailureReason()
      .withFailureOrigin(FailureReason.FailureOrigin.SOURCE)
      .withFailureType(FailureReason.FailureType.CONFIG_ERROR)
      .withRetryable(true)
      .withMetadata(new Metadata().withAdditionalProperty("some", "metadata"))
      .withExternalMessage("Config error related msg")
      .withInternalMessage("Internal config error error msg")
      .withStacktrace("Don't include stacktrace in call to track")
      .withTimestamp(SYNC_START_TIME + 10);
  final FailureReason replicationSystemError = new FailureReason()
      .withFailureOrigin(FailureReason.FailureOrigin.REPLICATION)
      .withFailureType(FailureReason.FailureType.SYSTEM_ERROR)
      .withRetryable(true)
      .withMetadata(new Metadata().withAdditionalProperty("some", "metadata"))
      .withExternalMessage("System error related msg")
      .withInternalMessage("Internal system error error msg")
      .withStacktrace("Don't include stacktrace in call to track")
      .withTimestamp(SYNC_START_TIME + 100);
  // neither failure origin nor failure type is set on this one
  final FailureReason unclassifiedError = new FailureReason()
      .withRetryable(true)
      .withMetadata(new Metadata().withAdditionalProperty("some", "metadata"))
      .withExternalMessage("Unknown error related msg")
      .withInternalMessage("Internal unknown error error msg")
      .withStacktrace("Don't include stacktrace in call to track")
      .withTimestamp(SYNC_START_TIME + 110);

  // Attempt whose summary holds only the config failure.
  final Attempt oneFailureAttempt = getAttemptMock();
  final AttemptFailureSummary oneFailureSummary = mock(AttemptFailureSummary.class);
  when(oneFailureSummary.getFailures()).thenReturn(List.of(sourceConfigError));
  when(oneFailureAttempt.getFailureSummary()).thenReturn(Optional.of(oneFailureSummary));

  // Attempt whose summary holds the system failure followed by the unclassified one.
  final Attempt twoFailuresAttempt = getAttemptMock();
  final AttemptFailureSummary twoFailuresSummary = mock(AttemptFailureSummary.class);
  when(twoFailuresSummary.getFailures()).thenReturn(List.of(replicationSystemError, unclassifiedError));
  when(twoFailuresAttempt.getFailureSummary()).thenReturn(Optional.of(twoFailuresSummary));

  // Attempt without any failure summary.
  final Attempt cleanAttempt = getAttemptMock();
  when(cleanAttempt.getFailureSummary()).thenReturn(Optional.empty());

  // Outside of tests, failures should not arrive out of chronological order. The later failures
  // are deliberately listed first to verify that failure metadata is sorted explicitly when
  // it is tracked.
  return List.of(twoFailuresAttempt, oneFailureAttempt, cleanAttempt);
}

/**
 * Returns a mocked job for {@code jobId} whose attempts are the ones produced by
 * {@code getAttemptsWithFailuresMock()}.
 */
private Job getJobWithFailuresMock(final ConfigType configType, final long jobId)
    throws ConfigNotFoundException, IOException, JsonValidationException {
  final List<Attempt> attemptsWithFailures = getAttemptsWithFailuresMock();
  return getJobWithAttemptsMock(configType, jobId, attemptsWithFailures);
}

private ImmutableMap<String, Object> getJobMetadata(final ConfigType configType, final long jobId) {
return ImmutableMap.<String, Object>builder()
.put("job_type", configType)
Expand Down