Rbroughan/stream status persistence server (#6235)
tryangul committed May 2, 2023
1 parent d7f36bd commit 170af5c
Showing 10 changed files with 665 additions and 8 deletions.
@@ -18,7 +18,6 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
@@ -35,7 +34,6 @@
* Helpers for interacting with Workspaces.
*/
@SuppressWarnings("PMD.AvoidCatchingThrowable")
@Singleton
public class WorkspaceHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkspaceHelper.class);
2 changes: 2 additions & 0 deletions airbyte-server/build.gradle
@@ -18,6 +18,7 @@ dependencies {

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut
implementation libs.bundles.micronaut.data.jpa
implementation libs.micronaut.jaxrs.server
implementation libs.flyway.core
implementation libs.s3
@@ -37,6 +38,7 @@ dependencies {
implementation project(":airbyte-featureflag")
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-db:jooq')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-notification')
implementation project(':airbyte-oauth')
@@ -10,6 +10,7 @@
import io.airbyte.db.Database;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseCheckFactory;
import io.airbyte.db.instance.DatabaseConstants;
import io.airbyte.featureflag.FeatureFlagClient;
@@ -18,13 +19,16 @@
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import io.micronaut.flyway.FlywayConfigurationProperties;
import io.micronaut.transaction.jdbc.DelegatingDataSource;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.Flyway;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DataSourceConnectionProvider;

/**
* Micronaut bean factory for database-related singletons.
@@ -42,7 +46,7 @@ public class DatabaseBeanFactory {
@Singleton
@Named("configDatabase")
public Database configDatabase(@Named("config") final DSLContext dslContext) throws IOException {
return new Database(dslContext);
return new Database(unwrapContext(dslContext));
}

@Singleton
@@ -51,7 +55,7 @@ public Flyway configFlyway(@Named("config") final FlywayConfigurationProperties
@Named("config") final DataSource configDataSource,
@Value("${airbyte.flyway.configs.minimum-migration-version}") final String baselineVersion) {
return configFlywayConfigurationProperties.getFluentConfiguration()
.dataSource(configDataSource)
.dataSource(unwrapDataSource(configDataSource))
.baselineVersion(baselineVersion)
.baselineDescription(BASELINE_DESCRIPTION)
.baselineOnMigrate(BASELINE_ON_MIGRATION)
@@ -66,7 +70,7 @@ public Flyway jobsFlyway(@Named("jobs") final FlywayConfigurationProperties jobs
@Named("config") final DataSource jobsDataSource,
@Value("${airbyte.flyway.jobs.minimum-migration-version}") final String baselineVersion) {
return jobsFlywayConfigurationProperties.getFluentConfiguration()
.dataSource(jobsDataSource)
.dataSource(unwrapDataSource(jobsDataSource))
.baselineVersion(baselineVersion)
.baselineDescription(BASELINE_DESCRIPTION)
.baselineOnMigrate(BASELINE_ON_MIGRATION)
@@ -99,7 +103,7 @@ public DatabaseMigrationCheck configsDatabaseMigrationCheck(@Named("config") fin
@Value("${airbyte.flyway.configs.initialization-timeout-ms}") final Long configsDatabaseInitializationTimeoutMs) {
log.info("Configs database configuration: {} {}", configsDatabaseMinimumFlywayMigrationVersion, configsDatabaseInitializationTimeoutMs);
return DatabaseCheckFactory
.createConfigsDatabaseMigrationCheck(dslContext, configsFlyway, configsDatabaseMinimumFlywayMigrationVersion,
.createConfigsDatabaseMigrationCheck(unwrapContext(dslContext), configsFlyway, configsDatabaseMinimumFlywayMigrationVersion,
configsDatabaseInitializationTimeoutMs);
}

@@ -110,19 +114,34 @@ public DatabaseMigrationCheck jobsDatabaseMigrationCheck(@Named("config") final
@Value("${airbyte.flyway.jobs.minimum-migration-version}") final String jobsDatabaseMinimumFlywayMigrationVersion,
@Value("${airbyte.flyway.jobs.initialization-timeout-ms}") final Long jobsDatabaseInitializationTimeoutMs) {
return DatabaseCheckFactory
.createJobsDatabaseMigrationCheck(dslContext, jobsFlyway, jobsDatabaseMinimumFlywayMigrationVersion,
.createJobsDatabaseMigrationCheck(unwrapContext(dslContext), jobsFlyway, jobsDatabaseMinimumFlywayMigrationVersion,
jobsDatabaseInitializationTimeoutMs);
}

@Singleton
@Named("jobsDatabaseAvailabilityCheck")
public JobsDatabaseAvailabilityCheck jobsDatabaseAvailabilityCheck(@Named("config") final DSLContext dslContext) {
return new JobsDatabaseAvailabilityCheck(dslContext, DatabaseConstants.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS);
return new JobsDatabaseAvailabilityCheck(unwrapContext(dslContext), DatabaseConstants.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS);
}

@Singleton
public StreamResetPersistence streamResetPersistence(@Named("configDatabase") final Database configDatabase) {
return new StreamResetPersistence(configDatabase);
}

// Micronaut-data wraps the injected data sources with transactional semantics, which don't respect
// our jooq operations and error out. If we inject an unwrapped one, it will be re-wrapped. So we
// manually unwrap them.
static DataSource unwrapDataSource(final DataSource dataSource) {
return ((DelegatingDataSource) dataSource).getTargetDataSource();
}

// Micronaut won't let us provide an unwrapped DSLContext as a bean directly, so we manually
// unwrap the data source here as well.
static DSLContext unwrapContext(final DSLContext context) {
final var datasource = ((DataSourceConnectionProvider) context.configuration().connectionProvider()).dataSource();

return DSLContextFactory.create(unwrapDataSource(datasource), SQLDialect.POSTGRES);
}

}
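
The unwrap helpers above boil down to a three-step chain. A minimal standalone sketch (not part of this commit) that reuses only classes already referenced by this factory; the class and method names are illustrative:

import io.airbyte.db.factory.DSLContextFactory;
import io.micronaut.transaction.jdbc.DelegatingDataSource;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;

class UnwrapSketch {

  // Illustrative only: mirrors unwrapDataSource/unwrapContext above.
  static DSLContext rawJooqContext(final DataSource injected) {
    // 1. micronaut-data injects a DelegatingDataSource that adds transactional semantics.
    // 2. getTargetDataSource() exposes the underlying connection pool.
    final DataSource raw = ((DelegatingDataSource) injected).getTargetDataSource();
    // 3. A DSLContext built directly from the raw pool bypasses that wrapper, so jOOQ
    //    manages its own connections instead of erroring inside the transaction layer.
    return DSLContextFactory.create(raw, SQLDialect.POSTGRES);
  }

}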
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.repositories;

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATUSES;

import com.google.common.base.CaseFormat;
import io.airbyte.server.repositories.domain.StreamStatus;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.annotation.RepositoryConfiguration;
import io.micronaut.data.model.Page;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.query.builder.jpa.JpaQueryBuilder;
import io.micronaut.data.repository.PageableRepository;
import io.micronaut.data.repository.jpa.JpaSpecificationExecutor;
import io.micronaut.data.repository.jpa.criteria.PredicateSpecification;
import java.util.UUID;
import lombok.Builder;
import org.jooq.TableField;

/**
* Data Access layer for StreamStatus.
*/
@SuppressWarnings("MissingJavadocType")
@Repository
@RepositoryConfiguration(queryBuilder = JpaQueryBuilder.class) // Despite what your IDE says, we must specify the query builder
public interface StreamStatusRepository extends PageableRepository<StreamStatus, UUID>, JpaSpecificationExecutor<StreamStatus> {

/**
* Returns stream statuses filtered by the provided params.
*/
default Page<StreamStatus> findAllFiltered(final FilterParams params) {
var spec = Predicates.columnEquals(Columns.WORKSPACE_ID, params.workspaceId());
var pageable = Pageable.unpaged();

if (null != params.connectionId()) {
spec = spec.and(Predicates.columnEquals(Columns.CONNECTION_ID, params.connectionId()));
}

if (null != params.jobId()) {
spec = spec.and(Predicates.columnEquals(Columns.JOB_ID, params.jobId()));
}

if (null != params.streamNamespace()) {
spec = spec.and(Predicates.columnEquals(Columns.STREAM_NAMESPACE, params.streamNamespace()));
}

if (null != params.streamName()) {
spec = spec.and(Predicates.columnEquals(Columns.STREAM_NAME, params.streamName()));
}

if (null != params.attemptNumber()) {
spec = spec.and(Predicates.columnEquals(Columns.ATTEMPT_NUMBER, params.attemptNumber()));
}

if (null != params.pagination()) {
final var offset = params.pagination().offset();
final var size = params.pagination().size();
pageable = Pageable.from(offset, size);
}

return findAll(spec, pageable);
}

/**
* Pagination params.
*/
record Pagination(int offset,
int size) {}

/**
* Params for filtering our list functionality.
*/
@Builder
record FilterParams(UUID workspaceId,
UUID connectionId,
Long jobId,
String streamNamespace,
String streamName,
Integer attemptNumber,
Pagination pagination) {}

/**
* Predicates for dynamic query building. Portable.
*/
class Predicates {

/*
* Jooq holds onto the names of the columns in snake_case, so we have to convert to lower camelCase
* for Hibernate to do predicate filtering.
*/
static String formatJooqColumnName(final TableField<?, ?> jooqColumn) {
return CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, jooqColumn.getName());
}

static <U> PredicateSpecification<StreamStatus> columnEquals(final String columnName, final U value) {
return (root, criteriaBuilder) -> criteriaBuilder.equal(root.get(columnName), value);
}

}

/**
* Column names for StreamStatus in camel case for Hibernate. In lieu of a metamodel, we pre-create
* Hibernate-friendly column names for the already generated Jooq model.
*/
class Columns {

static String WORKSPACE_ID = Predicates.formatJooqColumnName(STREAM_STATUSES.WORKSPACE_ID);
static String CONNECTION_ID = Predicates.formatJooqColumnName(STREAM_STATUSES.CONNECTION_ID);
static String JOB_ID = Predicates.formatJooqColumnName(STREAM_STATUSES.JOB_ID);
static String STREAM_NAMESPACE = Predicates.formatJooqColumnName(STREAM_STATUSES.STREAM_NAMESPACE);
static String STREAM_NAME = Predicates.formatJooqColumnName(STREAM_STATUSES.STREAM_NAME);
static String ATTEMPT_NUMBER = Predicates.formatJooqColumnName(STREAM_STATUSES.ATTEMPT_NUMBER);

}

}
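
A minimal usage sketch (not part of this commit) of the filtered list query, assuming constructor injection into some Micronaut-managed consumer; the class and method names below are hypothetical. Note that the Columns constants are the camelCase property names derived from the jOOQ columns, e.g. stream_namespace becomes "streamNamespace".

import io.airbyte.server.repositories.StreamStatusRepository;
import io.airbyte.server.repositories.StreamStatusRepository.FilterParams;
import io.airbyte.server.repositories.StreamStatusRepository.Pagination;
import io.airbyte.server.repositories.domain.StreamStatus;
import io.micronaut.data.model.Page;
import jakarta.inject.Singleton;
import java.util.UUID;

@Singleton
public class StreamStatusesReader {

  private final StreamStatusRepository repository;

  public StreamStatusesReader(final StreamStatusRepository repository) {
    this.repository = repository;
  }

  public Page<StreamStatus> listForConnection(final UUID workspaceId, final UUID connectionId) {
    // Only non-null filter params become predicates; omitting pagination falls back to Pageable.unpaged().
    final FilterParams params = FilterParams.builder()
        .workspaceId(workspaceId)
        .connectionId(connectionId)
        .pagination(new Pagination(0, 20))
        .build();

    return repository.findAllFiltered(params);
  }

}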
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.repositories.domain;

import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusIncompleteRunCause;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusJobType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusRunState;
import io.hypersistence.utils.hibernate.type.basic.PostgreSQLEnumType;
import io.micronaut.data.annotation.DateCreated;
import io.micronaut.data.annotation.DateUpdated;
import java.time.OffsetDateTime;
import java.util.UUID;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;

/**
* DTO for our data access layer.
*/
@Builder
@AllArgsConstructor // The builder uses this constructor
@NoArgsConstructor // Hibernate uses this constructor and sets the values individually
@Getter
@Setter
@EqualsAndHashCode(exclude = {"id", "createdAt", "updatedAt"})
@Entity(name = "stream_statuses")
@TypeDef(name = StreamStatus.PGSQL_ENUM,
typeClass = PostgreSQLEnumType.class)
public class StreamStatus {

static final String PGSQL_ENUM = "pgsql_enum";

@Id
@GeneratedValue
private UUID id;

private UUID workspaceId;

private UUID connectionId;

private Long jobId;

private Integer attemptNumber;

private String streamNamespace;

private String streamName;

@Enumerated(EnumType.STRING)
@Type(type = PGSQL_ENUM)
private JobStreamStatusJobType jobType;

@DateCreated
private OffsetDateTime createdAt;

@DateUpdated
private OffsetDateTime updatedAt;

@Enumerated(EnumType.STRING)
@Type(type = PGSQL_ENUM)
private JobStreamStatusRunState runState;

@Enumerated(EnumType.STRING)
@Type(type = PGSQL_ENUM)
private JobStreamStatusIncompleteRunCause incompleteRunCause;

private OffsetDateTime transitionedAt;

}
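
A sketch (not part of this commit) of constructing a StreamStatus through the Lombok builder. The jOOQ-generated enum constant names (sync, running) are assumptions about the generated code; id, createdAt and updatedAt are intentionally left unset because they are generated or managed by @DateCreated/@DateUpdated and are excluded from equals/hashCode.

import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusJobType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusRunState;
import io.airbyte.server.repositories.domain.StreamStatus;
import java.time.OffsetDateTime;
import java.util.UUID;

class StreamStatusSketch {

  static StreamStatus runningSyncStatus(final UUID workspaceId, final UUID connectionId, final long jobId) {
    return StreamStatus.builder()
        .workspaceId(workspaceId)
        .connectionId(connectionId)
        .jobId(jobId)
        .attemptNumber(0)
        .streamNamespace("public")                 // illustrative values
        .streamName("users")
        .jobType(JobStreamStatusJobType.sync)      // assumed enum constant name
        .runState(JobStreamStatusRunState.running) // assumed enum constant name
        .transitionedAt(OffsetDateTime.now())
        .build();
  }

}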
23 changes: 23 additions & 0 deletions airbyte-server/src/main/resources/application.yml
@@ -178,9 +178,32 @@ jooq:
jackson-converter-enabled: true
sql-dialect: POSTGRES

jpa:
default:
entity-scan:
# this is where JPA/Hibernate will look for domain objects to map to
packages:
- "io.airbyte.server.repositories.domain"
# disable these since they are not managed by JPA/Hibernate
jobs:
enabled: false
config:
enabled: false

properties:
hibernate:
# Uncomment to help diagnose sql/ORM issues
# show_sql: true
dialect: org.hibernate.dialect.PostgreSQLDialect
hbm2ddl:
auto: none

logger:
levels:
# Uncomment to help resolve issues with conditional beans
# io.micronaut.context.condition: DEBUG
# Uncomment to help resolve issues with security beans
# io.micronaut.security: DEBUG
# Uncomment to help resolve issues with micronaut data
# com.zaxxer.hikari.HikariConfig: DEBUG
# com.zaxxer.hikari: TRACE