Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace errors in orchestrator workers #18381

Merged
merged 8 commits into from
Oct 24, 2022
1 change: 1 addition & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-text:1.10.0'
implementation libs.bundles.datadog

implementation project(':airbyte-api')
implementation project(':airbyte-commons-protocol')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.workers.Worker;
import io.airbyte.workers.exception.WorkerException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -40,10 +47,12 @@ public DbtTransformationWorker(final String jobId,
this.cancelled = new AtomicBoolean(false);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();
LineGobbler.startSection("DBT TRANSFORMATION");
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot));

try (dbtTransformationRunner) {
LOGGER.info("Running dbt transformation.");
Expand All @@ -59,6 +68,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr
throw new WorkerException("DBT Transformation Failed.");
}
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Dbt Transformation Failed.", e);
}
if (cancelled.get()) {
Expand All @@ -79,7 +89,7 @@ public void cancel() {
cancelled.set(true);
dbtTransformationRunner.close();
} catch (final Exception e) {
e.printStackTrace();
LOGGER.error("Unable to cancel Dbt Transformation runner.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth emitting an event in DD (instead of only logging)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I can trace the cancel call as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we missing a call to ApmTraceUtils.addExceptionToTrace(e); here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@colesnodgrass pointed out the same thing. I have added a @Trace to the cancel methods and a recording of the exception where applicable.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
Expand All @@ -13,6 +17,7 @@
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -52,10 +57,11 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche
this(integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
LineGobbler.startSection("CHECK");

ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot));
try {
process = integrationLauncher.check(
jobRoot,
Expand Down Expand Up @@ -95,6 +101,7 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
}

} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unexpected error while checking connection: ", e);
throw new WorkerException("Unexpected error while getting checking connection.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTOR_VERSION_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -23,6 +30,7 @@
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -56,8 +64,10 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
this(configRepository, integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot));
try {
process = integrationLauncher.discover(
jobRoot,
Expand Down Expand Up @@ -101,12 +111,31 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
String.format("Discover job subprocess finished with exit code %s", exitCode));
}
} catch (final WorkerException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw e;
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Error while discovering schema", e);
}
}

/**
 * Assembles the tags that {@code run} attaches to the active trace.
 *
 * <p>The job root is always recorded. The source ID and connector version are
 * recorded only when the input is non-null and the respective getter returns a
 * non-null value.
 *
 * @param discoverSchemaInput the discover-catalog input; may be {@code null}
 * @param jobRoot the job root path, stored under {@code JOB_ROOT_KEY}
 * @return a new mutable map of tag keys to tag values
 */
private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
  final Map<String, Object> traceTags = new HashMap<>();
  traceTags.put(JOB_ROOT_KEY, jobRoot);

  // Without an input there is nothing more to tag.
  if (discoverSchemaInput == null) {
    return traceTags;
  }

  if (discoverSchemaInput.getSourceId() != null) {
    traceTags.put(SOURCE_ID_KEY, discoverSchemaInput.getSourceId());
  }

  if (discoverSchemaInput.getConnectorVersion() != null) {
    traceTags.put(CONNECTOR_VERSION_KEY, discoverSchemaInput.getConnectorVersion());
  }

  return traceTags;
}

@Override
public void cancel() {
WorkerUtils.cancelProcess(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -47,8 +53,10 @@ public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, config.getDockerImage()));
try {
process = integrationLauncher.spec(jobRoot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand All @@ -19,6 +25,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,10 +57,13 @@ public DefaultNormalizationWorker(final String jobId,
this.cancelled = new AtomicBoolean(false);
}

@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public NormalizationSummary run(final NormalizationInput input, final Path jobRoot) throws WorkerException {
final long startTime = System.currentTimeMillis();

ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot));

try (normalizationRunner) {
LineGobbler.startSection("DEFAULT NORMALIZATION");
normalizationRunner.start();
Expand All @@ -69,6 +79,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
buildFailureReasonsAndSetFailure();
}
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
buildFailureReasonsAndSetFailure();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package io.airbyte.workers.general;

import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
Expand All @@ -16,6 +22,7 @@
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -120,6 +127,7 @@ public DefaultReplicationWorker(final String jobId,
* @return output of the replication attempt (including state)
* @throws WorkerException
*/
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);
Expand All @@ -146,6 +154,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path

final Map<String, String> mdc = MDC.getCopyOfContextMap();

ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
Expand Down Expand Up @@ -195,6 +205,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path

} catch (final Exception e) {
hasFailed.set(true);
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Sync worker failed.", e);
} finally {
executors.shutdownNow();
Expand Down Expand Up @@ -321,6 +332,7 @@ else if (hasFailed.get()) {
LineGobbler.endSection("REPLICATION");
return output;
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Sync failed", e);
}

Expand Down Expand Up @@ -516,6 +528,21 @@ public void cancel() {

}

/**
 * Assembles the tags that {@code run} attaches to the active trace.
 *
 * <p>The worker's job ID and the job root are always recorded. The connection ID
 * is recorded only when the destination config is non-null and returns a
 * non-null connection ID.
 *
 * @param destinationConfig the destination configuration; may be {@code null}
 * @param jobRoot the job root path, stored under {@code JOB_ROOT_KEY}
 * @return a new mutable map of tag keys to tag values
 */
private Map<String, Object> generateTraceTags(final WorkerDestinationConfig destinationConfig, final Path jobRoot) {
  final Map<String, Object> traceTags = new HashMap<>();
  traceTags.put(JOB_ID_KEY, jobId);
  traceTags.put(JOB_ROOT_KEY, jobRoot);

  // Short-circuit keeps the getter from being called on a null config.
  final boolean hasConnectionId = destinationConfig != null && destinationConfig.getConnectionId() != null;
  if (hasConnectionId) {
    traceTags.put(CONNECTION_ID_KEY, destinationConfig.getConnectionId());
  }

  return traceTags;
}

private static class SourceException extends RuntimeException {

SourceException(final String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package io.airbyte.container_orchestrator;

import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package io.airbyte.container_orchestrator;

import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.json.Jsons;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package io.airbyte.container_orchestrator;

import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.container_orchestrator.TraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.features.FeatureFlags;
Expand Down
Loading