diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index 1421de6da4f9..8f93185c9410 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation 'io.temporal:temporal-sdk:1.8.1' implementation 'org.apache.ant:ant:1.10.10' implementation 'org.apache.commons:commons-text:1.10.0' + implementation libs.bundles.datadog implementation project(':airbyte-api') implementation project(':airbyte-commons-protocol') diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationWorker.java index abc454b28787..111b4041ae55 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationWorker.java @@ -4,14 +4,21 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.OperatorDbtInput; import io.airbyte.config.ResourceRequirements; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.workers.Worker; import io.airbyte.workers.exception.WorkerException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +47,12 @@ public DbtTransformationWorker(final String jobId, this.cancelled = new AtomicBoolean(false); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) throws WorkerException { final long startTime = System.currentTimeMillis(); LineGobbler.startSection("DBT TRANSFORMATION"); + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot)); try (dbtTransformationRunner) { LOGGER.info("Running dbt transformation."); @@ -59,6 +68,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr throw new WorkerException("DBT Transformation Failed."); } } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); throw new WorkerException("Dbt Transformation Failed.", e); } if (cancelled.get()) { @@ -72,6 +82,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr return null; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { LOGGER.info("Cancelling Dbt Transformation runner..."); @@ -79,7 +90,8 @@ public void cancel() { cancelled.set(true); dbtTransformationRunner.close(); } catch (final Exception e) { - e.printStackTrace(); + ApmTraceUtils.addExceptionToTrace(e); + LOGGER.error("Unable to cancel Dbt Transformation runner.", e); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index c29af2c84a96..66bc2d271b24 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -4,6 +4,10 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; @@ -13,6 +17,7 @@ import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -52,10 +57,11 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche this(integrationLauncher, new DefaultAirbyteStreamFactory()); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException { LineGobbler.startSection("CHECK"); - + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot)); try { process = integrationLauncher.check( jobRoot, @@ -95,11 +101,13 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa } } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Unexpected error while checking connection: ", e); throw new WorkerException("Unexpected error while getting checking connection.", e); } } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { WorkerUtils.cancelProcess(process); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 209fdbcdb559..8c5a961f1fdd 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -4,6 +4,12 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTOR_VERSION_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -11,6 +17,7 @@ import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -23,6 +30,7 @@ import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -56,8 +64,10 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository, this(configRepository, integrationLauncher, new DefaultAirbyteStreamFactory()); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException { + ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot)); try { process = integrationLauncher.discover( jobRoot, @@ -101,12 +111,32 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI String.format("Discover job subprocess finished with exit code %s", exitCode)); } } catch (final WorkerException e) { + ApmTraceUtils.addExceptionToTrace(e); throw e; } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); throw new WorkerException("Error while discovering schema", e); } } + private Map generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) { + final Map tags = new HashMap<>(); + + tags.put(JOB_ROOT_KEY, jobRoot); + + if (discoverSchemaInput != null) { + if (discoverSchemaInput.getSourceId() != null) { + tags.put(SOURCE_ID_KEY, discoverSchemaInput.getSourceId()); + } + if (discoverSchemaInput.getConnectorVersion() != null) { + tags.put(CONNECTOR_VERSION_KEY, discoverSchemaInput.getConnectorVersion()); + } + } + + return tags; + } + + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { WorkerUtils.cancelProcess(process); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index cf6fbbb417a7..867093e23e22 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -4,11 +4,17 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConnectorSpecification; @@ -47,8 +53,10 @@ public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher) { this(integrationLauncher, new DefaultAirbyteStreamFactory()); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) throws WorkerException { + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, config.getDockerImage())); try { process = integrationLauncher.spec(jobRoot); @@ -90,6 +98,7 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { WorkerUtils.cancelProcess(process); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java index d48c196b9ff3..00a4a45b3e94 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java @@ -4,11 +4,17 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + +import datadog.trace.api.Trace; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.FailureReason; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; @@ -19,6 +25,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -50,10 +57,13 @@ public DefaultNormalizationWorker(final String jobId, this.cancelled = new AtomicBoolean(false); } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public NormalizationSummary run(final NormalizationInput input, final Path jobRoot) throws WorkerException { final long startTime = System.currentTimeMillis(); + ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId, JOB_ROOT_KEY, jobRoot)); + try (normalizationRunner) { LineGobbler.startSection("DEFAULT NORMALIZATION"); normalizationRunner.start(); @@ -69,6 +79,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo buildFailureReasonsAndSetFailure(); } } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); buildFailureReasonsAndSetFailure(); } @@ -105,6 +116,7 @@ private void buildFailureReasonsAndSetFailure() { failed = true; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { LOGGER.info("Cancelling normalization runner..."); @@ -112,7 +124,8 @@ public void cancel() { cancelled.set(true); normalizationRunner.close(); } catch (final Exception e) { - e.printStackTrace(); + ApmTraceUtils.addExceptionToTrace(e); + LOGGER.error("Unable to cancel normalization runner.", e); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 68f433a7072e..6ab4ef51bab7 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -4,7 +4,13 @@ package io.airbyte.workers.general; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; + import com.fasterxml.jackson.databind.ObjectMapper; +import datadog.trace.api.Trace; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.FailureReason; import io.airbyte.config.ReplicationAttemptSummary; @@ -16,6 +22,7 @@ import io.airbyte.config.SyncStats; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -120,6 +127,7 @@ public DefaultReplicationWorker(final String jobId, * @return output of the replication attempt (including state) * @throws WorkerException */ + @Trace(operationName = WORKER_OPERATION_NAME) @Override public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt); @@ -146,6 +154,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path final Map mdc = MDC.getCopyOfContextMap(); + ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot)); + // note: resources are closed in the opposite order in which they are declared. thus source will be // closed first (which is what we want). try (destination; source) { @@ -195,6 +205,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path } catch (final Exception e) { hasFailed.set(true); + ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Sync worker failed.", e); } finally { executors.shutdownNow(); @@ -321,6 +332,7 @@ else if (hasFailed.get()) { LineGobbler.endSection("REPLICATION"); return output; } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); throw new WorkerException("Sync failed", e); } @@ -489,6 +501,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de }; } + @Trace(operationName = WORKER_OPERATION_NAME) @Override public void cancel() { // Resources are closed in the opposite order they are declared. @@ -496,7 +509,8 @@ public void cancel() { try { executors.awaitTermination(10, TimeUnit.SECONDS); } catch (final InterruptedException e) { - e.printStackTrace(); + ApmTraceUtils.addExceptionToTrace(e); + LOGGER.error("Unable to cancel due to interruption.", e); } cancelled.set(true); @@ -504,6 +518,7 @@ public void cancel() { try { destination.cancel(); } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); LOGGER.info("Error cancelling destination: ", e); } @@ -511,11 +526,27 @@ public void cancel() { try { source.cancel(); } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); LOGGER.info("Error cancelling source: ", e); } } + private Map generateTraceTags(final WorkerDestinationConfig destinationConfig, final Path jobRoot) { + final Map tags = new HashMap<>(); + + tags.put(JOB_ID_KEY, jobId); + tags.put(JOB_ROOT_KEY, jobRoot); + + if (destinationConfig != null) { + if (destinationConfig.getConnectionId() != null) { + tags.put(CONNECTION_ID_KEY, destinationConfig.getConnectionId()); + } + } + + return tags; + } + private static class SourceException extends RuntimeException { SourceException(final String message) { diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java index 7911a0545dc7..fe3d6da7dab7 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/DbtJobOrchestrator.java @@ -4,9 +4,9 @@ package io.airbyte.container_orchestrator; -import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.TemporalUtils; diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java index dbb4ada1f4c8..1c602bb35c1b 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/NormalizationJobOrchestrator.java @@ -4,9 +4,9 @@ package io.airbyte.container_orchestrator; -import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java index b7f093434a68..c33c7ccabc80 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java @@ -4,10 +4,10 @@ package io.airbyte.container_orchestrator; -import static io.airbyte.container_orchestrator.TraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.container_orchestrator.TraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.JOB_ORCHESTRATOR_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.features.FeatureFlags; diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/TraceConstants.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/TraceConstants.java deleted file mode 100644 index 00c5263c171d..000000000000 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/TraceConstants.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.container_orchestrator; - -/** - * Collection of constants for APM tracing. - */ -public class TraceConstants { - - public static final String JOB_ORCHESTRATOR_OPERATION_NAME = "job.orchestrator"; - - private TraceConstants() {} - - /** - * Trace tag constants. - */ - public static final class Tags { - - /** - * Name of the APM trace tag that holds the destination Docker image value associated with the - * trace. - */ - public static final String DESTINATION_DOCKER_IMAGE_KEY = "destination.docker_image"; - - /** - * Name of the APM trace tag that holds the job ID value associated with the trace. - */ - public static final String JOB_ID_KEY = "job_id"; - - /** - * Name of the APM trace tag that holds the source Docker image value associated with the trace. - */ - public static final String SOURCE_DOCKER_IMAGE_KEY = "source.docker_image"; - - private Tags() {} - - } - -} diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java index 32062cabfe46..d642345e8050 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java @@ -167,8 +167,7 @@ public static SSLContext createContextFromCaCert(String caCertificate) { try { CertificateFactory factory = CertificateFactory.getInstance(X509); Certificate trustedCa = factory.generateCertificate( - new ByteArrayInputStream(caCertificate.getBytes(StandardCharsets.UTF_8)) - ); + new ByteArrayInputStream(caCertificate.getBytes(StandardCharsets.UTF_8))); KeyStore trustStore = KeyStore.getInstance(PKCS_12); trustStore.load(null, null); trustStore.setCertificateEntry("ca", trustedCa); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/trace/TemporalTraceConstants.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java similarity index 50% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/trace/TemporalTraceConstants.java rename to airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java index 80f6aa6b8c19..c636a9d935d6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/trace/TemporalTraceConstants.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java @@ -2,24 +2,34 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.trace; +package io.airbyte.metrics.lib; /** - * Collection of constants for APM tracing of Temporal activities and workflows. + * Collection of constants for APM tracing. */ -public final class TemporalTraceConstants { +public final class ApmTraceConstants { /** * Operation name for an APM trace of a Temporal activity. */ public static final String ACTIVITY_TRACE_OPERATION_NAME = "activity"; + /** + * Operation name for an APM trace of a job orchestrator. + */ + public static final String JOB_ORCHESTRATOR_OPERATION_NAME = "job.orchestrator"; + + /** + * Operation name for an APM trace of a worker implementation. + */ + public static final String WORKER_OPERATION_NAME = "worker"; + /** * Operation name for an APM trace of a Temporal workflow. */ public static final String WORKFLOW_TRACE_OPERATION_NAME = "workflow"; - private TemporalTraceConstants() {} + private ApmTraceConstants() {} /** * Trace tag constants. @@ -27,10 +37,16 @@ private TemporalTraceConstants() {} public static final class Tags { /** - * Name of the APM trace tag that holds the connection ID value associated with the trace. + * Name of the APM trace tag that holds the destination Docker image value associated with the + * trace. */ public static final String CONNECTION_ID_KEY = "connection_id"; + /** + * Name of the APM trace tag that holds the connector version value associated with the trace. + */ + public static final String CONNECTOR_VERSION_KEY = "connector_version"; + /** * Name of the APM trace tag that holds the destination Docker image value associated with the * trace. @@ -47,11 +63,26 @@ public static final class Tags { */ public static final String JOB_ID_KEY = "job_id"; + /** + * Name of the APM trace tag that holds the job root value associated with the trace. + */ + public static final String JOB_ROOT_KEY = "job_root"; + /** * Name of the APM trace tag that holds the source Docker image value associated with the trace. */ public static final String SOURCE_DOCKER_IMAGE_KEY = "source.docker_image"; + /** + * Name of the APM trace tag that holds the source ID value associated with the trace. + */ + public static final String SOURCE_ID_KEY = "source.id"; + + /** + * Name of the APM trace tag that holds the webhook config ID value associated with the trace. + */ + public static final String WEBHOOK_CONFIG_ID_KEY = "webhook.config_id"; + private Tags() {} } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java index e7dd884a3a17..ab1ea21964cf 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java @@ -5,6 +5,8 @@ package io.airbyte.metrics.lib; import io.opentracing.Span; +import io.opentracing.log.Fields; +import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; import java.util.Map; @@ -45,7 +47,7 @@ public static void addTagsToTrace(final Map tags, final String t } /** - * Adds all the provided tags to the currently active span, if one exists. + * Adds all the provided tags to the provided span, if one exists. * * @param span The {@link Span} that will be associated with the tags. * @param tags A map of tags to be added to the currently active span. @@ -59,4 +61,26 @@ public static void addTagsToTrace(final Span span, final Map tag } } + /** + * Adds an exception to the currently active span, if one exists. + * + * @param e The {@link Exception} to be added to the currently active span. + */ + public static void addExceptionToTrace(final Exception e) { + addExceptionToTrace(GlobalTracer.get().activeSpan(), e); + } + + /** + * Adds an exception to the provided span, if one exists. + * + * @param span The {@link Span} that will be associated with the exception. + * @param e The {@link Exception} to be added to the provided span. + */ + public static void addExceptionToTrace(final Span span, final Exception e) { + if (span != null) { + span.setTag(Tags.ERROR, true); + span.log(Map.of(Fields.ERROR_OBJECT, e)); + } + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 937935134f19..608c50630095 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.check.connection; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java index a6f24a32c835..5a00a44f6c11 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.check.connection; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 94da4bc7e348..4626ffc1aebb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.discover.catalog; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java index fafb9ca380c5..127ed8162d96 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.discover.catalog; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.scheduling.DiscoverCatalogWorkflow; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index a6e662a205fb..b35e1680a4ea 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.scheduling; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java index b07f704e3e8d..661b487fa86b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java @@ -4,11 +4,11 @@ package io.airbyte.workers.temporal.scheduling.activities; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.persistence.job.JobNotifier.CONNECTION_DISABLED_NOTIFICATION; import static io.airbyte.persistence.job.JobNotifier.CONNECTION_DISABLED_WARNING_NOTIFICATION; import static io.airbyte.persistence.job.models.Job.REPLICATION_TYPES; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; import static java.time.temporal.ChronoUnit.DAYS; import datadog.trace.api.Trace; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 11b1b9f7f75b..5005526cb222 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.config.WorkerMode; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java index 3d59402bbdda..28c00e99fe0e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.config.WorkerMode; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index eae169f52d11..3633af029aed 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 0dd4cffe1c27..22bd8ac5dc8f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -5,10 +5,10 @@ package io.airbyte.workers.temporal.scheduling.activities; import static io.airbyte.config.JobConfig.ConfigType.SYNC; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.persistence.job.models.AttemptStatus.FAILED; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; import com.google.common.collect.Lists; import datadog.trace.api.Trace; @@ -380,24 +380,24 @@ public void ensureCleanJobState(final EnsureCleanJobStateInput input) { } @Override - public boolean isLastJobOrAttemptFailure(JobCheckFailureInput input) { + public boolean isLastJobOrAttemptFailure(final JobCheckFailureInput input) { final int limit = 2; boolean lastAttemptCheck = false; boolean lastJobCheck = false; - Set configTypes = new HashSet<>(); + final Set configTypes = new HashSet<>(); configTypes.add(SYNC); try { - List jobList = jobPersistence.listJobsIncludingId(configTypes, input.getConnectionId().toString(), input.getJobId(), limit); - Optional optionalActiveJob = jobList.stream().filter(job -> job.getId() == input.getJobId()).findFirst(); + final List jobList = jobPersistence.listJobsIncludingId(configTypes, input.getConnectionId().toString(), input.getJobId(), limit); + final Optional optionalActiveJob = jobList.stream().filter(job -> job.getId() == input.getJobId()).findFirst(); if (optionalActiveJob.isPresent()) { lastAttemptCheck = checkActiveJobPreviousAttempt(optionalActiveJob.get(), input.getAttemptId()); } - OptionalLong previousJobId = getPreviousJobId(input.getJobId(), jobList.stream().map(Job::getId).toList()); + final OptionalLong previousJobId = getPreviousJobId(input.getJobId(), jobList.stream().map(Job::getId).toList()); if (previousJobId.isPresent()) { - Optional optionalPreviousJob = jobList.stream().filter(job -> job.getId() == previousJobId.getAsLong()).findFirst(); + final Optional optionalPreviousJob = jobList.stream().filter(job -> job.getId() == previousJobId.getAsLong()).findFirst(); if (optionalPreviousJob.isPresent()) { lastJobCheck = optionalPreviousJob.get().getStatus().equals(io.airbyte.persistence.job.models.JobStatus.FAILED); } @@ -409,18 +409,18 @@ public boolean isLastJobOrAttemptFailure(JobCheckFailureInput input) { } } - private OptionalLong getPreviousJobId(Long activeJobId, List jobIdsList) { + private OptionalLong getPreviousJobId(final Long activeJobId, final List jobIdsList) { return jobIdsList.stream() .filter(jobId -> !Objects.equals(jobId, activeJobId)) .mapToLong(jobId -> jobId).max(); } - private boolean checkActiveJobPreviousAttempt(Job activeJob, int attemptId) { + private boolean checkActiveJobPreviousAttempt(final Job activeJob, final int attemptId) { final int minAttemptSize = 1; boolean result = false; if (activeJob.getAttempts().size() > minAttemptSize) { - Optional optionalAttempt = activeJob.getAttempts().stream() + final Optional optionalAttempt = activeJob.getAttempts().stream() .filter(attempt -> attempt.getId() == (attemptId - 1)).findFirst(); result = optionalAttempt.isPresent() && optionalAttempt.get().getStatus().equals(FAILED); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index 7c7708135f74..3d82095a5224 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.config.WorkerMode; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java index 95dd69e976b6..30cbacb97c55 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.exception.RetryableException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java index a879ad7fe669..a94f7c7c3e4c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.StreamResetRecordsHelper; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java index e77d9f48562d..4ddfda6be5c1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.config.WorkerMode; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 8e5cd32e486e..a80a1389cd52 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.spec; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java index 76805d794f4f..a13493f56ae1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.spec; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.scheduling.SpecWorkflow; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index cab8ad5ff8b7..df06d31662c0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.sync; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 4d954c8e2735..71d058051e0a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.sync; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index c46243c1e6ac..8d508538f214 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.sync; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.metrics.lib.ApmTraceUtils; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index e64339911be3..40c37265b79f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -5,9 +5,9 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.config.helpers.StateMessageHelper.isMigration; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import static io.airbyte.workers.helper.StateConverter.convertClientStateTypeToInternal; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index f0fbc5b075f3..f5ebbd7fafc5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -4,10 +4,10 @@ package io.airbyte.workers.temporal.sync; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; import datadog.trace.api.Trace; import edu.umd.cs.findbugs.annotations.Nullable; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index f2c6b3f021d1..8270db6c7493 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -4,11 +4,11 @@ package io.airbyte.workers.temporal.sync; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; -import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DESTINATION_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/WebhookOperationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/WebhookOperationActivityImpl.java index 8e55be47e09c..d4aaf732112f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/WebhookOperationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/WebhookOperationActivityImpl.java @@ -4,17 +4,23 @@ package io.airbyte.workers.temporal.sync; +import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.WEBHOOK_CONFIG_ID_KEY; + import com.fasterxml.jackson.databind.JsonNode; +import datadog.trace.api.Trace; import io.airbyte.commons.json.Jsons; import io.airbyte.config.OperatorWebhookInput; import io.airbyte.config.WebhookConfig; import io.airbyte.config.WebhookOperationConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.metrics.lib.ApmTraceUtils; import jakarta.inject.Singleton; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.Map; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +41,9 @@ public WebhookOperationActivityImpl(final HttpClient httpClient, final SecretsHy } + @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override - public boolean invokeWebhook(OperatorWebhookInput input) { + public boolean invokeWebhook(final OperatorWebhookInput input) { LOGGER.debug("Webhook operation input: {}", input); LOGGER.debug("Found webhook config: {}", input.getWorkspaceWebhookConfigs()); final JsonNode fullWebhookConfigJson = secretsHydrator.hydrate(input.getWorkspaceWebhookConfigs()); @@ -47,6 +54,7 @@ public boolean invokeWebhook(OperatorWebhookInput input) { throw new RuntimeException(String.format("Cannot find webhook config %s", input.getWebhookConfigId().toString())); } + ApmTraceUtils.addTagsToTrace(Map.of(WEBHOOK_CONFIG_ID_KEY, input.getWebhookConfigId())); LOGGER.info("Invoking webhook operation {}", webhookConfig.get().getName()); final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() @@ -64,14 +72,14 @@ public boolean invokeWebhook(OperatorWebhookInput input) { // TODO(mfsiega-airbyte): replace this loop with retries configured on the HttpClient impl. for (int i = 0; i < MAX_RETRIES; i++) { try { - HttpResponse response = this.httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + final HttpResponse response = this.httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); LOGGER.debug("Webhook response: {}", response == null ? null : response.body()); LOGGER.info("Webhook response status: {}", response == null ? "empty response" : response.statusCode()); // Return true if the request was successful. - boolean isSuccessful = response != null && response.statusCode() >= 200 && response.statusCode() <= 300; + final boolean isSuccessful = response != null && response.statusCode() >= 200 && response.statusCode() <= 300; LOGGER.info("Webhook {} execution status {}", webhookConfig.get().getName(), isSuccessful ? "successful" : "failed"); return isSuccessful; - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn(e.getMessage()); finalException = e; }