Skip to content

Commit

Permalink
Add new actorCatalogWithUpdatedAt endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew authored Jan 5, 2023
1 parent 6ec3967 commit bb84fac
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 31 deletions.
33 changes: 33 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
# POST endpoint that returns the most recent ActorCatalog for a source.
# Request body: SourceIdRequestBody (required). 200 response: ActorCatalogWithUpdatedAt.
# 404 and 422 reuse the shared NotFoundResponse / InvalidInputResponse definitions.
/v1/sources/most_recent_source_actor_catalog:
post:
tags:
- source
summary: Get most recent ActorCatalog for source
operationId: getMostRecentSourceActorCatalog
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ActorCatalogWithUpdatedAt"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/search:
post:
tags:
Expand Down Expand Up @@ -3692,6 +3715,16 @@ components:
properties:
logType:
$ref: "#/components/schemas/LogType"
# ACTOR CATALOG
ActorCatalogWithUpdatedAt:
description: A source actor catalog with the timestamp at which it was most recently updated
type: object
properties:
updatedAt:
type: integer
format: int64
catalog:
type: object
# SCHEMA CATALOG
AirbyteCatalog:
description: describes the available schema (catalog).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
# Config-model schema for an actor catalog paired with a timestamp.
# All four properties below are required and no additional properties are allowed.
"$schema": http://json-schema.org/draft-07/schema#
title: ActorCatalogWithUpdatedAt
description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp.
type: object
additionalProperties: false
required:
- id
- catalog
- catalogHash
- updatedAt
properties:
# Identifier of the catalog, formatted as a UUID.
id:
type: string
format: uuid
# The catalog payload itself; existingJavaType points at Jackson's JsonNode.
catalog:
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
# Hash of the catalog, stored as a string.
catalogHash:
type: string
# 64-bit integer timestamp taken from the most recent fetch event's created_at (per the description above).
# NOTE(review): the unit is not stated in this schema — presumably epoch seconds; confirm against the converter.
updatedAt:
type: integer
format: int64
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
Expand Down Expand Up @@ -1314,6 +1315,16 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

/**
 * Looks up the actor catalog attached to the newest fetch event recorded for a source.
 *
 * <p>
 * The catalog table is joined to the fetch-event table on the catalog id, filtered to fetch events
 * whose actor id equals {@code sourceId}, ordered by the fetch event's creation time (newest
 * first) and limited to a single row. The fetch event's creation time is selected alongside the
 * catalog columns so that it can be carried on the returned object.
 *
 * @param sourceId id of the source (actor) whose latest catalog is wanted
 * @return the converted catalog row, or an empty Optional when the query yields no row
 * @throws IOException propagated from the database query
 */
public Optional<ActorCatalogWithUpdatedAt> getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException {
  final Result<Record> latestCatalogRows = database.query(ctx -> ctx
      .select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT)
      .from(ACTOR_CATALOG)
      .join(ACTOR_CATALOG_FETCH_EVENT)
      .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID))
      .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
      .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc())
      .limit(1)
      .fetch());

  // The query is capped at one row, so only the first element (if any) is relevant.
  if (latestCatalogRows.isEmpty()) {
    return Optional.empty();
  }
  return Optional.of(DbConverter.buildActorCatalogWithUpdatedAt(latestCatalogRows.get(0)));
}

public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
.from(ACTOR_CATALOG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
Expand Down Expand Up @@ -218,6 +219,14 @@ public static ActorCatalog buildActorCatalog(final Record record) {
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

/**
 * Converts a database record into an {@link ActorCatalogWithUpdatedAt}.
 *
 * <p>
 * The record is expected to expose the actor catalog's id, catalog and catalog hash columns plus
 * the fetch event's created-at column. The created-at value is read as a {@link LocalDateTime} and
 * stored as epoch seconds computed with a UTC offset.
 *
 * @param record row holding the catalog columns and the fetch event creation time
 * @return the populated config object
 */
public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) {
  final ActorCatalogWithUpdatedAt catalogWithTimestamp = new ActorCatalogWithUpdatedAt()
      .withId(record.get(ACTOR_CATALOG.ID));

  // The catalog column is rendered to its string form and parsed back into a JSON tree.
  catalogWithTimestamp.withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString()));
  catalogWithTimestamp.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));

  final long fetchedAtEpochSeconds = record
      .get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class)
      .toEpochSecond(ZoneOffset.UTC);
  return catalogWithTimestamp.withUpdatedAt(fetchedAtEpochSeconds);
}

public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.server.apis;

import io.airbyte.api.generated.SourceApi;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
Expand Down Expand Up @@ -66,6 +67,11 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) {
return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody));
}

/**
 * API entry point for fetching a source's most recent actor catalog together with its timestamp.
 * The work is delegated to the source handler and run through {@code ApiHelper.execute}.
 *
 * @param sourceIdRequestBody request body carrying the source id
 * @return whatever the source handler produces for that source
 */
@Override
public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) {
  return ApiHelper.execute(() -> {
    return sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody);
  });
}

@Override
public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.SourceCloneConfiguration;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
Expand All @@ -31,6 +32,7 @@
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

Expand Down Expand Up @@ -132,6 +134,17 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody)
return buildSourceRead(sourceIdRequestBody.getSourceId());
}

/**
 * Fetches the most recent actor catalog for a source and maps it onto the API model.
 *
 * <p>
 * Only the {@code updatedAt} and {@code catalog} fields are copied from the config object. When
 * the repository has no catalog for the source, an API object with no fields set is returned
 * instead of an error.
 *
 * @param sourceIdRequestBody request body carrying the source id
 * @return the API model, populated when a catalog was found and otherwise left blank
 * @throws IOException propagated from the config repository lookup
 */
public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody)
    throws IOException {
  return configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId())
      .map(found -> new ActorCatalogWithUpdatedAt()
          .updatedAt(found.getUpdatedAt())
          .catalog(found.getCatalog()))
      .orElseGet(ActorCatalogWithUpdatedAt::new);
}

public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {
// read source configuration from db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ class WebBackendConnectionsHandlerTest {
private SchedulerHandler schedulerHandler;
private StateHandler stateHandler;
private WebBackendConnectionsHandler wbHandler;

private SourceRead sourceRead;
private ConnectionRead connectionRead;
private ConnectionRead brokenConnectionRead;
Expand Down Expand Up @@ -1090,6 +1089,7 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep
new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId));

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());

when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize(
"{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))));
when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,33 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

private final SourceApi sourceApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
SourceApi sourceApi,
public RefreshSchemaActivityImpl(SourceApi sourceApi,
EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
// if job persistence is unavailable, default to skipping the schema refresh
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

Expand All @@ -66,12 +60,13 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
try {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
if (mostRecentFetchEvent.isEmpty()) {
SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId);
ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody);
if (mostRecentFetchEvent.getUpdatedAt() == null) {
return false;
}
return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (IOException e) {
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (ApiException e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@

package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -29,7 +27,6 @@
@ExtendWith(MockitoExtension.class)
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;
static private SourceApi mSourceApi;
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;

Expand All @@ -40,32 +37,31 @@ class RefreshSchemaActivityTest {
@BeforeEach
void setUp() {
mSourceApi = mock(SourceApi.class);
mConfigRepository = mock(ConfigRepository.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags);
}

@Test
void testShouldRefreshSchemaNoRecentRefresh() throws IOException {
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty());
void testShouldRefreshSchemaNoRecentRefresh() throws ApiException {
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt());
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException {
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException {
Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond();
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo);
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException {
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException {
Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond();
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo);
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

Expand Down
Loading

0 comments on commit bb84fac

Please sign in to comment.