Skip to content

Commit

Permalink
Bmoric/update connection list with breaking (#18125)
Browse files Browse the repository at this point in the history
* add schemaChange

* merge conflict

* frontend tests

* tests

* l

* fix source catalog id

* test

* formatting

* move schema change to build backend web connection

* check if actor catalog id is different

* fix

* tests and fixes

* remove extra var

* remove logging

* continue to pass back new catalog id

* api updates

* fix mockdata

* tests

* add schemaChange

* merge conflict

* frontend tests

* tests

* l

* fix source catalog id

* test

* formatting

* move schema change to build backend web connection

* check if actor catalog id is different

* fix

* tests and fixes

* remove extra var

* remove logging

* continue to pass back new catalog id

* api updates

* fix mockdata

* tests

* tests

* optional of nullable

* Tmp

* For diff

* Add test

* More test

* Fix test and add some

* Fix merge and test

* Fix PMD

* Fix test

* Rm dead code

* Fix pmd

* Address PR comments

* RM unused column

Co-authored-by: alovew <anne@airbyte.io>
  • Loading branch information
2 people authored and nataly committed Nov 3, 2022
1 parent 786109f commit 5a1fe39
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 58 deletions.
3 changes: 3 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4649,6 +4649,7 @@ components:
- destination
- status
- isSyncing
- schemaChange
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
Expand All @@ -4674,6 +4675,8 @@ components:
$ref: "#/components/schemas/JobStatus"
isSyncing:
type: boolean
schemaChange:
$ref: "#/components/schemas/SchemaChange"
WebBackendConnectionRead:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,22 @@ public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSo
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent);
}

public Map<UUID, ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds)
throws IOException {

return database.query(ctx -> ctx.fetch(
"""
select actor_catalog_id, actor_id from
(select actor_catalog_id, actor_id, rank() over (partition by actor_id order by created_at desc) as creation_order_rank
from public.actor_catalog_fetch_event
) table_with_rank
where creation_order_rank = 1;
"""))
.stream().map(DbConverter::buildActorCatalogFetchEvent)
.collect(Collectors.toMap(record -> record.getActorId(),
record -> record));
}

/**
* Stores source catalog information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public static ActorCatalog buildActorCatalog(final Record record) {

public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))
.withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,17 +515,17 @@ void testGetGeographyForConnection() throws IOException {
}

@Test
void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IOException, JsonValidationException {
void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOException, JsonValidationException {
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
}

OffsetDateTime now = OffsetDateTime.now();
OffsetDateTime yesterday = now.minusDays(1l);
final OffsetDateTime now = OffsetDateTime.now();
final OffsetDateTime yesterday = now.minusDays(1l);

List<ActorCatalogFetchEvent> fetchEvents = MockData.actorCatalogFetchEventsSameSource();
ActorCatalogFetchEvent fetchEvent1 = fetchEvents.get(0);
ActorCatalogFetchEvent fetchEvent2 = fetchEvents.get(1);
final List<ActorCatalogFetchEvent> fetchEvents = MockData.actorCatalogFetchEventsSameSource();
final ActorCatalogFetchEvent fetchEvent1 = fetchEvents.get(0);
final ActorCatalogFetchEvent fetchEvent2 = fetchEvents.get(1);

database.transaction(ctx -> {
insertCatalogFetchEvent(
Expand All @@ -542,13 +542,37 @@ void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IO
return null;
});

Optional<ActorCatalogFetchEvent> result =
final Optional<ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSource(fetchEvent1.getActorId());

assertEquals(fetchEvent2.getActorCatalogId(), result.get().getActorCatalogId());
}

private void insertCatalogFetchEvent(DSLContext ctx, UUID sourceId, UUID catalogId, OffsetDateTime creationDate) {
@Test
void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IOException, JsonValidationException {
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
}

database.transaction(ctx -> {
MockData.actorCatalogFetchEventsForAggregationTest().forEach(actorCatalogFetchEvent -> insertCatalogFetchEvent(
ctx,
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(),
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(),
actorCatalogFetchEvent.getCreatedAt()));

return null;
});

final Map<UUID, ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1,
MockData.SOURCE_ID_2));

assertEquals(MockData.ACTOR_CATALOG_ID_1, result.get(MockData.SOURCE_ID_1).getActorCatalogId());
assertEquals(MockData.ACTOR_CATALOG_ID_3, result.get(MockData.SOURCE_ID_2).getActorCatalogId());
}

private void insertCatalogFetchEvent(final DSLContext ctx, final UUID sourceId, final UUID catalogId, final OffsetDateTime creationDate) {
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.columns(
ACTOR_CATALOG_FETCH_EVENT.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,19 @@
import io.airbyte.protocol.models.SyncMode;
import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Data;

public class MockData {

private static final UUID WORKSPACE_ID_1 = UUID.randomUUID();
public static final UUID WORKSPACE_ID_1 = UUID.randomUUID();
private static final UUID WORKSPACE_ID_2 = UUID.randomUUID();
private static final UUID WORKSPACE_ID_3 = UUID.randomUUID();
private static final UUID WORKSPACE_CUSTOMER_ID = UUID.randomUUID();
Expand All @@ -72,8 +74,8 @@ public class MockData {
private static final UUID DESTINATION_DEFINITION_ID_2 = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID_3 = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID_4 = UUID.randomUUID();
private static final UUID SOURCE_ID_1 = UUID.randomUUID();
private static final UUID SOURCE_ID_2 = UUID.randomUUID();
public static final UUID SOURCE_ID_1 = UUID.randomUUID();
public static final UUID SOURCE_ID_2 = UUID.randomUUID();
private static final UUID SOURCE_ID_3 = UUID.randomUUID();
private static final UUID DESTINATION_ID_1 = UUID.randomUUID();
private static final UUID DESTINATION_ID_2 = UUID.randomUUID();
Expand All @@ -91,11 +93,12 @@ public class MockData {
private static final UUID SOURCE_OAUTH_PARAMETER_ID_2 = UUID.randomUUID();
private static final UUID DESTINATION_OAUTH_PARAMETER_ID_1 = UUID.randomUUID();
private static final UUID DESTINATION_OAUTH_PARAMETER_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_1 = UUID.randomUUID();
public static final UUID ACTOR_CATALOG_ID_1 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_3 = UUID.randomUUID();
public static final UUID ACTOR_CATALOG_ID_3 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_1 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_3 = UUID.randomUUID();

public static final String MOCK_SERVICE_ACCOUNT_1 = "{\n"
+ " \"type\" : \"service_account\",\n"
Expand Down Expand Up @@ -622,8 +625,8 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEvents() {
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
.withConfigHash("1395")
.withConnectorVersion("1.42.0");
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
}

Expand All @@ -643,6 +646,42 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEventsSameSource() {
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
}

@Data
public static class ActorCatalogFetchEventWithCreationDate {

private final ActorCatalogFetchEvent actorCatalogFetchEvent;
private final OffsetDateTime createdAt;

}

public static List<ActorCatalogFetchEventWithCreationDate> actorCatalogFetchEventsForAggregationTest() {
final OffsetDateTime now = OffsetDateTime.now();
final OffsetDateTime yesterday = OffsetDateTime.now().minusDays(1l);

final ActorCatalogFetchEvent actorCatalogFetchEvent1 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_1)
.withActorCatalogId(ACTOR_CATALOG_ID_1)
.withActorId(SOURCE_ID_1)
.withConfigHash("CONFIG_HASH")
.withConnectorVersion("1.0.0");
final ActorCatalogFetchEvent actorCatalogFetchEvent2 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
final ActorCatalogFetchEvent actorCatalogFetchEvent3 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_3)
.withActorCatalogId(ACTOR_CATALOG_ID_3)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
return Arrays.asList(
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent1, now),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent2, yesterday),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now));
}

public static List<WorkspaceServiceAccount> workspaceServiceAccounts() {
final WorkspaceServiceAccount workspaceServiceAccount = new WorkspaceServiceAccount()
.withWorkspaceId(WORKSPACE_ID_1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public ConnectionStateType getStateType(final ConnectionIdRequestBody connection
return Enums.convertTo(stateHandler.getState(connectionIdRequestBody).getStateType(), ConnectionStateType.class);
}

public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws IOException {
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {

// passing 'false' so that deleted connections are not included
final List<StandardSync> standardSyncs =
Expand All @@ -113,6 +114,9 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
final Map<UUID, JobRead> runningJobByConnectionId =
getRunningJobByConnectionId(standardSyncs.stream().map(StandardSync::getConnectionId).toList());

final Map<UUID, ActorCatalogFetchEvent> newestFetchEventsByActorId =
configRepository.getMostRecentActorCatalogFetchEventForSources(new ArrayList<>());

final List<WebBackendConnectionListItem> connectionItems = Lists.newArrayList();

for (final StandardSync standardSync : standardSyncs) {
Expand All @@ -122,7 +126,8 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
sourceReadById,
destinationReadById,
latestJobByConnectionId,
runningJobByConnectionId));
runningJobByConnectionId,
Optional.ofNullable(newestFetchEventsByActorId.get(standardSync.getSourceId()))));
}

return new WebBackendConnectionReadList().connections(connectionItems);
Expand Down Expand Up @@ -175,51 +180,33 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
webBackendConnectionRead.setLatestSyncJobStatus(job.getStatus());
});

SchemaChange schemaChange = getSchemaChange(connectionRead, currentSourceCatalogId);
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent =
configRepository.getMostRecentActorCatalogFetchEventForSource(connectionRead.getSourceId());

final SchemaChange schemaChange = getSchemaChange(connectionRead, currentSourceCatalogId, mostRecentFetchEvent);

webBackendConnectionRead.setSchemaChange(schemaChange);

return webBackendConnectionRead;
}

/*
* A breakingChange boolean is stored on the connectionRead object and corresponds to the boolean
* breakingChange field on the connection table. If there is not a breaking change, we still have to
* check whether there is a non-breaking schema change by fetching the most recent
* ActorCatalogFetchEvent. A new ActorCatalogFetchEvent is stored each time there is a source schema
* refresh, so if the most recent ActorCatalogFetchEvent has a different actor catalog than the
* existing actor catalog, there is a schema change.
*/
private SchemaChange getSchemaChange(ConnectionRead connectionRead, Optional<UUID> currentSourceCatalogId) throws IOException {
SchemaChange schemaChange = SchemaChange.NO_CHANGE;

if (connectionRead.getBreakingChange()) {
schemaChange = SchemaChange.BREAKING;
} else if (currentSourceCatalogId.isPresent()) {
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent =
configRepository.getMostRecentActorCatalogFetchEventForSource(connectionRead.getSourceId());

if (mostRecentFetchEvent.isPresent()) {
if (!mostRecentFetchEvent.get().getActorCatalogId().equals(currentSourceCatalogId.get())) {
schemaChange = SchemaChange.NON_BREAKING;
}
}
}

return schemaChange;
}

private WebBackendConnectionListItem buildWebBackendConnectionListItem(
final StandardSync standardSync,
final Map<UUID, SourceRead> sourceReadById,
final Map<UUID, DestinationRead> destinationReadById,
final Map<UUID, JobRead> latestJobByConnectionId,
final Map<UUID, JobRead> runningJobByConnectionId) {
final Map<UUID, JobRead> runningJobByConnectionId,
final Optional<ActorCatalogFetchEvent> latestFetchEvent)
throws JsonValidationException, ConfigNotFoundException, IOException {

final SourceRead source = sourceReadById.get(standardSync.getSourceId());
final DestinationRead destination = destinationReadById.get(standardSync.getDestinationId());
final Optional<JobRead> latestSyncJob = Optional.ofNullable(latestJobByConnectionId.get(standardSync.getConnectionId()));
final Optional<JobRead> latestRunningSyncJob = Optional.ofNullable(runningJobByConnectionId.get(standardSync.getConnectionId()));
final ConnectionRead connectionRead = connectionsHandler.getConnection(standardSync.getConnectionId());
final Optional<UUID> currentCatalogId = connectionRead == null ? Optional.empty() : Optional.ofNullable(connectionRead.getSourceCatalogId());

final SchemaChange schemaChange = getSchemaChange(connectionRead, currentCatalogId, latestFetchEvent);

final WebBackendConnectionListItem listItem = new WebBackendConnectionListItem()
.connectionId(standardSync.getConnectionId())
Expand All @@ -230,7 +217,8 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem(
.scheduleType(ApiPojoConverters.toApiConnectionScheduleType(standardSync))
.scheduleData(ApiPojoConverters.toApiConnectionScheduleData(standardSync))
.source(source)
.destination(destination);
.destination(destination)
.schemaChange(schemaChange);

listItem.setIsSyncing(latestRunningSyncJob.isPresent());

Expand All @@ -242,6 +230,34 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem(
return listItem;
}

/*
* A breakingChange boolean is stored on the connectionRead object and corresponds to the boolean
* breakingChange field on the connection table. If there is not a breaking change, we still have to
* check whether there is a non-breaking schema change by fetching the most recent
* ActorCatalogFetchEvent. A new ActorCatalogFetchEvent is stored each time there is a source schema
* refresh, so if the most recent ActorCatalogFetchEvent has a different actor catalog than the
* existing actor catalog, there is a schema change.
*/
@VisibleForTesting
SchemaChange getSchemaChange(
final ConnectionRead connectionRead,
final Optional<UUID> currentSourceCatalogId,
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent) {
if (connectionRead == null || currentSourceCatalogId.isEmpty()) {
return SchemaChange.NO_CHANGE;
}

if (connectionRead.getBreakingChange() != null && connectionRead.getBreakingChange()) {
return SchemaChange.BREAKING;
}

if (mostRecentFetchEvent.isPresent() && !mostRecentFetchEvent.map(ActorCatalogFetchEvent::getActorCatalogId).equals(currentSourceCatalogId)) {
return SchemaChange.NON_BREAKING;
}

return SchemaChange.NO_CHANGE;
}

private SourceRead getSourceRead(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceId);
return sourceHandler.getSource(sourceIdRequestBody);
Expand Down
Loading

0 comments on commit 5a1fe39

Please sign in to comment.