Skip to content

Commit

Permalink
Add a feature flag for refresh schema period (#7152)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Jun 12, 2023
1 parent 77c5fe7 commit d2ff84e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 2 deletions.
2 changes: 2 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ object ConnectorVersionOverride : Permanent<String>(key = "connectors.versionOve

object HandleStreamStatus : Temporary<Boolean>(key = "handle.stream.status", default = false)

object RefreshSchemaPeriod : Temporary<Int>(key= "refreshSchema.period.hours", default = 24)

// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed.
object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES") {
override fun enabled(ctx: Context): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.RefreshSchemaPeriod;
import io.airbyte.featureflag.ShouldRunRefreshSchema;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.ApmTraceUtils;
Expand Down Expand Up @@ -141,7 +142,14 @@ private boolean schemaRefreshRanRecently(final UUID sourceCatalogId) {
if (mostRecentFetchEvent.getUpdatedAt() == null) {
return false;
}
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24L).toEpochSecond();
final UUID workspaceId = AirbyteApiClient.retryWithJitter(
() -> sourceApi.getSource(sourceIdRequestBody).getWorkspaceId(),
"Retrieve Id of the workspace for the source");
int refreshPeriod = 24;
if (workspaceId != null) {
refreshPeriod = featureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(workspaceId));
}
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(refreshPeriod).toEpochSecond();
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
// catching this exception because we don't want to block replication due to a failed schema refresh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import io.airbyte.api.client.model.generated.SourceAutoPropagateChange;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRead;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceRead;
import io.airbyte.api.client.model.generated.StreamDescriptor;
import io.airbyte.api.client.model.generated.StreamTransform;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.RefreshSchemaPeriod;
import io.airbyte.featureflag.ShouldRunRefreshSchema;
import io.airbyte.featureflag.TestClient;
import io.airbyte.featureflag.Workspace;
Expand All @@ -53,9 +55,10 @@ class RefreshSchemaActivityTest {
private RefreshSchemaActivityImpl refreshSchemaActivity;

private static final UUID SOURCE_ID = UUID.randomUUID();
private static final UUID WORKSPACE_ID = UUID.randomUUID();

@BeforeEach
void setUp() {
void setUp() throws ApiException {
mSourceApi = mock(SourceApi.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
Expand All @@ -76,17 +79,31 @@ void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException {
final Long twoDaysAgo = OffsetDateTime.now().minusHours(48L).toEpochSecond();
final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
when(mSourceApi.getSource(any())).thenReturn(new SourceRead().workspaceId(WORKSPACE_ID));
when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(24);
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException {
final Long twelveHoursAgo = OffsetDateTime.now().minusHours(12L).toEpochSecond();
final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo);
when(mSourceApi.getSource(any())).thenReturn(new SourceRead().workspaceId(WORKSPACE_ID));
when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(24);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshLessThanValueFromFF() throws ApiException {
final Long twelveHoursAgo = OffsetDateTime.now().minusHours(12L).toEpochSecond();
final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo);
when(mSourceApi.getSource(any())).thenReturn(new SourceRead().workspaceId(WORKSPACE_ID));
when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(10);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testRefreshSchema() throws ApiException {
final UUID sourceId = UUID.randomUUID();
Expand Down
2 changes: 2 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ flags:
serve: true
- name: handle.stream.status
serve: false
- name: refreshSchema.period.hours
serve: 24

0 comments on commit d2ff84e

Please sign in to comment.