Expose cron scheduling in the Connections APIs #15253

Merged 16 commits on Aug 11, 2022
Changes from 10 commits
74 changes: 74 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -3218,6 +3218,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
@@ -3257,6 +3261,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
@@ -3298,6 +3306,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
@@ -3335,6 +3347,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
@@ -3386,6 +3402,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
resourceRequirements:
@@ -3416,6 +3436,10 @@ components:
$ref: "#/components/schemas/DestinationId"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
@@ -3445,6 +3469,10 @@ components:
$ref: "#/components/schemas/DestinationId"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
@@ -3467,6 +3495,8 @@ components:
- active
- inactive
- deprecated
# TODO(https://github.com/airbytehq/airbyte/issues/11432): remove.
# Prefer the ConnectionScheduleType and ConnectionScheduleData properties.
ConnectionSchedule:
description: if null, then no schedule is set.
type: object
@@ -3485,6 +3515,46 @@ components:
- days
- weeks
- months
ConnectionScheduleType:
description: determines how the schedule data should be interpreted
type: string
enum:
- manual
- basic
- cron
ConnectionScheduleData:
description: schedule for when the connection should run, per the schedule type
type: object
properties:
# This should be populated when schedule type is basic.
basicSchedule:
type: object
required:
- timeUnit
- units
properties:
timeUnit:
type: string
enum:
- minutes
- hours
- days
- weeks
- months
units:
type: integer
format: int64
# This should be populated when schedule type is cron.
cron:
type: object
required:
- cronExpression
- cronTimeZone
properties:
cronExpression:
type: string
cronTimeZone:
type: string
NamespaceDefinitionType:
type: string
description: Method used for computing final namespace in destination
@@ -4564,6 +4634,10 @@ components:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
$ref: "#/components/schemas/ConnectionSchedule"
scheduleType:
Contributor:

Why is it not encapsulated in one object?

Schedule:
  type:
  data:

Contributor Author:

In my opinion the deeper nesting would make it a bit more unwieldy without too much benefit - encapsulating it under an object doesn't really reduce the cognitive load since every user has to figure out what to do with schedule data anyway whether it's top-level or nested.

I did consider the approach you propose, and I'm not really opposed to it. It comes with the wrinkle that during the migration we have this schedule object that contains both the old and new schemas. (Or - we have some transition period where we have the new schema with some new name, remove the old schema, and then migrate again the new schema to the existing Schedule name.) All in all this felt simpler; but open to feedback here.

Contributor:

No strong opinion on my end. However, everything related to migration complexity is a non-issue to me since we are v0. I would rather have the clean design we want than make design trade-offs based on backward compatibility or migrations.

$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
operationIds:
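The schema above pairs a `scheduleType` discriminator with a `scheduleData` object whose populated member is expected to match the type (per the YAML comments, `basicSchedule` for `basic` and `cron` for `cron`). A minimal Java sketch of that contract follows. The class, record, and method names are hypothetical stand-ins, not the generated API models, and treating `manual` as needing no data is an assumption.

```java
import java.util.Optional;

// Hypothetical stand-ins for the generated API models; every name here is illustrative.
final class ScheduleDataCheck {

  enum ScheduleType { MANUAL, BASIC, CRON }

  record BasicSchedule(String timeUnit, Long units) {}

  record Cron(String cronExpression, String cronTimeZone) {}

  // Mirrors ConnectionScheduleData: one optional member per non-manual schedule type.
  record ScheduleData(BasicSchedule basicSchedule, Cron cron) {}

  // Returns a description of the mismatch, or empty when the data fits the type.
  static Optional<String> check(final ScheduleType type, final ScheduleData data) {
    switch (type) {
      case BASIC:
        if (data == null || data.basicSchedule() == null) {
          return Optional.of("basic requires scheduleData.basicSchedule");
        }
        if (data.basicSchedule().timeUnit() == null || data.basicSchedule().units() == null) {
          return Optional.of("basicSchedule requires timeUnit and units");
        }
        return Optional.empty();
      case CRON:
        if (data == null || data.cron() == null) {
          return Optional.of("cron requires scheduleData.cron");
        }
        if (data.cron().cronExpression() == null || data.cron().cronTimeZone() == null) {
          return Optional.of("cron requires cronExpression and cronTimeZone");
        }
        return Optional.empty();
      case MANUAL:
      default:
        // Assumption: a manual connection needs no schedule data.
        return Optional.empty();
    }
  }
}
```

With the flat layout chosen in this PR, a check like this has to live in server code; the nested `Schedule { type, data }` shape raised in the review thread would need the same check, only one level deeper.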
1 change: 1 addition & 0 deletions airbyte-server/build.gradle
@@ -32,6 +32,7 @@ dependencies {
implementation 'org.glassfish.jersey.inject:jersey-hk2'
implementation 'org.glassfish.jersey.media:jersey-media-json-jackson'
implementation 'org.glassfish.jersey.ext:jersey-bean-validation'
implementation 'org.quartz-scheduler:quartz:2.3.2'


testImplementation project(':airbyte-test-utils')
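The hunk above adds Quartz as a server dependency, but the code that uses it is not part of the hunks shown here (`ConnectionScheduleHelper` is referenced later without its source). Assuming the dependency is there to validate `cronExpression`, Quartz's own `CronExpression.isValidExpression` would be the natural call. The sketch below is only a dependency-free structural pre-check, not a replacement for Quartz's parser: it relies on the fact that Quartz expressions have six mandatory fields plus an optional year, and it assumes `cronTimeZone` is an ID that `java.time` can resolve. The helper class and method names are hypothetical.

```java
import java.time.DateTimeException;
import java.time.ZoneId;

// Hypothetical helper, not part of this PR: a cheap structural pre-check only.
final class CronFieldPrecheck {

  // Quartz cron expressions have 6 mandatory fields plus an optional year field.
  static boolean hasQuartzFieldCount(final String cronExpression) {
    if (cronExpression == null || cronExpression.isBlank()) {
      return false;
    }
    final int fields = cronExpression.trim().split("\\s+").length;
    return fields == 6 || fields == 7;
  }

  // Assumption: cronTimeZone holds an ID java.time can resolve, e.g. "UTC" or "Europe/Paris".
  static boolean isKnownTimeZone(final String cronTimeZone) {
    if (cronTimeZone == null) {
      return false;
    }
    try {
      ZoneId.of(cronTimeZone);
      return true;
    } catch (final DateTimeException e) {
      // Covers both malformed IDs and unknown regions.
      return false;
    }
  }
}
```

A five-field Unix-style expression such as `0 12 * * *` fails this pre-check, which is a common source of confusion when moving from crontab syntax to Quartz.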
@@ -4,9 +4,13 @@

package io.airbyte.server.converters;

import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleData;
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.JobType;
@@ -17,7 +21,10 @@
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.server.handlers.helpers.ConnectionScheduleHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.util.stream.Collectors;

public class ApiPojoConverters {
@@ -78,7 +85,7 @@ public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.co
.memoryLimit(resourceReqs.getMemoryLimit());
}

public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) {
public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) throws JsonValidationException {

final StandardSync newConnection = new StandardSync()
.withNamespaceDefinition(Enums.convertTo(update.getNamespaceDefinition(), NamespaceDefinitionType.class))
@@ -99,7 +106,9 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
}

// update sync schedule
if (update.getSchedule() != null) {
if (update.getScheduleType() != null) {
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(newConnection, update.getScheduleType(), update.getScheduleData());
} else if (update.getSchedule() != null) {
final Schedule newSchedule = new Schedule()
.withTimeUnit(toPersistenceTimeUnit(update.getSchedule().getTimeUnit()))
.withUnits(update.getSchedule().getUnits());
@@ -112,21 +121,12 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
}

public static ConnectionRead internalToConnectionRead(final StandardSync standardSync) {
ConnectionSchedule apiSchedule = null;

if (!standardSync.getManual()) {
apiSchedule = new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits());
}

final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(standardSync.getConnectionId())
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
.status(toApiStatus(standardSync.getStatus()))
.schedule(apiSchedule)
.name(standardSync.getName())
.namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), io.airbyte.api.model.generated.NamespaceDefinitionType.class))
.namespaceFormat(standardSync.getNamespaceFormat())
@@ -138,6 +138,8 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
connectionRead.resourceRequirements(resourceRequirementsToApi(standardSync.getResourceRequirements()));
}

populateConnectionReadSchedule(standardSync, connectionRead);

return connectionRead;
}

@@ -149,10 +151,15 @@ public static io.airbyte.config.JobTypeResourceLimit.JobType toInternalJobType(f
return Enums.convertTo(jobType, io.airbyte.config.JobTypeResourceLimit.JobType.class);
}

// TODO(https://github.com/airbytehq/airbyte/issues/11432): remove these helpers.
public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final Schedule.TimeUnit apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class);
}

public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final BasicSchedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionSchedule.TimeUnitEnum.class);
}

public static ConnectionStatus toApiStatus(final StandardSync.Status status) {
return Enums.convertTo(status, ConnectionStatus.class);
}
@@ -169,4 +176,75 @@ public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionSch
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
}

public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionScheduleDataBasicSchedule.TimeUnitEnum apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
}

public static ConnectionScheduleDataBasicSchedule.TimeUnitEnum toApiBasicScheduleTimeUnit(final BasicSchedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionScheduleDataBasicSchedule.TimeUnitEnum.class);
}

public static ConnectionScheduleDataBasicSchedule.TimeUnitEnum toApiBasicScheduleTimeUnit(final Schedule.TimeUnit timeUnit) {
return Enums.convertTo(timeUnit, ConnectionScheduleDataBasicSchedule.TimeUnitEnum.class);
}

public static void populateConnectionReadSchedule(final StandardSync standardSync, final ConnectionRead connectionRead) {
// TODO(https://github.com/airbytehq/airbyte/issues/11432): only return new schema once frontend is
// ready.
if (standardSync.getScheduleType() != null) {
// Populate everything based on the new schema.
switch (standardSync.getScheduleType()) {
case MANUAL -> {
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.MANUAL);
}
case BASIC_SCHEDULE -> {
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.BASIC);
connectionRead.scheduleData(new ConnectionScheduleData()
.basicSchedule(new ConnectionScheduleDataBasicSchedule()
.timeUnit(toApiBasicScheduleTimeUnit(standardSync.getScheduleData().getBasicSchedule().getTimeUnit()))
.units(standardSync.getScheduleData().getBasicSchedule().getUnits())));
connectionRead.schedule(new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getScheduleData().getBasicSchedule().getTimeUnit()))
.units(standardSync.getScheduleData().getBasicSchedule().getUnits()));
}
case CRON -> {
// We don't populate any legacy data here.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.CRON);
connectionRead.scheduleData(new ConnectionScheduleData()
.cron(new ConnectionScheduleDataCron()
.cronExpression(standardSync.getScheduleData().getCron().getCronExpression())
.cronTimeZone(standardSync.getScheduleData().getCron().getCronTimeZone())));
}
}
} else if (standardSync.getManual()) {
// Legacy schema, manual sync.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.MANUAL);
} else {
// Legacy schema, basic schedule.
connectionRead.scheduleType(io.airbyte.api.model.generated.ConnectionScheduleType.BASIC);
connectionRead.schedule(new ConnectionSchedule()
.timeUnit(toApiTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits()));
connectionRead.scheduleData(new ConnectionScheduleData()
.basicSchedule(new ConnectionScheduleDataBasicSchedule()
.timeUnit(toApiBasicScheduleTimeUnit(standardSync.getSchedule().getTimeUnit()))
.units(standardSync.getSchedule().getUnits())));
}
}

public static ConnectionScheduleType toApiScheduleType(final ScheduleType scheduleType) {
switch (scheduleType) {
case MANUAL -> {
return ConnectionScheduleType.MANUAL;
}
case BASIC_SCHEDULE -> {
return ConnectionScheduleType.BASIC;
}
case CRON -> {
return ConnectionScheduleType.CRON;
}
}
throw new RuntimeException("Unexpected schedule type");
}

}
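The read path in `populateConnectionReadSchedule` above resolves the schedule in a fixed order: the new `scheduleType` wins when it is set, otherwise the legacy `manual` flag and `schedule` are used. Basic schedules are echoed in both the legacy and the new properties, while cron leaves the legacy `schedule` unset. A simplified sketch of that precedence follows; the records are hypothetical stand-ins for `StandardSync` and `ConnectionRead`, reduced to the fields that matter here.

```java
// Hypothetical, simplified stand-ins for StandardSync and ConnectionRead.
final class ReadScheduleSketch {

  enum Type { MANUAL, BASIC, CRON }

  record Interval(String timeUnit, long units) {}

  // Stored connection; scheduleType is null when only the legacy schema was written.
  record Stored(Type scheduleType, boolean manual, Interval legacySchedule,
                Interval basicSchedule, String cronExpression) {}

  // API response; `schedule` is the legacy property, the remaining fields are the new ones.
  record Read(Type scheduleType, Interval schedule, Interval basicSchedule, String cronExpression) {}

  static Read toRead(final Stored s) {
    if (s.scheduleType() != null) {
      return switch (s.scheduleType()) {
        case MANUAL -> new Read(Type.MANUAL, null, null, null);
        // Basic schedules are echoed in both the legacy and the new property.
        case BASIC -> new Read(Type.BASIC, s.basicSchedule(), s.basicSchedule(), null);
        // Cron has no legacy representation, so `schedule` stays null.
        case CRON -> new Read(Type.CRON, null, null, s.cronExpression());
      };
    }
    if (s.manual()) {
      // Legacy schema, manual sync.
      return new Read(Type.MANUAL, null, null, null);
    }
    // Legacy schema, basic schedule: both properties come from the legacy value.
    return new Read(Type.BASIC, s.legacySchedule(), s.legacySchedule(), null);
  }
}
```

One consequence worth noting for API clients: a caller that only reads the legacy `schedule` property cannot tell a cron connection from a manual one, since both leave it null.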
@@ -47,6 +47,7 @@
import io.airbyte.server.converters.CatalogDiffConverters;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.server.handlers.helpers.ConnectionMatcher;
import io.airbyte.server.handlers.helpers.ConnectionScheduleHelper;
import io.airbyte.server.handlers.helpers.DestinationMatcher;
import io.airbyte.server.handlers.helpers.SourceMatcher;
import io.airbyte.validation.json.JsonValidationException;
@@ -140,6 +141,34 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
}

if (connectionCreate.getSchedule() != null && connectionCreate.getScheduleType() != null) {
throw new JsonValidationException("supply old or new schedule schema but not both");
}

if (connectionCreate.getScheduleType() != null) {
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(standardSync, connectionCreate.getScheduleType(),
connectionCreate.getScheduleData());
} else {
populateSyncFromLegacySchedule(standardSync, connectionCreate);
}

configRepository.writeStandardSync(standardSync);

trackNewConnection(standardSync);

try {
LOGGER.info("Starting a connection manager workflow");
eventRunner.createConnectionManagerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
throw e;
}

return buildConnectionRead(connectionId);
}

private void populateSyncFromLegacySchedule(final StandardSync standardSync, final ConnectionCreate connectionCreate) {
if (connectionCreate.getSchedule() != null) {
final Schedule schedule = new Schedule()
.withTimeUnit(ApiPojoConverters.toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
@@ -159,21 +188,6 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
standardSync.withManual(true);
standardSync.withScheduleType(ScheduleType.MANUAL);
}

configRepository.writeStandardSync(standardSync);

trackNewConnection(standardSync);

try {
LOGGER.info("Starting a connection manager workflow");
eventRunner.createConnectionManagerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
throw e;
}

return buildConnectionRead(connectionId);
}

private void trackNewConnection(final StandardSync standardSync) {
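The create path shown above rejects a request that sets both `schedule` and `scheduleType`, then prefers the new schema and otherwise falls back to the legacy one, where a missing `schedule` appears to mean a manual connection. A compact sketch of that decision follows. The class, enum, and method names are hypothetical, and `IllegalArgumentException` stands in for the `JsonValidationException` the handler throws.

```java
// Hypothetical sketch of the schema selection in createConnection; names are illustrative.
final class CreateScheduleSketch {

  enum Path { NEW_SCHEMA, LEGACY_SCHEDULE, LEGACY_MANUAL }

  // Arguments are typed as Object because only their presence matters for the decision.
  static Path choose(final Object legacySchedule, final Object scheduleType) {
    if (legacySchedule != null && scheduleType != null) {
      // Stand-in for JsonValidationException in the real handler.
      throw new IllegalArgumentException("supply old or new schedule schema but not both");
    }
    if (scheduleType != null) {
      return Path.NEW_SCHEMA;
    }
    // Assumption drawn from the visible lines: no legacy schedule means a manual connection.
    return legacySchedule != null ? Path.LEGACY_SCHEDULE : Path.LEGACY_MANUAL;
  }
}
```

The update path in `connectionUpdateToInternal` behaves differently in the lines shown: it prefers `scheduleType` when present and does not reject a request that carries both.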