-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refresh before syncs when feature flag is on #19888
Changes from 18 commits
20f5474
87b0b72
998bd81
291db74
0093924
5c35008
ca636b2
c6fd2df
2370fdf
8302e23
c0f6bb1
8acca50
b9a869b
6d99d6c
9b013fe
f09f87b
3fc3751
825e42a
a35fc50
6362a97
ed7c615
c3faa67
6b70630
c98cd7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,43 +7,61 @@ | |
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; | ||
|
||
import datadog.trace.api.Trace; | ||
import io.airbyte.api.client.generated.SourceApi; | ||
import io.airbyte.api.client.AirbyteApiClient; | ||
import io.airbyte.api.client.invoker.generated.ApiException; | ||
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; | ||
import io.airbyte.commons.features.EnvVariableFeatureFlags; | ||
import io.airbyte.config.ActorCatalogFetchEvent; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import jakarta.inject.Singleton; | ||
import java.io.IOException; | ||
import java.time.OffsetDateTime; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
@Slf4j | ||
@Singleton | ||
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { | ||
|
||
private final Optional<ConfigRepository> configRepository; | ||
|
||
private final SourceApi sourceApi; | ||
private final AirbyteApiClient airbyteApiClient; | ||
private final EnvVariableFeatureFlags envVariableFeatureFlags; | ||
|
||
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) { | ||
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, | ||
AirbyteApiClient airbyteApiClient, | ||
EnvVariableFeatureFlags envVariableFeatureFlags) { | ||
this.configRepository = configRepository; | ||
this.sourceApi = sourceApi; | ||
this.airbyteApiClient = airbyteApiClient; | ||
this.envVariableFeatureFlags = envVariableFeatureFlags; | ||
} | ||
|
||
@Override | ||
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) | ||
public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException { | ||
// if job persistence is unavailable, default to skipping the schema refresh | ||
if (configRepository.isEmpty()) { | ||
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) { | ||
return false; | ||
} | ||
|
||
return !schemaRefreshRanRecently(sourceCatalogId); | ||
} | ||
|
||
@Override | ||
public void refreshSchema(UUID sourceCatalogId) throws ApiException { | ||
public void refreshSchema(UUID sourceCatalogId, UUID connectionId) throws ApiException { | ||
if (!envVariableFeatureFlags.autoDetectSchema()) { | ||
return; | ||
} | ||
|
||
SourceDiscoverSchemaRequestBody requestBody = | ||
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true); | ||
sourceApi.discoverSchemaForSource(requestBody); | ||
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId); | ||
|
||
try { | ||
airbyteApiClient.getSourceApi().discoverSchemaForSource(requestBody); | ||
} catch (final Exception e) { | ||
log.error("Attempted schema refresh, but failed with error: ", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: It could be worth adding a comment explaining that we swallow the exception here to avoid blocking the replication if we fail to refresh. |
||
} | ||
} | ||
|
||
private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,22 +12,31 @@ | |
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; | ||
|
||
import datadog.trace.api.Trace; | ||
import io.airbyte.api.client.invoker.generated.ApiException; | ||
import io.airbyte.commons.temporal.scheduling.SyncWorkflow; | ||
import io.airbyte.config.NormalizationInput; | ||
import io.airbyte.config.NormalizationSummary; | ||
import io.airbyte.config.OperatorDbtInput; | ||
import io.airbyte.config.OperatorWebhookInput; | ||
import io.airbyte.config.StandardSync.Status; | ||
import io.airbyte.config.StandardSyncInput; | ||
import io.airbyte.config.StandardSyncOperation; | ||
import io.airbyte.config.StandardSyncOperation.OperatorType; | ||
import io.airbyte.config.StandardSyncOutput; | ||
import io.airbyte.config.StandardSyncSummary; | ||
import io.airbyte.config.StandardSyncSummary.ReplicationStatus; | ||
import io.airbyte.config.SyncStats; | ||
import io.airbyte.config.WebhookOperationSummary; | ||
import io.airbyte.config.persistence.ConfigNotFoundException; | ||
import io.airbyte.metrics.lib.ApmTraceUtils; | ||
import io.airbyte.persistence.job.models.IntegrationLauncherConfig; | ||
import io.airbyte.persistence.job.models.JobRunConfig; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.validation.json.JsonValidationException; | ||
import io.airbyte.workers.temporal.annotations.TemporalActivityStub; | ||
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; | ||
import io.temporal.workflow.Workflow; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.UUID; | ||
|
@@ -42,6 +51,8 @@ public class SyncWorkflowImpl implements SyncWorkflow { | |
private static final int CURRENT_VERSION = 2; | ||
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check"; | ||
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1; | ||
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema"; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: Make sure that we have an issue to disable this after it is deployed. |
||
private static final int AUTO_DETECT_SCHEMA_VERSION = 1; | ||
|
||
@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions") | ||
private ReplicationActivity replicationActivity; | ||
|
@@ -55,6 +66,10 @@ public class SyncWorkflowImpl implements SyncWorkflow { | |
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; | ||
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") | ||
private WebhookOperationActivity webhookOperationActivity; | ||
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") | ||
private RefreshSchemaActivity refreshSchemaActivity; | ||
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") | ||
private ConfigFetchActivity configFetchActivity; | ||
|
||
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) | ||
@Override | ||
|
@@ -72,6 +87,29 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, | |
|
||
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); | ||
final String taskQueue = Workflow.getInfo().getTaskQueue(); | ||
|
||
final int autoDetectSchemaVersion = | ||
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); | ||
if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be in a single try/catch? I think that the stack trace will be enough to say which step failed. |
||
final UUID sourceId = configFetchActivity.getStandardSync(connectionId).getSourceId(); | ||
if (refreshSchemaActivity.shouldRefreshSchema(sourceId)) { | ||
LOGGER.info("Refreshing source schema..."); | ||
refreshSchemaActivity.refreshSchema(sourceId, connectionId); | ||
} | ||
final Status status = configFetchActivity.getStandardSync(connectionId).getStatus(); | ||
if (Status.INACTIVE == status) { | ||
LOGGER.info("Connection is disabled. Cancelling run."); | ||
final StandardSyncOutput output = | ||
new StandardSyncOutput() | ||
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); | ||
return output; | ||
} | ||
} catch (JsonValidationException | ConfigNotFoundException | IOException | ApiException e) { | ||
LOGGER.error("An error occurred during schema refresh processing. Skipping schema refresh. ", e); | ||
gosusnp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
StandardSyncOutput syncOutput = | ||
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we inject the specific API clients that are needed rather than the top-level one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this but was getting a Micronaut bean error, so I followed how you did it in the PersistStateActivity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the time `PersistStateActivity` was added, I don't think we had the independent clients yet.
The way to solve the Micronaut bean error should be to add the client to the API factory:
airbyte/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java
Line 38 in 8c780bd
@benmoriceau, is there any specific reason we do not have those yet, other than not having had time to get to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that I know of. We should add them.