Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refresh before syncs when feature flag is on #19888

Merged
merged 24 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp
.execute();
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);

writeSyncStats(now, syncStats, attemptId, ctx);
if (syncStats != null) {
writeSyncStats(now, syncStats, attemptId, ctx);
}

if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.workers.temporal.sync.NormalizationActivity;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity;
import io.airbyte.workers.temporal.sync.PersistStateActivity;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivity;
import io.airbyte.workers.temporal.sync.ReplicationActivity;
import io.airbyte.workers.temporal.sync.WebhookOperationActivity;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -110,9 +111,11 @@ public List<Object> syncActivities(
final DbtTransformationActivity dbtTransformationActivity,
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity) {
final WebhookOperationActivity webhookOperationActivity,
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.annotation.Factory;
Expand All @@ -35,26 +37,34 @@ public class ApiClientBeanFactory {
private static final int JWT_TTL_MINUTES = 5;

@Singleton
public AirbyteApiClient airbyteApiClient(
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new AirbyteApiClient(
new io.airbyte.api.client.invoker.generated.ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
// time we send a request.
if (!airbyteApiAuthHeaderName.isBlank()) {
builder.setHeader(airbyteApiAuthHeaderName, internalApiAuthToken.get());
}
}));
public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new io.airbyte.api.client.invoker.generated.ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
// time we send a request.
if (!airbyteApiAuthHeaderName.isBlank()) {
builder.setHeader(airbyteApiAuthHeaderName, internalApiAuthToken.get());
}
});
}

@Singleton
public AirbyteApiClient airbyteApiClient(ApiClient apiClient) {
return new AirbyteApiClient(apiClient);
}

@Singleton
public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -19,6 +21,12 @@
@ActivityInterface
public interface ConfigFetchActivity {

@ActivityMethod
Optional<UUID> getSourceId(UUID connectionId);

@ActivityMethod
Optional<Status> getStatus(UUID connectionId);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,26 @@ public GetMaxAttemptOutput getMaxAttempt() {
return new GetMaxAttemptOutput(syncJobMaxAttempts);
}

@Override
public Optional<UUID> getSourceId(UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (JsonValidationException | ConfigNotFoundException | IOException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
}

@Override
public Optional<Status> getStatus(UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getStatus());
} catch (JsonValidationException | ConfigNotFoundException | IOException e) {
log.info("Encountered an error fetching the connection's status: ", e);
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.util.UUID;

@ActivityInterface
public interface RefreshSchemaActivity {

@ActivityMethod
boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException;
boolean shouldRefreshSchema(UUID sourceCatalogId);

public void refreshSchema(UUID sourceCatalogId) throws JsonValidationException, ConfigNotFoundException, IOException, ApiException;
public void refreshSchema(UUID sourceCatalogId, UUID connectionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,74 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

private final SourceApi sourceApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) {
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
SourceApi sourceApi,
EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException {
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
// if job persistence is unavailable, default to skipping the schema refresh
if (configRepository.isEmpty()) {
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

return !schemaRefreshRanRecently(sourceCatalogId);
}

@Override
public void refreshSchema(UUID sourceCatalogId) throws ApiException {
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true);
sourceApi.discoverSchemaForSource(requestBody);
}
public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return;
}

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId);

if (mostRecentFetchEvent.isEmpty()) {
return false;
try {
sourceApi.discoverSchemaForSource(requestBody);
} catch (final Exception e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.error("Attempted schema refresh, but failed with error: ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could be worth adding a comment explaining that we swallow the exception here to avoid blocking the replication if we fail to refresh.

}
}

return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
try {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
if (mostRecentFetchEvent.isEmpty()) {
return false;
}
return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (IOException e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand All @@ -42,6 +47,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final int CURRENT_VERSION = 2;
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIt: make sure that we have an issue to disable that after it is deployed.

private static final int AUTO_DETECT_SCHEMA_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -55,6 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -72,6 +83,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,

final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<Status> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && Status.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
Expand All @@ -29,18 +30,21 @@
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;

static private SourceApi mSourceApi;
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;

static private RefreshSchemaActivityImpl refreshSchemaActivity;

static private final UUID SOURCE_ID = UUID.randomUUID();

@BeforeEach
void setUp() {
mSourceApi = mock(SourceApi.class);
mConfigRepository = mock(ConfigRepository.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi);
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
}

@Test
Expand Down Expand Up @@ -68,9 +72,10 @@ void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException
@Test
void testRefreshSchema() throws ApiException {
UUID sourceId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId);
UUID connectionId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId, connectionId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true);
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true).connectionId(connectionId);
verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody);
}

Expand Down
Loading