Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refresh before syncs when feature flag is on #19888

Merged
merged 24 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp
.execute();
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);

writeSyncStats(now, syncStats, attemptId, ctx);
if (syncStats != null) {
writeSyncStats(now, syncStats, attemptId, ctx);
}

if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.workers.temporal.sync.NormalizationActivity;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity;
import io.airbyte.workers.temporal.sync.PersistStateActivity;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivity;
import io.airbyte.workers.temporal.sync.ReplicationActivity;
import io.airbyte.workers.temporal.sync.WebhookOperationActivity;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -110,9 +111,11 @@ public List<Object> syncActivities(
final DbtTransformationActivity dbtTransformationActivity,
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity) {
final WebhookOperationActivity webhookOperationActivity,
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -57,6 +58,29 @@ public AirbyteApiClient airbyteApiClient(
}));
}

@Singleton
public SourceApi sourceApi(
gosusnp marked this conversation as resolved.
Show resolved Hide resolved
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new SourceApi(
new io.airbyte.api.client.invoker.generated.ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
// time we send a request.
if (!airbyteApiAuthHeaderName.isBlank()) {
builder.setHeader(airbyteApiAuthHeaderName, internalApiAuthToken.get());
}
}));
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public interface RefreshSchemaActivity {
@ActivityMethod
boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException;

public void refreshSchema(UUID sourceCatalogId) throws JsonValidationException, ConfigNotFoundException, IOException, ApiException;
public void refreshSchema(UUID sourceCatalogId, UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException, ApiException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,58 @@
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

private final SourceApi sourceApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) {
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
SourceApi sourceApi,
EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException {
// if job persistence is unavailable, default to skipping the schema refresh
if (configRepository.isEmpty()) {
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

return !schemaRefreshRanRecently(sourceCatalogId);
}

@Override
public void refreshSchema(UUID sourceCatalogId) throws ApiException {
public void refreshSchema(UUID sourceCatalogId, UUID connectionId) throws ApiException {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return;
}

SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true);
sourceApi.discoverSchemaForSource(requestBody);
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId);

try {
sourceApi.discoverSchemaForSource(requestBody);
} catch (final Exception e) {
log.error("Attempted schema refresh, but failed with error: ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could be worth adding a comment explaining that we swallow the exception here to avoid blocking the replication if we fail to refresh.

}
}

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,31 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -42,6 +51,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final int CURRENT_VERSION = 2;
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIt: make sure that we have an issue to disable that after it is deployed.

private static final int AUTO_DETECT_SCHEMA_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -55,6 +66,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -72,6 +87,29 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,

final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);
if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in a single try catch? I think that the stack trace will be enough to say which step failed.

final UUID sourceId = configFetchActivity.getStandardSync(connectionId).getSourceId();
if (refreshSchemaActivity.shouldRefreshSchema(sourceId)) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId, connectionId);
}
final Status status = configFetchActivity.getStandardSync(connectionId).getStatus();
if (Status.INACTIVE == status) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
} catch (JsonValidationException | ConfigNotFoundException | IOException | ApiException e) {
LOGGER.error("An error occurred during schema refresh processing. Skipping schema refresh. ", e);
gosusnp marked this conversation as resolved.
Show resolved Hide resolved
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
Expand All @@ -29,18 +30,21 @@
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;

static private SourceApi mSourceApi;
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;

static private RefreshSchemaActivityImpl refreshSchemaActivity;

static private final UUID SOURCE_ID = UUID.randomUUID();

@BeforeEach
void setUp() {
mSourceApi = mock(SourceApi.class);
mConfigRepository = mock(ConfigRepository.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi);
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
}

@Test
Expand Down Expand Up @@ -68,9 +72,10 @@ void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException
@Test
void testRefreshSchema() throws ApiException {
UUID sourceId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId);
UUID connectionId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId, connectionId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true);
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true).connectionId(connectionId);
verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody);
}

Expand Down
Loading