Skip to content

Commit

Permalink
Persist geography updates (#18501)
Browse files Browse the repository at this point in the history
* persist geography in connection update

* use ApiPojoConverters

* add test coverage for persisting geography update

* persist geography column for workspace

* use US instead of AUTO in mock data so that tests don't pass due to the database-level default column value

* use ApiPojoConverters instead of Enum.convertTo in workspace handler

* format
  • Loading branch information
pmossman authored Oct 26, 2022
1 parent 9e612b0 commit 5d4b564
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,9 @@ private void writeStandardWorkspace(final List<StandardWorkspace> configs, final
.set(WORKSPACE.NOTIFICATIONS, JSONB.valueOf(Jsons.serialize(standardWorkspace.getNotifications())))
.set(WORKSPACE.FIRST_SYNC_COMPLETE, standardWorkspace.getFirstCompletedSync())
.set(WORKSPACE.FEEDBACK_COMPLETE, standardWorkspace.getFeedbackDone())
.set(WORKSPACE.GEOGRAPHY, Enums.toEnum(
standardWorkspace.getDefaultGeography().value(),
io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow())
.set(WORKSPACE.UPDATED_AT, timestamp)
.set(WORKSPACE.WEBHOOK_OPERATION_CONFIGS, standardWorkspace.getWebhookOperationConfigs() == null ? null
: JSONB.valueOf(Jsons.serialize(standardWorkspace.getWebhookOperationConfigs())))
Expand All @@ -791,6 +794,9 @@ private void writeStandardWorkspace(final List<StandardWorkspace> configs, final
.set(WORKSPACE.FEEDBACK_COMPLETE, standardWorkspace.getFeedbackDone())
.set(WORKSPACE.CREATED_AT, timestamp)
.set(WORKSPACE.UPDATED_AT, timestamp)
.set(WORKSPACE.GEOGRAPHY, Enums.toEnum(
standardWorkspace.getDefaultGeography().value(),
io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow())
.set(WORKSPACE.WEBHOOK_OPERATION_CONFIGS, standardWorkspace.getWebhookOperationConfigs() == null ? null
: JSONB.valueOf(Jsons.serialize(standardWorkspace.getWebhookOperationConfigs())))
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public static List<StandardWorkspace> standardWorkspaces() {
.withNotifications(Collections.singletonList(notification))
.withFirstCompletedSync(true)
.withFeedbackDone(true)
.withDefaultGeography(Geography.AUTO)
.withDefaultGeography(Geography.US)
.withWebhookOperationConfigs(Jsons.jsonNode(
new WebhookOperationConfigs().withWebhookConfigs(List.of(new WebhookConfig().withId(WEBHOOK_CONFIG_ID).withName("name")))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn
if (patch.getResourceRequirements() != null) {
sync.setResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(patch.getResourceRequirements()));
}

if (patch.getGeography() != null) {
sync.setGeography(ApiPojoConverters.toPersistenceGeography(patch.getGeography()));
}
}

private void validateConnectionPatch(final WorkspaceHelper workspaceHelper, final StandardSync persistedSync, final ConnectionUpdate patch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.notification.NotificationClient;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.converters.NotificationConverter;
import io.airbyte.server.converters.WorkspaceWebhookConfigsConverter;
import io.airbyte.server.errors.IdNotFoundKnownException;
Expand Down Expand Up @@ -301,8 +302,7 @@ private void applyPatchToStandardWorkspace(final StandardWorkspace workspace, fi
workspace.setNotifications(NotificationConverter.toConfigList(workspacePatch.getNotifications()));
}
if (workspacePatch.getDefaultGeography() != null) {
workspace.setDefaultGeography(
Enums.convertTo(workspacePatch.getDefaultGeography(), io.airbyte.config.Geography.class));
workspace.setDefaultGeography(ApiPojoConverters.toPersistenceGeography(workspacePatch.getDefaultGeography()));
}
if (workspacePatch.getWebhookConfigs() != null) {
workspace.setWebhookOperationConfigs(WorkspaceWebhookConfigsConverter.toPersistenceWrite(workspacePatch.getWebhookConfigs(), uuidSupplier));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ void testUpdateConnectionPatchingSeveralFieldsAndReplaceAStream() throws JsonVal
.syncCatalog(catalogForUpdate)
.resourceRequirements(resourceRequirements)
.sourceCatalogId(newSourceCatalogId)
.operationIds(List.of(operationId, otherOperationId));
.operationIds(List.of(operationId, otherOperationId))
.geography(io.airbyte.api.model.generated.Geography.EU);

final ConfiguredAirbyteCatalog expectedPersistedCatalog = ConnectionHelpers.generateBasicConfiguredAirbyteCatalog();
expectedPersistedCatalog.getStreams().get(0).getStream().withName(AZKABAN_USERS);
Expand All @@ -600,7 +601,8 @@ void testUpdateConnectionPatchingSeveralFieldsAndReplaceAStream() throws JsonVal
.withCatalog(expectedPersistedCatalog)
.withResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(resourceRequirements))
.withSourceCatalogId(newSourceCatalogId)
.withOperationIds(List.of(operationId, otherOperationId));
.withOperationIds(List.of(operationId, otherOperationId))
.withGeography(Geography.EU);

when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync);

Expand Down

0 comments on commit 5d4b564

Please sign in to comment.