Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report Normalization failures to Sentry #15695

Merged
merged 22 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,30 @@ public class JobErrorReporter {
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_META_KEY = "connector_command";
private static final String NORMALIZATION_REPOSITORY_META_KEY = "normalization_repository";
private static final String JOB_ID_KEY = "job_id";

private final ConfigRepository configRepository;
private final DeploymentMode deploymentMode;
private final String airbyteVersion;
private final String normalizationImage;
private final String normalizationVersion;
private final WebUrlHelper webUrlHelper;
private final JobErrorReportingClient jobErrorReportingClient;

public JobErrorReporter(final ConfigRepository configRepository,
final DeploymentMode deploymentMode,
final String airbyteVersion,
final String normalizationImage,
final String normalizationVersion,
final WebUrlHelper webUrlHelper,
final JobErrorReportingClient jobErrorReportingClient) {

this.configRepository = configRepository;
this.deploymentMode = deploymentMode;
this.airbyteVersion = airbyteVersion;
this.normalizationImage = normalizationImage;
this.normalizationVersion = normalizationVersion;
this.webUrlHelper = webUrlHelper;
this.jobErrorReportingClient = jobErrorReportingClient;
}
Expand Down Expand Up @@ -96,6 +103,21 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu
final String dockerImage = jobContext.destinationDockerImage();
final Map<String, String> metadata = MoreMaps.merge(commonMetadata, getDestinationMetadata(destinationDefinition));

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.NORMALIZATION) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
// since error could be arising from source or destination or normalization itself, we want all the
// metadata
// prefixing source keys so we don't overlap (destination as 'true' keys since normalization runs on
// the destination)
final Map<String, String> metadata = MoreMaps.merge(
commonMetadata,
getNormalizationMetadata(),
prefixConnectorMetadataKeys(getSourceMetadata(sourceDefinition), "source"),
getDestinationMetadata(destinationDefinition));
final String dockerImage = String.format("%s:%s", normalizationImage, normalizationVersion);

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
}
}
Expand Down Expand Up @@ -208,6 +230,19 @@ private Map<String, String> getSourceMetadata(final StandardSourceDefinition sou
Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()));
}

private Map<String, String> getNormalizationMetadata() {
return Map.ofEntries(
Map.entry(NORMALIZATION_REPOSITORY_META_KEY, normalizationImage));
}

private Map<String, String> prefixConnectorMetadataKeys(final Map<String, String> connectorMetadata, final String prefix) {
Map<String, String> prefixedMetadata = new HashMap<>();
for (final Map.Entry<String, String> entry : connectorMetadata.entrySet()) {
prefixedMetadata.put(String.format("%s_%s", prefix, entry.getKey()), entry.getValue());
}
return prefixedMetadata;
}

private void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public Optional<List<SentryException>> buildSentryExceptions(final String stackt
if (stacktrace.contains("\tat ") && stacktrace.contains(".java")) {
return buildJavaSentryExceptions(stacktrace);
}
if (stacktrace.startsWith("AirbyteDbtError: ")) {
return buildNormalizationDbtSentryExceptions(stacktrace);
}

return Optional.empty();
}, Optional.empty());
Expand Down Expand Up @@ -166,4 +169,43 @@ private static Optional<List<SentryException>> buildJavaSentryExceptions(final S
return Optional.of(sentryExceptions);
}

private static Optional<List<SentryException>> buildNormalizationDbtSentryExceptions(final String stacktrace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how common is it that the line immediately following Database error is the "interesting" one? e.g: is it 100% of the time based on a reasonable empirical sample size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I was going for a classic 80-20 coverage move here, using the following data:

all normalization-system-errors (with the new failure reasons)
1,110 rows

Filtering for 'database errors'
736 rows
Nominal 736/1110 (66%)
Unique 368/442 (83%)

694/736 (~94%) follow the pattern of line immediately following "Database Error in model" is the single useful part.

Copy link
Contributor Author

@Phlair Phlair Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some futher investigation today looking deeper at the complete set of failures we've encountered (coming from those metabase links above) and decided it's not significant extra work to cover a lot of the possible error structures based on what we've seen so far:

The other 42 errors follow 1 of 3 patterns:

  • SQL compilation error:\n (next line with detail)
  • Invalid input (a line X after this one that is "context: {}" has detail)
  • syntax error at or near "{value}" (next line with detail)

Currently implementing specific logic for these three extra edge cases so we cover all currently known database errors. (No doubt there are other edge cases that exist and we haven't encountered but we can build upon this parsing code over time.)

Also implementing logic for the rest of the error types (the other ~20% of unique errors we've seen):

  • Unhandled error
  • Compilation Error
  • Runtime Error

Basing the logic from our dataset in Metabase (in previous comment links), now covering >95% of every dbt error we've seen since implementing normalization failure reasons.

Copy link
Contributor Author

@Phlair Phlair Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the title we see in the list of incidents in sentry's incident view? if so could we change that to be more unique for each issue?

These changes will also solve this point

final List<SentryException> sentryExceptions = new ArrayList<>();

// focus on Database Errors (these are the common errors seen in Airbyte)
final String databaseErrorIdentifier = "Database Error in model";
// for other dbt error types, we'll default to non-parsed stacktrace grouping
if (!stacktrace.contains(databaseErrorIdentifier)) {
return Optional.empty();
}

final String[] separatedStackTrace = stacktrace.split("\n");

// first let's get our useful error line for grouping
String usefulError = "";
boolean nextLine = false;
for (String errorLine : separatedStackTrace) {
// previous line was "Database Error..." so this is our useful message line
if (nextLine) {
usefulError = errorLine;
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading this I think we're only building one SentryException from the stack trace, though I see the dbt stacks could have multiple errors (e.g. error 1 of 2 counts) - is there any value in building a SentryException for each of those?

Copy link
Contributor Author

@Phlair Phlair Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this but decided to scrap it because:

  • the dbt error logs are json formatted and do sometimes have an identifier (.node_info.unique_identifier), however this isn't present in every error log line so we can't rely on that for grouping.
  • we also can't rely on simple order (for example look at the stacktrace in this test event). We could get clever and parse structures that are ordered but this would then still only cover the errors that are structured in a specific way when unfortunately they're wildly variable (e.g. check out all these with different structures).

side note: I will be pushing up some changes soon to cover grabbing a useful message from a wider array of the error structures but for now going to just identify events based on the first error message for simplicity. Will create an issue for incremental improvement on this to handle multiple exceptions.

}
if (errorLine.contains("Database Error in model")) {
nextLine = true;
}
}

if (!"".equals(usefulError)) {
final SentryException usefulException = new SentryException();
usefulException.setValue(usefulError);
usefulException.setType("DbtDatabaseError");
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
sentryExceptions.add(usefulException);
}

if (sentryExceptions.isEmpty())
return Optional.empty();

return Optional.of(sentryExceptions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class JobErrorReporterTest {
private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection";
private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
private static final String AIRBYTE_VERSION = "0.1.40";
private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_VERSION = "0.2.18";
private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
private static final String SOURCE_DEFINITION_NAME = "stripe";
private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe";
Expand All @@ -60,6 +62,7 @@ class JobErrorReporterTest {
private static final String CONNECTOR_NAME_KEY = "connector_name";
private static final String CONNECTOR_RELEASE_STAGE_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_KEY = "connector_command";
private static final String NORMALIZATION_REPOSITORY_KEY = "normalization_repository";

private ConfigRepository configRepository;
private JobErrorReportingClient jobErrorReportingClient;
Expand All @@ -71,7 +74,8 @@ void setup() {
configRepository = mock(ConfigRepository.class);
jobErrorReportingClient = mock(JobErrorReportingClient.class);
webUrlHelper = mock(WebUrlHelper.class);
jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient);
jobErrorReporter = new JobErrorReporter(
configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, NORMALIZATION_IMAGE, NORMALIZATION_VERSION, webUrlHelper, jobErrorReportingClient);
}

@Test
Expand All @@ -88,11 +92,16 @@ void testReportSyncJobFailure() {
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason normalizationFailureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withFailureOrigin(FailureOrigin.NORMALIZATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason nonTraceMessageFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE);
final FailureReason replicationFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.REPLICATION);

Mockito.when(mFailureSummary.getFailures())
.thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));
Mockito.when(mFailureSummary.getFailures()).thenReturn(List.of(
sourceFailureReason, destinationFailureReason, normalizationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));

final long syncJobId = 1L;
final SyncJobReportingContext jobReportingContext = new SyncJobReportingContext(
Expand Down Expand Up @@ -150,9 +159,30 @@ void testReportSyncJobFailure() {
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

final Map<String, String> expectedNormalizationMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "normalization"),
Map.entry(FAILURE_TYPE_KEY, SYSTEM_ERROR),
Map.entry(NORMALIZATION_REPOSITORY_KEY, NORMALIZATION_IMAGE),
Map.entry(String.format("%s_%s", "source", CONNECTOR_DEFINITION_ID_KEY), SOURCE_DEFINITION_ID.toString()),
Map.entry(String.format("%s_%s", "source", CONNECTOR_REPOSITORY_KEY), SOURCE_DOCKER_REPOSITORY),
Map.entry(String.format("%s_%s", "source", CONNECTOR_NAME_KEY), SOURCE_DEFINITION_NAME),
Map.entry(String.format("%s_%s", "source", CONNECTOR_RELEASE_STAGE_KEY), SOURCE_RELEASE_STAGE.toString()),
Map.entry(CONNECTOR_DEFINITION_ID_KEY, DESTINATION_DEFINITION_ID.toString()),
Map.entry(CONNECTOR_REPOSITORY_KEY, DESTINATION_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE,
expectedDestinationMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(
mWorkspace, normalizationFailureReason, String.format("%s:%s", NORMALIZATION_IMAGE, NORMALIZATION_VERSION), expectedNormalizationMetadata);
Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,29 @@ void testBuildSentryExceptionsJavaMultilineValue() {
FUNCTION, "receiveErrorResponse")));
}

@Test
void testBuildSentryExceptionsNormalizationDbtDatabaseError() {
final String stacktrace =
"""
AirbyteDbtError:\s
1 of 1 ERROR creating table model public.midaug_start_users............................................................. [ERROR in 0.24s]
Database Error in model midaug_start_users (models/generated/airbyte_incremental/public/midaug_start_users.sql)
1292 (22007): Truncated incorrect DOUBLE value: 'ABC'
compiled SQL at ../build/run/airbyte_utils/models/generated/airbyte_incremental/public/midaug_start_users.sql
1 of 1 ERROR creating table model public.midaug_start_users............................................................. [ERROR in 0.24s]
Database Error in model midaug_start_users (models/generated/airbyte_incremental/public/midaug_start_users.sql)
1292 (22007): Truncated incorrect DOUBLE value: 'ABC'
compiled SQL at ../build/run/airbyte_utils/models/generated/airbyte_incremental/public/midaug_start_users.sql
""";

final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
Assertions.assertTrue(optionalSentryExceptions.isPresent());
final List<SentryException> exceptionList = optionalSentryExceptions.get();
Assertions.assertEquals(1, exceptionList.size());

assertExceptionContent(exceptionList.get(0), "DbtDatabaseError", " 1292 (22007): Truncated incorrect DOUBLE value: 'ABC'", List.of());
}

private void assertExceptionContent(final SentryException exception,
final String type,
final String value,
Expand Down
3 changes: 3 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -208,6 +209,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
configRepository,
configs.getDeploymentMode(),
configs.getAirbyteVersionOrWarning(),
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME,
NormalizationRunnerFactory.NORMALIZATION_VERSION,
webUrlHelper,
jobErrorReportingClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
Expand Down Expand Up @@ -477,6 +478,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
configRepository,
configs.getDeploymentMode(),
configs.getAirbyteVersionOrWarning(),
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME,
NormalizationRunnerFactory.NORMALIZATION_VERSION,
webUrlHelper,
jobErrorReportingClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
private Map<Type, List<AirbyteMessage>> airbyteMessagesByType;
private String dbtErrorStack;

private Process process = null;

Expand Down Expand Up @@ -154,7 +155,7 @@ private boolean runProcess(final String jobId,
.collect(Collectors.groupingBy(AirbyteMessage::getType));

// picks up error logs from dbt
String dbtErrorStack = String.join("\n\t", streamFactory.getDbtErrors());
dbtErrorStack = String.join("\n", streamFactory.getDbtErrors());

if (!"".equals(dbtErrorStack)) {
AirbyteMessage dbtTraceMessage = new AirbyteMessage()
Expand All @@ -165,8 +166,11 @@ private boolean runProcess(final String jobId,
.withError(new AirbyteErrorTraceMessage()
.withFailureType(FailureType.SYSTEM_ERROR) // TODO: decide on best FailureType for this
.withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself.")
.withInternalMessage(dbtErrorStack)
.withStackTrace(dbtErrorStack)));
.withInternalMessage(buildInternalErrorMessageFromDbtStackTrace())
// due to the lack of consistent defining features in dbt errors we're injecting a breadcrumb to the
// stacktrace so we can confidently identify all dbt errors when parsing and sending to Sentry
// see dbt error examples: https://docs.getdbt.com/guides/legacy/debugging-errors for more context
.withStackTrace("AirbyteDbtError: \n".concat(dbtErrorStack))));

airbyteMessagesByType.putIfAbsent(Type.TRACE, List.of(dbtTraceMessage));
}
Expand Down Expand Up @@ -206,6 +210,24 @@ public Stream<AirbyteTraceMessage> getTraceMessages() {
return Stream.empty();
}

private String buildInternalErrorMessageFromDbtStackTrace() {
// Most dbt errors we see in Airbyte are `Database Errors`
// The relevant error message is often the line following the "Database Error..." line
// e.g. "Column 10 in UNION ALL has incompatible types: DATETIME, TIMESTAMP"
boolean nextLine = false;
for (String errorLine : streamFactory.getDbtErrors()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any way to DRY this with the method in SentryExceptionHelper?

// previous line was "Database Error..." so this is our useful message line
if (nextLine) {
return errorLine;
}
if (errorLine.contains("Database Error in model")) {
nextLine = true;
}
}
// Not all errors are Database Errors, for other types, we just return the stacktrace
return dbtErrorStack;
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,9 @@ private Stream<AirbyteMessage> filterOutAndHandleNonAirbyteMessageLines(JsonNode
try (final var mdcScope = containerLogMdcBuilder.build()) {
switch (logLevel) {
case "debug" -> logger.debug(logMsg);
case "info" -> logger.info(logMsg);
case "warn" -> logger.warn(logMsg);
case "error" -> logAndCollectErrorMessage(logMsg);
default -> logger.info(jsonLine.asText()); // this shouldn't happen but logging it to avoid hiding unexpected lines.
default -> logger.info(logMsg); // this shouldn't happen but logging it to avoid hiding unexpected lines.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it would render jsonLines differently than the current impl -- what's the upside to doing it this way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did have some wider changes in this part and missed changing this bit back, reverted.

}
}
} catch (final Exception e) {
Expand All @@ -106,9 +105,9 @@ private Stream<AirbyteMessage> filterOutAndHandleNonAirbyteMessageLines(JsonNode
return m.stream();
}

private void logAndCollectErrorMessage(String logMessage) {
logger.error(logMessage);
dbtErrors.add(logMessage);
private void logAndCollectErrorMessage(String logMsg) {
logger.error(logMsg);
dbtErrors.add(logMsg);
}

public List<String> getDbtErrors() {
Expand Down