Skip to content

Commit

Permalink
Bmoric/right error for refresh (#22471)
Browse files Browse the repository at this point in the history
* Get a better failure reason for the refresh schema error

* Use share empty stats

* Format

* Add missing import

* fix pmd
  • Loading branch information
benmoriceau authored Feb 7, 2023
1 parent 3d2a995 commit f9939c7
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 28 deletions.
1 change: 1 addition & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4167,6 +4167,7 @@ components:
- config_error
- system_error
- manual_cancellation
- refresh_schema
AttemptStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ properties:
- config_error
- system_error
- manual_cancellation
- refresh_schema
internalMessage:
description: Human readable failure description for consumption by technical system operators, like Airbyte engineers or OSS users.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

package io.airbyte.workers.temporal.scheduling;

import static io.airbyte.workers.temporal.sync.SyncOutputProvider.EMPTY_FAILED_SYNC;

import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.helper.FailureHelper;
import java.util.List;
Expand Down Expand Up @@ -58,19 +58,7 @@ public StandardSyncOutput buildFailureOutput() {
}

final StandardSyncOutput syncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
.withStatus(StandardSyncSummary.ReplicationStatus.FAILED)
.withStartTime(System.currentTimeMillis())
.withEndTime(System.currentTimeMillis())
.withRecordsSynced(0L)
.withBytesSynced(0L)
.withTotalStats(new SyncStats()
.withRecordsEmitted(0L)
.withBytesEmitted(0L)
.withSourceStateMessagesEmitted(0L)
.withDestinationStateMessagesEmitted(0L)
.withRecordsCommitted(0L)));
.withStandardSyncSummary(EMPTY_FAILED_SYNC);

if (failureOutput.getFailureReason() != null) {
syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import java.util.List;

public class SyncOutputProvider {

public final static StandardSyncSummary EMPTY_FAILED_SYNC = new StandardSyncSummary()
.withStatus(StandardSyncSummary.ReplicationStatus.FAILED)
.withStartTime(System.currentTimeMillis())
.withEndTime(System.currentTimeMillis())
.withRecordsSynced(0L)
.withBytesSynced(0L)
.withTotalStats(new SyncStats()
.withRecordsEmitted(0L)
.withBytesEmitted(0L)
.withSourceStateMessagesEmitted(0L)
.withDestinationStateMessagesEmitted(0L)
.withRecordsCommitted(0L));

public static StandardSyncOutput getRefreshSchemaFailure(final Exception e) {
return new StandardSyncOutput()
.withFailures(List.of(new FailureReason()
.withFailureType(FailureReason.FailureType.REFRESH_SCHEMA)
.withFailureOrigin(FailureReason.FailureOrigin.SOURCE)
.withExternalMessage("Failed to detect if there is a schema change. If the error persist please contact the support team.")
.withInternalMessage("Failed to launch the refresh schema activity because of: " + e.getMessage())
.withStacktrace(e.toString())))
.withStandardSyncSummary(EMPTY_FAILED_SYNC);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
try {
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
} catch (final Exception e) {
return SyncOutputProvider.getRefreshSchemaFailure(e);
}
}

final Optional<ConnectionStatus> status = configFetchActivity.getStatus(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.*;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -181,6 +170,7 @@ void setUp() {
.build();
discoveryActivityOptions = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(360))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();

final BeanIdentifier longActivitiesBeanIdentifier = mock(BeanIdentifier.class);
Expand Down Expand Up @@ -418,6 +408,18 @@ void testSkipReplicationAfterRefreshSchema() {
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.CANCELLED);
}

@Test
void testGetProperFailureIfRefreshFails() {
when(refreshSchemaActivity.shouldRefreshSchema(any())).thenReturn(true);
doThrow(new RuntimeException())
.when(refreshSchemaActivity).refreshSchema(any(), any());
final StandardSyncOutput output = execute();
assertEquals(output.getStandardSyncSummary().getStatus(), ReplicationStatus.FAILED);
assertEquals(output.getFailures().size(), 1);
assertEquals(output.getFailures().get(0).getFailureOrigin(), FailureReason.FailureOrigin.SOURCE);
assertEquals(output.getFailures().get(0).getFailureType(), FailureReason.FailureType.REFRESH_SCHEMA);
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private void cancelWorkflow() {
final WorkflowServiceBlockingStub temporalService = testEnv.getWorkflowService().blockingStub();
Expand Down

0 comments on commit f9939c7

Please sign in to comment.