Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove config repository from config fetch activity impl scheduling #20908

Merged
merged 10 commits into from
Jan 5, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionSchedule;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataBasicSchedule.TimeUnitEnum;
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
Expand All @@ -41,6 +43,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -106,48 +109,48 @@ public StandardSync getStandardSync(final UUID connectionId) throws JsonValidati
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));
final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId());

if (standardSync.getScheduleType() != null) {
return this.getTimeToWaitFromScheduleType(standardSync, input.getConnectionId());
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(input.getConnectionId());
final ConnectionRead connectionRead = connectionApi.getConnection(connectionIdRequestBody);
if (connectionRead.getScheduleType() != null) {
return this.getTimeToWaitFromScheduleType(connectionRead, input.getConnectionId());
}
return this.getTimeToWaitFromLegacy(standardSync, input.getConnectionId());
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
return this.getTimeToWaitFromLegacy(connectionRead, input.getConnectionId());
} catch (final IOException | ApiException e) {
throw new RetryableException(e);
}
}

/**
* @param standardSync
* @param connectionRead
* @param connectionId
* @return
* @throws IOException
*
* This method consumes the `scheduleType` and `scheduleData` fields.
*/
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync standardSync, final UUID connectionId) throws IOException {
if (standardSync.getScheduleType() == ScheduleType.MANUAL || standardSync.getStatus() != Status.ACTIVE) {
private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
if (connectionRead.getScheduleType() == ConnectionScheduleType.MANUAL || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);

if (standardSync.getScheduleType() == ScheduleType.BASIC_SCHEDULE) {
if (connectionRead.getScheduleType() == ConnectionScheduleType.BASIC) {
if (previousJobOptional.isEmpty()) {
// Basic schedules don't wait for their first run.
return new ScheduleRetrieverOutput(Duration.ZERO);
}
final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());
final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getScheduleData().getBasicSchedule());
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getScheduleData().getBasicSchedule());
final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
return new ScheduleRetrieverOutput(timeToWait);
}

else { // standardSync.getScheduleType() == ScheduleType.CRON
final Cron scheduleCron = standardSync.getScheduleData().getCron();
else { // connectionRead.getScheduleType() == ConnectionScheduleType.CRON
final ConnectionScheduleDataCron scheduleCron = connectionRead.getScheduleData().getCron();
final TimeZone timeZone = DateTimeZone.forID(scheduleCron.getCronTimeZone()).toTimeZone();
try {
final CronExpression cronExpression = new CronExpression(scheduleCron.getCronExpression());
Expand All @@ -164,18 +167,18 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync
Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart.getTime() / MS_PER_SECOND - currentSecondsSupplier.get()));

timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, standardSync);
timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, connectionRead);
return new ScheduleRetrieverOutput(timeToWait);
} catch (final ParseException e) {
throw (DateTimeException) new DateTimeException(e.getMessage()).initCause(e);
}
}
}

private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, StandardSync standardSync) {
private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) {
final UUID workspaceId;
try {
workspaceId = workspaceHelper.getWorkspaceForConnectionId(standardSync.getConnectionId());
workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId());
} catch (JsonValidationException | ConfigNotFoundException e) {
// We tolerate exceptions and fail open by doing nothing.
return timeToWait;
Expand All @@ -184,7 +187,7 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
// Only apply to a specific set of workspaces.
return timeToWait;
}
if (!standardSync.getScheduleType().equals(ScheduleType.CRON)) {
if (!connectionRead.getScheduleType().equals(ConnectionScheduleType.CRON)) {
// Only apply noise to cron connections.
return timeToWait;
}
Expand All @@ -197,30 +200,30 @@ private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait,
}

/**
* @param standardSync
* @param connectionRead
* @param connectionId
* @return
* @throws IOException
*
* This method consumes the `schedule` field.
*/
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final StandardSync standardSync, final UUID connectionId) throws IOException {
if (standardSync.getSchedule() == null || standardSync.getStatus() != Status.ACTIVE) {
private ScheduleRetrieverOutput getTimeToWaitFromLegacy(final ConnectionRead connectionRead, final UUID connectionId) throws IOException {
if (connectionRead.getSchedule() == null || connectionRead.getStatus() != ConnectionStatus.ACTIVE) {
// Manual syncs wait for their first run
return new ScheduleRetrieverOutput(Duration.ofDays(100 * 365));
}

final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connectionId);

if (previousJobOptional.isEmpty() && standardSync.getSchedule() != null) {
if (previousJobOptional.isEmpty() && connectionRead.getSchedule() != null) {
// Non-manual syncs don't wait for their first run
return new ScheduleRetrieverOutput(Duration.ZERO);
}

final Job previousJob = previousJobOptional.get();
final long prevRunStart = previousJob.getStartedAtInSecond().orElse(previousJob.getCreatedAtInSecond());

final long nextRunStart = prevRunStart + ScheduleHelpers.getIntervalInSecond(standardSync.getSchedule());
final long nextRunStart = prevRunStart + getIntervalInSecond(connectionRead.getSchedule());

final Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart - currentSecondsSupplier.get()));
Expand Down Expand Up @@ -261,4 +264,46 @@ public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
}
}

private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

private Long getIntervalInSecond(final ConnectionSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

private Long getSecondsInUnit(final TimeUnitEnum timeUnitEnum) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this (and the similar methods surrounding it) should be defined somewhere else. Don't know where that would be currently, but they seem a little out of play in this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worked on this with Parker, we did discuss this. the issue is that these methods are currently defined in ScheduleHelpers, but if we wanted to add new methods there that took ConnectionRead instead of StandardSync we'd have to add a new dependency for airbyte-config to depend on airbyte-api. Not sure if possibly a better solution would be to move ScheduleHelpers elsewhere?

switch (timeUnitEnum) {
case MINUTES:
return TimeUnit.MINUTES.toSeconds(1);
case HOURS:
return TimeUnit.HOURS.toSeconds(1);
case DAYS:
return TimeUnit.DAYS.toSeconds(1);
case WEEKS:
return TimeUnit.DAYS.toSeconds(1) * 7;
case MONTHS:
return TimeUnit.DAYS.toSeconds(1) * 30;
default:
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum);
}
}

private Long getSecondsInUnit(final ConnectionSchedule.TimeUnitEnum timeUnitEnum) {
switch (timeUnitEnum) {
case MINUTES:
return TimeUnit.MINUTES.toSeconds(1);
case HOURS:
return TimeUnit.HOURS.toSeconds(1);
case DAYS:
return TimeUnit.DAYS.toSeconds(1);
case WEEKS:
return TimeUnit.DAYS.toSeconds(1) * 7;
case MONTHS:
return TimeUnit.DAYS.toSeconds(1) * 30;
default:
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnitEnum);
}
}

}
Loading