Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Job Tracking Issue #1071

Merged
merged 11 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public static Long getSecondsInUnit(Schedule.TimeUnit timeUnitEnum) {
}
}

public static Long getIntervalInSecond(Schedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,21 @@ public void writeStandardDestinationDefinition(final StandardDestinationDefiniti
destinationDefinition);
}

public SourceConnection getSourceConnection(final UUID sourceId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank space changes.

throws JsonValidationException, IOException, ConfigNotFoundException {
public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.SOURCE_CONNECTION,
sourceId.toString(),
SourceConnection.class);
}

public void writeSourceConnection(final SourceConnection source)
throws JsonValidationException, IOException {
public void writeSourceConnection(final SourceConnection source) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.SOURCE_CONNECTION,
source.getSourceId().toString(),
source);
}

public List<SourceConnection> listSourceConnection()
throws JsonValidationException, IOException, ConfigNotFoundException {
public List<SourceConnection> listSourceConnection() throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.listConfigs(
ConfigSchema.SOURCE_CONNECTION,
SourceConnection.class);
Expand All @@ -173,44 +170,38 @@ public DestinationConnection getDestinationConnection(final UUID destinationId)
DestinationConnection.class);
}

public void writeDestinationConnection(DestinationConnection destinationConnection)
throws JsonValidationException, IOException {
public void writeDestinationConnection(DestinationConnection destinationConnection) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.DESTINATION_CONNECTION,
destinationConnection.getDestinationId().toString(),
destinationConnection);
}

public List<DestinationConnection> listDestinationConnection()
throws JsonValidationException, IOException, ConfigNotFoundException {
public List<DestinationConnection> listDestinationConnection() throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.listConfigs(
ConfigSchema.DESTINATION_CONNECTION,
DestinationConnection.class);
}

public StandardSync getStandardSync(final UUID connectionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.STANDARD_SYNC,
connectionId.toString(),
StandardSync.class);
}

public void writeStandardSync(final StandardSync standardSync)
throws JsonValidationException, IOException {
public void writeStandardSync(final StandardSync standardSync) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
standardSync);
}

public List<StandardSync> listStandardSyncs()
throws ConfigNotFoundException, IOException, JsonValidationException {
public List<StandardSync> listStandardSyncs() throws ConfigNotFoundException, IOException, JsonValidationException {
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class);
}

public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId)
throws JsonValidationException, IOException, ConfigNotFoundException {
public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
ConfigSchema.STANDARD_SYNC_SCHEDULE,
connectionId.toString(),
Expand Down
148 changes: 104 additions & 44 deletions airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,29 @@

package io.airbyte.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -69,7 +78,7 @@ public void run() {
Optional<Job> oldestPendingJob = persistence.getOldestPendingJob();

oldestPendingJob.ifPresent(job -> {
track(job, configRepository);
trackSubmission(job);
submitJob(job);
});

Expand All @@ -79,7 +88,8 @@ public void run() {
}
}

private void submitJob(Job job) {
@VisibleForTesting
void submitJob(Job job) {
final WorkerRun workerRun = workerRunFactory.create(job);
threadPool.submit(new LifecycledCallable.Builder<>(workerRun)
.setOnStart(() -> {
Expand All @@ -96,61 +106,111 @@ private void submitJob(Job job) {
persistence.writeOutput(job.getId(), output.getOutput().get());
}
persistence.updateStatus(job.getId(), getStatus(output));
trackCompletion(job, output.getStatus());
})
.setOnException(noop -> {
persistence.updateStatus(job.getId(), JobStatus.FAILED);
trackCompletion(job, io.airbyte.workers.JobStatus.FAILED);
})
.setOnException(noop -> persistence.updateStatus(job.getId(), JobStatus.FAILED))
.setOnFinish(MDC::clear)
.build());
}

private void track(Job job, ConfigRepository configRepository) {
@VisibleForTesting
void trackSubmission(Job job) {
try {
final Builder<String, Object> metadata = ImmutableMap.builder();
switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE, DISCOVER_SCHEMA -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromSource(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("source_definition_name", sourceDefinition.getName());
metadata.put("source_definition_id", sourceDefinition.getSourceDefinitionId());
}
case CHECK_CONNECTION_DESTINATION -> {
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromDestination(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("destination_definition_name", destinationDefinition.getName());
metadata.put("destination_definition_id", destinationDefinition.getDestinationDefinitionId());
}
case GET_SPEC -> {
// no op because this will be noisy as heck.
}
case SYNC -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromConnection(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromConnection(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("source_definition_name", sourceDefinition.getName());
metadata.put("source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("destination_definition_name", destinationDefinition.getName());
metadata.put("destination_definition_id", destinationDefinition.getDestinationDefinitionId());
}
}
final Builder<String, Object> metadataBuilder = generateMetadata(job);
metadataBuilder.put("attempt_stage", "STARTED");
track(metadataBuilder.build());
} catch (Exception e) {
LOGGER.error("failed while reporting usage.");
}
}

TrackingClientSingleton.get().track("job", metadata.build());
@VisibleForTesting
void trackCompletion(Job job, io.airbyte.workers.JobStatus status) {
try {
final Builder<String, Object> metadataBuilder = generateMetadata(job);
metadataBuilder.put("attempt_stage", "ENDED");
metadataBuilder.put("attempt_completion_status", status);
track(metadataBuilder.build());
} catch (Exception e) {
LOGGER.error("failed while reporting usage.");
}
}

private JobStatus getStatus(OutputAndStatus<?> output) {
switch (output.getStatus()) {
case SUCCEEDED:
return JobStatus.COMPLETED;
case FAILED:
return JobStatus.FAILED;
default:
throw new RuntimeException("Unknown state " + output.getStatus());
private void track(Map<String, Object> metadata) {
// do not track get spec. it is done frequently and not terribly interesting.
if (metadata.get("job_type").equals("GET_SPEC")) {
return;
}

TrackingClientSingleton.get().track("Connector Jobs", metadata);
}

private ImmutableMap.Builder<String, Object> generateMetadata(Job job) throws ConfigNotFoundException, IOException, JsonValidationException {
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("job_type", job.getConfig().getConfigType());
metadata.put("job_id", job.getId());
metadata.put("attempt_id", job.getAttempts());
// build deterministic job and attempt uuids based off of the scope,which should be unique across
// all instances of airbyte installed everywhere).
final UUID jobUuid = UUID.nameUUIDFromBytes((job.getScope() + job.getId() + job.getAttempts()).getBytes(Charsets.UTF_8));
final UUID attemptUuid = UUID.nameUUIDFromBytes((job.getScope() + job.getId() + job.getAttempts()).getBytes(Charsets.UTF_8));
metadata.put("job_uuid", jobUuid);
metadata.put("attempt_uuid", attemptUuid);

switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE, DISCOVER_SCHEMA -> {
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromSource(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
}
case CHECK_CONNECTION_DESTINATION -> {
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromDestination(UUID.fromString(ScopeHelper.getConfigId(job.getScope())));

metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());
}
case GET_SPEC -> {
// no op because this will be noisy as heck.
}
case SYNC -> {
final UUID connectionId = UUID.fromString(ScopeHelper.getConfigId(job.getScope()));
final StandardSyncSchedule schedule = configRepository.getStandardSyncSchedule(connectionId);
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromConnection(connectionId);

metadata.put("connection_id", connectionId);
metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());

String frequencyString;
if (schedule.getManual()) {
frequencyString = "manual";
} else {
final long intervalInMinutes = TimeUnit.SECONDS.toMinutes(ScheduleHelpers.getIntervalInSecond(schedule.getSchedule()));
frequencyString = intervalInMinutes + " min";
}
metadata.put("frequency", frequencyString);
}
}
return metadata;
}

private static JobStatus getStatus(OutputAndStatus<?> output) {
return switch (output.getStatus()) {
case SUCCEEDED -> JobStatus.COMPLETED;
case FAILED -> JobStatus.FAILED;
default -> throw new IllegalStateException("Unknown state " + output.getStatus());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably log instead of throw an exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mb -- I misread. This is correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thoguth this was only used in tracking code not in business logic. Wouldn't have wanted to fail over tracking issues.

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package io.airbyte.scheduler;

import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import java.time.Instant;
Expand Down Expand Up @@ -79,12 +78,8 @@ private boolean isTimeForNewJob(Optional<Job> previousJobOptional, StandardSyncS
}

final Job previousJob = previousJobOptional.get();
long nextRunStart = previousJob.getUpdatedAtInSecond() + getIntervalInSecond(standardSyncSchedule.getSchedule());
long nextRunStart = previousJob.getUpdatedAtInSecond() + ScheduleHelpers.getIntervalInSecond(standardSyncSchedule.getSchedule());
return nextRunStart < timeSupplier.get().getEpochSecond();
}

private static Long getIntervalInSecond(Schedule schedule) {
return ScheduleHelpers.getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}

}
Loading