Skip to content

Commit

Permalink
Fix Job Tracking Issue (#1071)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Nov 26, 2020
1 parent 9b2c946 commit 4b7c5b2
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public static Long getSecondsInUnit(Schedule.TimeUnit timeUnitEnum) {
}
}

/**
 * Computes the length of one schedule period, expressed in seconds.
 *
 * <p>The result is the number of seconds in the schedule's time unit (as reported by
 * {@code getSecondsInUnit}) multiplied by the schedule's unit count.
 *
 * @param schedule schedule whose time unit and unit count are read
 * @return the interval between two runs, in seconds
 */
public static Long getIntervalInSecond(Schedule schedule) {
  final Long secondsPerUnit = getSecondsInUnit(schedule.getTimeUnit());
  return secondsPerUnit * schedule.getUnits();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,21 @@ public void writeStandardDestinationDefinition(final StandardDestinationDefiniti
destinationDefinition);
}

public SourceConnection getSourceConnection(final UUID sourceId)
throws JsonValidationException, IOException, ConfigNotFoundException {
public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.SOURCE_CONNECTION,
sourceId.toString(),
SourceConnection.class);
}

public void writeSourceConnection(final SourceConnection source)
throws JsonValidationException, IOException {
public void writeSourceConnection(final SourceConnection source) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.SOURCE_CONNECTION,
source.getSourceId().toString(),
source);
}

public List<SourceConnection> listSourceConnection()
throws JsonValidationException, IOException, ConfigNotFoundException {
public List<SourceConnection> listSourceConnection() throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.listConfigs(
ConfigSchema.SOURCE_CONNECTION,
SourceConnection.class);
Expand All @@ -173,44 +170,38 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
DestinationConnection.class);
}

public void writeDestinationConnection(DestinationConnection destinationConnection)
throws JsonValidationException, IOException {
public void writeDestinationConnection(DestinationConnection destinationConnection) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.DESTINATION_CONNECTION,
destinationConnection.getDestinationId().toString(),
destinationConnection);
}

public List<DestinationConnection> listDestinationConnection()
throws JsonValidationException, IOException, ConfigNotFoundException {
public List<DestinationConnection> listDestinationConnection() throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.listConfigs(
ConfigSchema.DESTINATION_CONNECTION,
DestinationConnection.class);
}

public StandardSync getStandardSync(final UUID connectionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.STANDARD_SYNC,
connectionId.toString(),
StandardSync.class);
}

public void writeStandardSync(final StandardSync standardSync)
throws JsonValidationException, IOException {
public void writeStandardSync(final StandardSync standardSync) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
standardSync);
}

public List<StandardSync> listStandardSyncs()
throws ConfigNotFoundException, IOException, JsonValidationException {
public List<StandardSync> listStandardSyncs() throws ConfigNotFoundException, IOException, JsonValidationException {
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class);
}

public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.STANDARD_SYNC_SCHEDULE,
connectionId.toString(),
Expand Down
148 changes: 104 additions & 44 deletions airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,29 @@

package io.airbyte.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -69,7 +78,7 @@ public void run() {
Optional<Job> oldestPendingJob = persistence.getOldestPendingJob();

oldestPendingJob.ifPresent(job -> {
track(job, configRepository);
trackSubmission(job);
submitJob(job);
});

Expand All @@ -79,7 +88,8 @@ public void run() {
}
}

private void submitJob(Job job) {
@VisibleForTesting
void submitJob(Job job) {
final WorkerRun workerRun = workerRunFactory.create(job);
threadPool.submit(new LifecycledCallable.Builder<>(workerRun)
.setOnStart(() -> {
Expand All @@ -96,61 +106,111 @@ private void submitJob(Job job) {
persistence.writeOutput(job.getId(), output.getOutput().get());
}
persistence.updateStatus(job.getId(), getStatus(output));
trackCompletion(job, output.getStatus());
})
.setOnException(noop -> {
persistence.updateStatus(job.getId(), JobStatus.FAILED);
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
})
.setOnException(noop -> persistence.updateStatus(job.getId(), JobStatus.FAILED))
.setOnFinish(MDC::clear)
.build());
}

private void track(Job job, ConfigRepository configRepository) {
@VisibleForTesting
void trackSubmission(Job job) {
try {
final Builder<String, Object> metadata = ImmutableMap.builder();
switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE, DISCOVER_SCHEMA -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromSource(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("source_definition_name", sourceDefinition.getName());
metadata.put("source_definition_id", sourceDefinition.getSourceDefinitionId());
}
case CHECK_CONNECTION_DESTINATION -> {
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromDestination(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("destination_definition_name", destinationDefinition.getName());
metadata.put("destination_definition_id", destinationDefinition.getDestinationDefinitionId());
}
case GET_SPEC -> {
// no op because this will be noisy as heck.
}
case SYNC -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromConnection(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromConnection(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("source_definition_name", sourceDefinition.getName());
metadata.put("source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("destination_definition_name", destinationDefinition.getName());
metadata.put("destination_definition_id", destinationDefinition.getDestinationDefinitionId());
}
}
final Builder<String, Object> metadataBuilder = generateMetadata(job);
metadataBuilder.put("attempt_stage", "STARTED");
track(metadataBuilder.build());
} catch (Exception e) {
LOGGER.error("failed while reporting usage.");
}
}

TrackingClientSingleton.get().track("job", metadata.build());
/**
 * Reports the end of a job attempt to the tracking client.
 *
 * <p>Builds the job metadata, tags it with {@code attempt_stage=ENDED} and the given completion
 * status, and hands the result to {@code track}. Reporting is best-effort: any failure is logged
 * together with its cause and is never propagated to the caller.
 *
 * @param job job whose attempt has finished
 * @param status worker-level status the attempt finished with
 */
@VisibleForTesting
void trackCompletion(Job job, io.airbyte.workers.JobStatus status) {
  try {
    final Builder<String, Object> metadataBuilder = generateMetadata(job);
    metadataBuilder.put("attempt_stage", "ENDED");
    metadataBuilder.put("attempt_completion_status", status);
    track(metadataBuilder.build());
  } catch (Exception e) {
    // usage reporting must never break job handling, but keep the cause so failures can be diagnosed.
    LOGGER.error("failed while reporting usage.", e);
  }
}

private JobStatus getStatus(OutputAndStatus<?> output) {
switch (output.getStatus()) {
case SUCCEEDED:
return JobStatus.COMPLETED;
case FAILED:
return JobStatus.FAILED;
default:
throw new RuntimeException("Unknown state " + output.getStatus());
/**
 * Sends one "Connector Jobs" event to the tracking client, unless the job is a GET_SPEC job.
 *
 * <p>The {@code job_type} entry is written by {@code generateMetadata} as the config-type enum
 * constant, not as a string, so it is compared by constant name here. Comparing the enum itself
 * against the string {@code "GET_SPEC"} would never match and every GET_SPEC job would be tracked.
 *
 * @param metadata event properties; the {@code job_type} entry decides whether the event is sent
 */
private void track(Map<String, Object> metadata) {
  final Object jobType = metadata.get("job_type");
  // Enum#name() is used rather than toString() because toString() may be overridden by the enum.
  final String jobTypeName = jobType instanceof Enum ? ((Enum<?>) jobType).name() : String.valueOf(jobType);

  // do not track get spec. it is done frequently and not terribly interesting.
  if ("GET_SPEC".equals(jobTypeName)) {
    return;
  }

  TrackingClientSingleton.get().track("Connector Jobs", metadata);
}

/**
 * Builds the tracking metadata shared by the submission and completion events of a job.
 *
 * <p>Always records the job type, job id, attempt value and two deterministic UUIDs. Depending on
 * the job's config type it additionally records the source and/or destination definition and, for
 * sync jobs, the connection id and the sync frequency. The builder is returned un-built so that
 * callers can append their own entries.
 *
 * @param job job to describe
 * @return a builder pre-populated with the job's metadata
 * @throws ConfigNotFoundException if a configuration looked up through the repository is missing
 * @throws IOException if the repository fails while reading a configuration
 * @throws JsonValidationException if a configuration read from the repository is invalid
 */
private ImmutableMap.Builder<String, Object> generateMetadata(Job job) throws ConfigNotFoundException, IOException, JsonValidationException {
  final Builder<String, Object> metadata = ImmutableMap.builder();
  metadata.put("job_type", job.getConfig().getConfigType());
  metadata.put("job_id", job.getId());
  metadata.put("attempt_id", job.getAttempts());
  // build deterministic job and attempt uuids based off of the scope (which should be unique across
  // all instances of airbyte installed everywhere). The job uuid must stay the same across attempts,
  // so only the attempt uuid includes the attempt value in its seed.
  final String jobSeed = job.getScope() + job.getId();
  final UUID jobUuid = UUID.nameUUIDFromBytes(jobSeed.getBytes(Charsets.UTF_8));
  final UUID attemptUuid = UUID.nameUUIDFromBytes((jobSeed + job.getAttempts()).getBytes(Charsets.UTF_8));
  metadata.put("job_uuid", jobUuid);
  metadata.put("attempt_uuid", attemptUuid);

  switch (job.getConfig().getConfigType()) {
    case CHECK_CONNECTION_SOURCE, DISCOVER_SCHEMA -> {
      final StandardSourceDefinition sourceDefinition = configRepository
          .getSourceDefinitionFromSource(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

      metadata.put("connector_source", sourceDefinition.getName());
      metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
    }
    case CHECK_CONNECTION_DESTINATION -> {
      final StandardDestinationDefinition destinationDefinition = configRepository
          .getDestinationDefinitionFromDestination(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

      metadata.put("connector_destination", destinationDefinition.getName());
      metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
    }
    case GET_SPEC -> {
      // no op because this will be noisy as heck.
    }
    case SYNC -> {
      final UUID connectionId = UUID.fromString(ScopeHelper.getConfigId(job.getScope()));
      final StandardSyncSchedule schedule = configRepository.getStandardSyncSchedule(connectionId);
      final StandardSourceDefinition sourceDefinition = configRepository
          .getSourceDefinitionFromConnection(connectionId);
      final StandardDestinationDefinition destinationDefinition = configRepository
          .getDestinationDefinitionFromConnection(connectionId);

      metadata.put("connection_id", connectionId);
      metadata.put("connector_source", sourceDefinition.getName());
      metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
      metadata.put("connector_destination", destinationDefinition.getName());
      metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());

      // frequency is reported as "manual" or as the schedule interval in whole minutes.
      String frequencyString;
      if (schedule.getManual()) {
        frequencyString = "manual";
      } else {
        final long intervalInMinutes = TimeUnit.SECONDS.toMinutes(ScheduleHelpers.getIntervalInSecond(schedule.getSchedule()));
        frequencyString = intervalInMinutes + " min";
      }
      metadata.put("frequency", frequencyString);
    }
  }
  return metadata;
}

/**
 * Translates the worker-level status carried by a worker output into the scheduler's job status.
 *
 * @param output worker output whose status is read
 * @return {@code COMPLETED} for a succeeded worker, {@code FAILED} for a failed one
 * @throws IllegalStateException if the worker status is neither succeeded nor failed
 */
private static JobStatus getStatus(OutputAndStatus<?> output) {
  final io.airbyte.workers.JobStatus workerStatus = output.getStatus();
  switch (workerStatus) {
    case SUCCEEDED:
      return JobStatus.COMPLETED;
    case FAILED:
      return JobStatus.FAILED;
    default:
      throw new IllegalStateException("Unknown state " + output.getStatus());
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package io.airbyte.scheduler;

import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import java.time.Instant;
Expand Down Expand Up @@ -79,12 +78,8 @@ private boolean isTimeForNewJob(Optional<Job> previousJobOptional, StandardSyncS
}

final Job previousJob = previousJobOptional.get();
long nextRunStart = previousJob.getUpdatedAtInSecond() + getIntervalInSecond(standardSyncSchedule.getSchedule());
long nextRunStart = previousJob.getUpdatedAtInSecond() + ScheduleHelpers.getIntervalInSecond(standardSyncSchedule.getSchedule());
return nextRunStart < timeSupplier.get().getEpochSecond();
}

/**
 * Computes the length of one schedule period, expressed in seconds.
 *
 * @param schedule schedule whose time unit and unit count are read
 * @return seconds in the schedule's time unit multiplied by its unit count
 */
private static Long getIntervalInSecond(Schedule schedule) {
  final Long secondsPerUnit = ScheduleHelpers.getSecondsInUnit(schedule.getTimeUnit());
  return secondsPerUnit * schedule.getUnits();
}

}
Loading

0 comments on commit 4b7c5b2

Please sign in to comment.