Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report Normalization failures to Sentry #15695

Merged
merged 22 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,30 @@ public class JobErrorReporter {
// Metadata keys attached to reported job errors; consumed by the error reporting backend (e.g. Sentry).
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_META_KEY = "connector_command";
// Repository of the normalization image, reported for normalization-origin failures.
private static final String NORMALIZATION_REPOSITORY_META_KEY = "normalization_repository";
private static final String JOB_ID_KEY = "job_id";

private final ConfigRepository configRepository;
private final DeploymentMode deploymentMode;
private final String airbyteVersion;
// Normalization docker image name and version, combined as "<image>:<version>" when reporting
// normalization failures.
private final String normalizationImage;
private final String normalizationVersion;
private final WebUrlHelper webUrlHelper;
private final JobErrorReportingClient jobErrorReportingClient;

/**
 * Creates a reporter that forwards job failure reasons to the configured error reporting client.
 *
 * @param configRepository used to resolve source/destination definitions for a connection
 * @param deploymentMode current deployment mode (e.g. OSS/Cloud), added to report metadata
 * @param airbyteVersion running Airbyte version, added to report metadata
 * @param normalizationImage docker repository of the normalization image
 * @param normalizationVersion version tag of the normalization image
 * @param webUrlHelper builds connection URLs included in report metadata
 * @param jobErrorReportingClient sink that receives the failure reports
 */
public JobErrorReporter(final ConfigRepository configRepository,
                        final DeploymentMode deploymentMode,
                        final String airbyteVersion,
                        final String normalizationImage,
                        final String normalizationVersion,
                        final WebUrlHelper webUrlHelper,
                        final JobErrorReportingClient jobErrorReportingClient) {

  this.configRepository = configRepository;
  this.deploymentMode = deploymentMode;
  this.airbyteVersion = airbyteVersion;
  this.normalizationImage = normalizationImage;
  this.normalizationVersion = normalizationVersion;
  this.webUrlHelper = webUrlHelper;
  this.jobErrorReportingClient = jobErrorReportingClient;
}
Expand Down Expand Up @@ -96,6 +103,21 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu
final String dockerImage = jobContext.destinationDockerImage();
final Map<String, String> metadata = MoreMaps.merge(commonMetadata, getDestinationMetadata(destinationDefinition));

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.NORMALIZATION) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
// since error could be arising from source or destination or normalization itself, we want all the
// metadata
// prefixing source keys so we don't overlap (destination as 'true' keys since normalization runs on
// the destination)
final Map<String, String> metadata = MoreMaps.merge(
commonMetadata,
getNormalizationMetadata(),
prefixConnectorMetadataKeys(getSourceMetadata(sourceDefinition), "source"),
getDestinationMetadata(destinationDefinition));
final String dockerImage = String.format("%s:%s", normalizationImage, normalizationVersion);

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
}
}
Expand Down Expand Up @@ -208,6 +230,19 @@ private Map<String, String> getSourceMetadata(final StandardSourceDefinition sou
Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()));
}

/**
 * Builds the metadata entries describing the normalization image used by this deployment.
 * Returned map is immutable (single normalization_repository entry).
 */
private Map<String, String> getNormalizationMetadata() {
  return Map.of(NORMALIZATION_REPOSITORY_META_KEY, normalizationImage);
}

/**
 * Returns a copy of the given metadata with every key rewritten as "&lt;prefix&gt;_&lt;key&gt;", so that
 * e.g. source connector metadata does not collide with destination metadata when the maps are merged.
 *
 * @param connectorMetadata metadata entries to prefix; not modified
 * @param prefix prefix prepended to each key with an underscore separator
 */
private Map<String, String> prefixConnectorMetadataKeys(final Map<String, String> connectorMetadata, final String prefix) {
  final Map<String, String> prefixedMetadata = new HashMap<>();
  connectorMetadata.forEach((key, value) -> prefixedMetadata.put(String.format("%s_%s", prefix, key), value));
  return prefixedMetadata;
}

private void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@
import io.sentry.protocol.SentryStackFrame;
import io.sentry.protocol.SentryStackTrace;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentryExceptionHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(SentryExceptionHelper.class);

// NOTE(review): these String keys look superseded by the ERROR_MAP_KEYS enum below for the dbt
// error map — confirm whether external callers still rely on them before removing.
public static final String ERROR_MAP_MESSAGE_KEY = "errorMessage";
public static final String ERROR_MAP_TYPE_KEY = "errorType";

// Keys of the map returned by getUsefulErrorMessageAndTypeFromDbtError: the extracted error
// message and a synthesized error "type" used for grouping in Sentry.
public enum ERROR_MAP_KEYS {
  ERROR_MAP_MESSAGE_KEY,
  ERROR_MAP_TYPE_KEY
}

/**
* Processes a raw stacktrace string into structured SentryExceptions
* <p>
Expand All @@ -32,6 +47,9 @@ public Optional<List<SentryException>> buildSentryExceptions(final String stackt
if (stacktrace.contains("\tat ") && stacktrace.contains(".java")) {
return buildJavaSentryExceptions(stacktrace);
}
if (stacktrace.startsWith("AirbyteDbtError: ")) {
return buildNormalizationDbtSentryExceptions(stacktrace);
}

return Optional.empty();
}, Optional.empty());
Expand Down Expand Up @@ -166,4 +184,134 @@ private static Optional<List<SentryException>> buildJavaSentryExceptions(final S
return Optional.of(sentryExceptions);
}

private static Optional<List<SentryException>> buildNormalizationDbtSentryExceptions(final String stacktrace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how common is it that the line immediately following Database error is the "interesting" one? e.g: is it 100% of the time based on a reasonable empirical sample size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I was going for a classic 80-20 coverage move here, using the following data:

all normalization-system-errors (with the new failure reasons)
1,110 rows

Filtering for 'database errors'
736 rows
Nominal 736/1110 (66%)
Unique 368/442 (83%)

694/736 (~94%) follow the pattern of line immediately following "Database Error in model" is the single useful part.

Copy link
Contributor Author

@Phlair Phlair Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some futher investigation today looking deeper at the complete set of failures we've encountered (coming from those metabase links above) and decided it's not significant extra work to cover a lot of the possible error structures based on what we've seen so far:

The other 42 errors follow 1 of 3 patterns:

  • SQL compilation error:\n (next line with detail)
  • Invalid input (a line X after this one that is "context: {}" has detail)
  • syntax error at or near "{value}" (next line with detail)

Currently implementing specific logic for these three extra edge cases so we cover all currently known database errors. (No doubt there are other edge cases that exist and we haven't encountered but we can build upon this parsing code over time.)

Also implementing logic for the rest of the error types (the other ~20% of unique errors we've seen):

  • Unhandled error
  • Compilation Error
  • Runtime Error

Basing the logic from our dataset in Metabase (in previous comment links), now covering >95% of every dbt error we've seen since implementing normalization failure reasons.

Copy link
Contributor Author

@Phlair Phlair Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the title we see in the list of incidents in sentry's incident view? if so could we change that to be more unique for each issue?

These changes will also solve this point

final List<SentryException> sentryExceptions = new ArrayList<>();

Map<ERROR_MAP_KEYS, String> usefulErrorMap = getUsefulErrorMessageAndTypeFromDbtError(stacktrace);

// if our errorMessage from the function != stacktrace then we know we've pulled out something
// useful
if (!usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY).equals(stacktrace)) {
final SentryException usefulException = new SentryException();
usefulException.setValue(usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY));
usefulException.setType(usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY));
sentryExceptions.add(usefulException);
}

if (sentryExceptions.isEmpty())
return Optional.empty();

return Optional.of(sentryExceptions);
}

public static Map<ERROR_MAP_KEYS, String> getUsefulErrorMessageAndTypeFromDbtError(String stacktrace) {
// the dbt 'stacktrace' is really just all the log messages at 'error' level, stuck together.
// therefore there is not a totally consistent structure to these,
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
// see the docs: https://docs.getdbt.com/guides/legacy/debugging-errors
// the logic below is built based on the ~450 unique dbt errors we encountered before this PR
// and is a best effort to isolate the useful part of the error logs for debugging and grouping
// and bring some semblance of exception 'types' to differentiate between errors.
Map<ERROR_MAP_KEYS, String> errorMessageAndType = new HashMap<>();
String[] stacktraceLines = stacktrace.split("\n");

boolean defaultNextLine = false;
// TODO: this whole code block is quite ugh, commented to try and make each part clear but could be
// much more readable.
mainLoop: for (int i = 0; i < stacktraceLines.length; i++) {
// This order is important due to how these errors can co-occur.
// This order attempts to keep error definitions consistent based on our observations of possible
// dbt error structures.
try {
// Database Errors
if (stacktraceLines[i].contains("Database Error in model")) {
// Database Error : SQL compilation error
if (stacktraceLines[i + 1].contains("SQL compilation error")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s %s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseSQLCompilationError");
break;
}
// Database Error: Invalid input
else if (stacktraceLines[i + 1].contains("Invalid input")) {
for (String followingLine : Arrays.copyOfRange(stacktraceLines, i + 1, stacktraceLines.length)) {
if (followingLine.trim().startsWith("context:")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s\n%s", stacktraceLines[i + 1].trim(), followingLine.trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseInvalidInputError");
break mainLoop;
}
}
}
// Database Error: Syntax error
else if (stacktraceLines[i + 1].contains("syntax error at or near \"")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s\n%s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseSyntaxError");
break;
}
// Database Error: default
else {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseError");
defaultNextLine = true;
}
}
// Unhandled Error
else if (stacktraceLines[i].contains("Unhandled error while executing model")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtUnhandledError");
defaultNextLine = true;
}
// Compilation Errors
else if (stacktraceLines[i].contains("Compilation Error")) {
// Compilation Error: Ambiguous Relation
if (stacktraceLines[i + 1].contains("When searching for a relation, dbt found an approximate match.")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s %s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtCompilationAmbiguousRelationError");
break;
}
// Compilation Error: default
else {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtCompilationError");
defaultNextLine = true;
}
}
// Runtime Errors
else if (stacktraceLines[i].contains("Runtime Error")) {
// Runtime Error: Database error
for (String followingLine : Arrays.copyOfRange(stacktraceLines, i + 1, stacktraceLines.length)) {
if ("Database Error".equals(followingLine.trim())) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s", stacktraceLines[Arrays.stream(stacktraceLines).toList().indexOf(followingLine) + 1].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtRuntimeDatabaseError");
break mainLoop;
}
}
// Runtime Error: default
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtRuntimeError");
defaultNextLine = true;
}
// Database Error: formatted differently, catch last to avoid counting other types of errors as
// Database Error
else if ("Database Error".equals(stacktraceLines[i].trim())) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseError");
defaultNextLine = true;
}
// handle the default case without repeating code
if (defaultNextLine) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY, stacktraceLines[i + 1].trim());
break;
}
} catch (final ArrayIndexOutOfBoundsException e) {
// this means our logic is slightly off, our assumption of where error lines are is incorrect
LOGGER.warn("Failed trying to parse useful error message out of dbt error, defaulting to full stacktrace");
}
}
if (errorMessageAndType.isEmpty()) {
// For anything we haven't caught, just return full stacktrace
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY, stacktrace);
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "AirbyteDbtError");
}
return errorMessageAndType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class JobErrorReporterTest {
private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection";
private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
private static final String AIRBYTE_VERSION = "0.1.40";
private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_VERSION = "0.2.18";
private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
private static final String SOURCE_DEFINITION_NAME = "stripe";
private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe";
Expand All @@ -53,13 +55,15 @@ class JobErrorReporterTest {
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_KEY = "failure_origin";
private static final String SOURCE = "source";
private static final String PREFIX_FORMAT_STRING = "%s_%s";
private static final String FAILURE_TYPE_KEY = "failure_type";
private static final String SYSTEM_ERROR = "system_error";
private static final String CONNECTOR_DEFINITION_ID_KEY = "connector_definition_id";
private static final String CONNECTOR_REPOSITORY_KEY = "connector_repository";
private static final String CONNECTOR_NAME_KEY = "connector_name";
private static final String CONNECTOR_RELEASE_STAGE_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_KEY = "connector_command";
private static final String NORMALIZATION_REPOSITORY_KEY = "normalization_repository";

private ConfigRepository configRepository;
private JobErrorReportingClient jobErrorReportingClient;
Expand All @@ -71,7 +75,8 @@ void setup() {
configRepository = mock(ConfigRepository.class);
jobErrorReportingClient = mock(JobErrorReportingClient.class);
webUrlHelper = mock(WebUrlHelper.class);
jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient);
jobErrorReporter = new JobErrorReporter(
configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, NORMALIZATION_IMAGE, NORMALIZATION_VERSION, webUrlHelper, jobErrorReportingClient);
}

@Test
Expand All @@ -88,11 +93,16 @@ void testReportSyncJobFailure() {
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason normalizationFailureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withFailureOrigin(FailureOrigin.NORMALIZATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason nonTraceMessageFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE);
final FailureReason replicationFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.REPLICATION);

Mockito.when(mFailureSummary.getFailures())
.thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));
Mockito.when(mFailureSummary.getFailures()).thenReturn(List.of(
sourceFailureReason, destinationFailureReason, normalizationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));

final long syncJobId = 1L;
final SyncJobReportingContext jobReportingContext = new SyncJobReportingContext(
Expand Down Expand Up @@ -150,9 +160,30 @@ void testReportSyncJobFailure() {
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

final Map<String, String> expectedNormalizationMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "normalization"),
Map.entry(FAILURE_TYPE_KEY, SYSTEM_ERROR),
Map.entry(NORMALIZATION_REPOSITORY_KEY, NORMALIZATION_IMAGE),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_DEFINITION_ID_KEY), SOURCE_DEFINITION_ID.toString()),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_REPOSITORY_KEY), SOURCE_DOCKER_REPOSITORY),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_NAME_KEY), SOURCE_DEFINITION_NAME),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_RELEASE_STAGE_KEY), SOURCE_RELEASE_STAGE.toString()),
Map.entry(CONNECTOR_DEFINITION_ID_KEY, DESTINATION_DEFINITION_ID.toString()),
Map.entry(CONNECTOR_REPOSITORY_KEY, DESTINATION_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE,
expectedDestinationMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(
mWorkspace, normalizationFailureReason, String.format("%s:%s", NORMALIZATION_IMAGE, NORMALIZATION_VERSION), expectedNormalizationMetadata);
Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
}

Expand Down
Loading