Skip to content

Commit

Permalink
Add organization table to config db (#7252)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencecho committed Jun 28, 2023
1 parent 474d61a commit 1d32833
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.4.002";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.5.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
private static final String CDK_VERSION = "1.2.3";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OrganizationConfiguration.yaml
title: Organization
description: organization configuration
type: object
required:
- organizationId
- name
- userId
- email
additionalProperties: true
properties:
organizationId:
type: string
format: uuid
userId:
type: string
format: uuid
name:
type: string
email:
type: string
format: email
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.airbyte.db.instance.configs.jooq.generated.Tables.DECLARATIVE_MANIFEST;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.NOTIFICATION_CONFIGURATION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.OPERATION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ORGANIZATION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.SCHEMA_MANAGEMENT;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE_SERVICE_ACCOUNT;
Expand Down Expand Up @@ -57,6 +58,7 @@
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.Organization;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -154,7 +156,9 @@ public record StandardSyncsQueryPaginated(
List<UUID> destinationId,
boolean includeDeleted,
int pageSize,
int rowOffset) {}
int rowOffset) {

}

/**
* Query object for paginated querying of sources/destinations in multiple workspaces.
Expand All @@ -168,7 +172,25 @@ public record ResourcesQueryPaginated(
@Nonnull List<UUID> workspaceIds,
boolean includeDeleted,
int pageSize,
int rowOffset) {}
int rowOffset) {

}

/**
* Query object for paginated querying of resource in an organization.
*
* @param organizationId organization to fetch resources for
* @param includeDeleted include tombstoned resources
* @param pageSize limit
* @param rowOffset offset
*/
public record ResourcesByOrganizationQueryPaginated(
@Nonnull UUID organizationId,
boolean includeDeleted,
int pageSize,
int rowOffset) {

}

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
private static final String OPERATION_IDS_AGG_FIELD = "operation_ids_agg";
Expand Down Expand Up @@ -217,6 +239,95 @@ public boolean healthCheck() {
return true;
}

/**
* Get organization.
*
* @param organizationId id to use to find the organization
* @return organization, if present.
* @throws IOException - you never know when you IO
*/
public Optional<Organization> getOrganization(final UUID organizationId) throws IOException {
final Result<Record> result;
result = database.query(ctx -> ctx.select(ORGANIZATION.asterisk())
.from(ORGANIZATION)
.where(ORGANIZATION.ID.eq(organizationId))).fetch();

return result.stream().findFirst().map(DbConverter::buildOrganization);
}

/**
* Write an Organization to the database.
*
* @param organization - The configuration of the organization
* @throws IOException - you never know when you IO
*/
public void writeOrganization(final Organization organization) throws IOException {
database.transaction(ctx -> {
final OffsetDateTime timestamp = OffsetDateTime.now();
final boolean isExistingConfig = ctx.fetchExists(select()
.from(ORGANIZATION)
.where(ORGANIZATION.ID.eq(organization.getOrganizationId())));

if (isExistingConfig) {
ctx.update(ORGANIZATION)
.set(ORGANIZATION.ID, organization.getOrganizationId())
.set(ORGANIZATION.NAME, organization.getName())
.set(ORGANIZATION.EMAIL, organization.getEmail())
.set(ORGANIZATION.UPDATED_AT, timestamp)
.where(ORGANIZATION.ID.eq(organization.getOrganizationId()))
.execute();
} else {
ctx.insertInto(ORGANIZATION)
.set(ORGANIZATION.ID, organization.getOrganizationId())
.set(ORGANIZATION.NAME, organization.getName())
.set(ORGANIZATION.EMAIL, organization.getEmail())
.set(WORKSPACE.CREATED_AT, timestamp)
.set(WORKSPACE.UPDATED_AT, timestamp)
.execute();
}
return null;
});
}

/**
* List organizations.
*
* @return organizations
* @throws IOException - you never know when you IO
*/
public List<Organization> listOrganizations() throws IOException {
return listOrganizationQuery(Optional.empty()).toList();
}

private Stream<Organization> listOrganizationQuery(final Optional<UUID> organizationId) throws IOException {
return database.query(ctx -> ctx.select(ORGANIZATION.asterisk())
.from(ORGANIZATION)
.where(organizationId.map(ORGANIZATION.ID::eq).orElse(noCondition()))
.fetch())
.stream()
.map(DbConverter::buildOrganization);
}

/**
* List organizations (paginated).
*
* @param resourcesByOrganizationQueryPaginated - contains all the information we need to paginate
* @return A List of organizations objectjs
* @throws IOException you never know when you IO
*/
public List<Organization> listOrganizationsPaginated(final ResourcesByOrganizationQueryPaginated resourcesByOrganizationQueryPaginated)
throws IOException {
return database.query(ctx -> ctx.select(ORGANIZATION.asterisk())
.from(ORGANIZATION)
.where(ORGANIZATION.ID.in(resourcesByOrganizationQueryPaginated.organizationId()))
.limit(resourcesByOrganizationQueryPaginated.pageSize())
.offset(resourcesByOrganizationQueryPaginated.rowOffset())
.fetch())
.stream()
.map(DbConverter::buildOrganization)
.toList();
}

/**
* Get workspace.
*
Expand Down Expand Up @@ -314,7 +425,7 @@ public List<StandardWorkspace> listStandardWorkspacesPaginated(final ResourcesQu

/**
* MUST NOT ACCEPT SECRETS - Should only be called from { @link SecretsRepositoryWriter }.
*
* <p>
* Write a StandardWorkspace to the database.
*
* @param workspace - The configuration of the workspace
Expand Down Expand Up @@ -1077,7 +1188,7 @@ public SourceConnection getSourceConnection(final UUID sourceId) throws JsonVali

/**
* MUST NOT ACCEPT SECRETS - Should only be called from { @link SecretsRepositoryWriter }
*
* <p>
* Write a SourceConnection to the database. The configuration of the Source will be a partial
* configuration (no secrets, just pointer to the secrets store).
*
Expand Down Expand Up @@ -1208,7 +1319,7 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)

/**
* MUST NOT ACCEPT SECRETS - Should only be called from { @link SecretsRepositoryWriter }
*
* <p>
* Write a DestinationConnection to the database. The configuration of the Destination will be a
* partial configuration (no secrets, just pointer to the secrets store).
*
Expand Down Expand Up @@ -2030,7 +2141,7 @@ private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, fi

/**
* Pair of source and its associated definition.
*
* <p>
* Data-carrier records to hold combined result of query for a Source or Destination and its
* corresponding Definition. This enables the API layer to process combined information about a
* Source/Destination/Definition pair without requiring two separate queries and in-memory join
Expand Down Expand Up @@ -2130,7 +2241,7 @@ public ActorCatalog getActorCatalogById(final UUID actorCatalogId)

/**
* Store an Airbyte catalog in DB if it is not present already.
*
* <p>
* Checks in the config DB if the catalog is present already, if so returns it identifier. It is not
* present, it is inserted in DB with a new identifier and that identifier is returned.
*
Expand Down Expand Up @@ -2269,7 +2380,7 @@ where actor_id in ({0})

/**
* Stores source catalog information.
*
* <p>
* This function is called each time the schema of a source is fetched. This can occur because the
* source is set up for the first time, because the configuration or version of the connector
* changed or because the user explicitly requested a schema refresh. Schemas are stored separately
Expand Down Expand Up @@ -2751,7 +2862,7 @@ public void deleteManifestDraftForActorDefinition(final UUID actorDefinitionId,
/**
* Write name and draft of a builder project. The actor_definition is also updated to match the new
* builder project name.
*
* <p>
* Actor definition updated this way should always be private (i.e. public=false). As an additional
* protection, we want to shield ourselves from users updating public actor definition and
* therefore, the name of the actor definition won't be updated if the actor definition is not
Expand Down Expand Up @@ -2800,7 +2911,7 @@ public void assignActorDefinitionToConnectorBuilderProject(final UUID builderPro

/**
* Update an actor_definition, active_declarative_manifest and create declarative_manifest.
*
* <p>
* Note that based on this signature, two problems might occur if the user of this method is not
* diligent. This was done because we value more separation of concerns than consistency of the API
* of this method. The problems are:
Expand All @@ -2811,17 +2922,17 @@ public void assignActorDefinitionToConnectorBuilderProject(final UUID builderPro
* <li>DeclarativeManifest.spec could be different from ConnectorSpecification.connectionSpecification</li>
* </ul>
* </pre>
*
* <p>
* Since we decided not to validate this using the signature of the method, we will validate that
* runtime and IllegalArgumentException if there is a mismatch.
*
* <p>
* The reasoning behind this reasoning is the following: Benefits: Alignment with platform's
* definition of the repository. Drawbacks: We will need a method
* configRepository.setDeclarativeSourceActiveVersion(sourceDefinitionId, version, manifest, spec);
* where version and (manifest, spec) might not be consistent i.e. that a user of this method could
* call it with configRepository.setDeclarativeSourceActiveVersion(sourceDefinitionId, version_10,
* manifest_of_version_7, spec_of_version_12); However, we agreed that this was very unlikely.
*
* <p>
* Note that this is all in the context of data consistency i.e. that we want to do this in one
* transaction. When we split this in many services, we will need to rethink data consistency.
*
Expand Down Expand Up @@ -2878,7 +2989,7 @@ private void upsertActiveDeclarativeManifest(final ActiveDeclarativeManifest act

/**
* Update an actor_definition, active_declarative_manifest and create declarative_manifest.
*
* <p>
* Note that based on this signature, two problems might occur if the user of this method is not
* diligent. This was done because we value more separation of concerns than consistency of the API
* of this method. The problems are:
Expand All @@ -2889,16 +3000,16 @@ private void upsertActiveDeclarativeManifest(final ActiveDeclarativeManifest act
* <li>DeclarativeManifest.spec could be different from ConnectorSpecification.connectionSpecification</li>
* </ul>
* </pre>
*
* <p>
* At that point, we can only hope the user won't cause data consistency issue using this method
*
* <p>
* The reasoning behind this reasoning is the following: Benefits: Alignment with platform's
* definition of the repository. Drawbacks: We will need a method
* configRepository.setDeclarativeSourceActiveVersion(sourceDefinitionId, version, manifest, spec);
* where version and (manifest, spec) might not be consistent i.e. that a user of this method could
* call it with configRepository.setDeclarativeSourceActiveVersion(sourceDefinitionId, version_10,
* manifest_of_version_7, spec_of_version_12); However, we agreed that this was very unlikely.
*
* <p>
* Note that this is all in the context of data consistency i.e. that we want to do this in one
* transaction. When we split this in many services, we will need to rethink data consistency.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTOR_BUILDER_PROJECT;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.DECLARATIVE_MANIFEST;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ORGANIZATION;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.SCHEMA_MANAGEMENT;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE_SERVICE_ACCOUNT;
Expand All @@ -39,6 +40,7 @@
import io.airbyte.config.NormalizationDestinationDefinitionConfig;
import io.airbyte.config.Notification;
import io.airbyte.config.NotificationSettings;
import io.airbyte.config.Organization;
import io.airbyte.config.ReleaseStage;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -176,6 +178,20 @@ public static StandardWorkspace buildStandardWorkspace(final Record record) {
: Jsons.deserialize(record.get(WORKSPACE.WEBHOOK_OPERATION_CONFIGS).data()));
}

/**
* Build organization from db record.
*
* @param record db record
* @return organization
*/
public static Organization buildOrganization(final Record record) {
return new Organization()
.withOrganizationId(record.get(ORGANIZATION.ID))
.withName(record.get(ORGANIZATION.NAME))
.withUserId(record.get(ORGANIZATION.USER_ID))
.withEmail(record.get(ORGANIZATION.EMAIL));
}

/**
* Build source from db record.
*
Expand Down
Loading

0 comments on commit 1d32833

Please sign in to comment.