diff --git a/.env b/.env
index 9ce529c0050ba..3177aa4de26ae 100644
--- a/.env
+++ b/.env
@@ -70,10 +70,9 @@ JOB_MAIN_CONTAINER_MEMORY_LIMIT=
 
 ### LOGGING/MONITORING/TRACKING ###
 TRACKING_STRATEGY=segment
+JOB_ERROR_REPORTING_STRATEGY=logging
 # Although not present as an env var, expected by Log4J configuration.
 LOG_LEVEL=INFO
-# Although not present as an env var, helps Airbyte track job healthiness.
-SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6102835"
 
 
 ### APPLICATIONS ###
diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java
index c480cefde2981..d5f7d0ab8ebb9 100644
--- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java
+++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java
@@ -466,6 +466,17 @@ public interface Configs {
    */
   TrackingStrategy getTrackingStrategy();
 
+  /**
+   * Define whether to send job failure events to Sentry or log-only. Airbyte internal use.
+   */
+  JobErrorReportingStrategy getJobErrorReportingStrategy();
+
+  /**
+   * Determines the Sentry DSN that should be used when reporting connector job failures to Sentry.
+   * Used with SENTRY error reporting strategy. Airbyte internal use.
+   */
+  String getJobErrorReportingSentryDSN();
+
   // APPLICATIONS
   // Worker
   /**
@@ -578,6 +589,11 @@ enum TrackingStrategy {
     LOGGING
   }
 
+  enum JobErrorReportingStrategy {
+    SENTRY,
+    LOGGING
+  }
+
   enum WorkerEnvironment {
     DOCKER,
     KUBERNETES
diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java
index 52fd6cc239b0b..adb6e69edec3a 100644
--- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java
+++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java
@@ -50,6 +50,8 @@ public class EnvConfigs implements Configs {
   public static final String CONFIG_ROOT = "CONFIG_ROOT";
   public static final String DOCKER_NETWORK = "DOCKER_NETWORK";
   public static final String TRACKING_STRATEGY = "TRACKING_STRATEGY";
+  public static final String JOB_ERROR_REPORTING_STRATEGY = "JOB_ERROR_REPORTING_STRATEGY";
+  public static final String JOB_ERROR_REPORTING_SENTRY_DSN = "JOB_ERROR_REPORTING_SENTRY_DSN";
   public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";
   public static final String DATABASE_USER = "DATABASE_USER";
   public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
@@ -805,6 +807,23 @@ public TrackingStrategy getTrackingStrategy() {
     });
   }
 
+  @Override
+  public JobErrorReportingStrategy getJobErrorReportingStrategy() {
+    return getEnvOrDefault(JOB_ERROR_REPORTING_STRATEGY, JobErrorReportingStrategy.LOGGING, s -> {
+      try {
+        return JobErrorReportingStrategy.valueOf(s.toUpperCase());
+      } catch (final IllegalArgumentException e) {
+        LOGGER.info(s + " not recognized, defaulting to " + JobErrorReportingStrategy.LOGGING);
+        return JobErrorReportingStrategy.LOGGING;
+      }
+    });
+  }
+
+  @Override
+  public String getJobErrorReportingSentryDSN() {
+    return getEnvOrDefault(JOB_ERROR_REPORTING_SENTRY_DSN, "");
+  }
+
   // APPLICATIONS
   // Worker
   @Override
diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java
index ff9ff1cbe6fe9..d44510c868fb1 100644
--- a/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java
+++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java
@@ -8,6 +8,7 @@
 
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.Configs.DeploymentMode;
+import io.airbyte.config.Configs.JobErrorReportingStrategy;
 import io.airbyte.config.Configs.WorkerEnvironment;
 import java.nio.file.Paths;
 import java.util.HashMap;
@@ -178,6 +179,27 @@ void testTrackingStrategy() {
     assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
   }
 
+  @Test
+  void testErrorReportingStrategy() {
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, null);
+    assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());
+
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "abc");
+    assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());
+
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "logging");
+    assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());
+
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "sentry");
+    assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy());
+
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "LOGGING");
+    assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());
+
+    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "SENTRY");
+    assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy());
+  }
+
   @Test
   void testDeploymentMode() {
     envMap.put(EnvConfigs.DEPLOYMENT_MODE, null);
diff --git a/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java b/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java
index a25d56451edbf..ed2f1de729d96 100644
--- a/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java
+++ b/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java
@@ -109,6 +109,14 @@ public Optional<JobOutput> getSuccessOutput() {
     return getSuccessfulAttempt().flatMap(Attempt::getOutput);
   }
 
+  public Optional<Attempt> getLastFailedAttempt() {
+    return getAttempts()
+        .stream()
+        .sorted(Comparator.comparing(Attempt::getCreatedAtInSecond).reversed())
+        .filter(a -> a.getStatus() == AttemptStatus.FAILED)
+        .findFirst();
+  }
+
   public Optional<Attempt> getLastAttemptWithOutput() {
     return getAttempts()
         .stream()
diff --git a/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java b/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java
index 8fde2d1e75d95..e81a15bf58f21 100644
--- a/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java
+++ b/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java
@@ -10,9 +10,9 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.junit.jupiter.api.Test;
 
 class JobTest {
@@ -42,8 +42,8 @@ void testHasRunningAttempt() {
   }
 
   private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) {
-    final List<Attempt> attempts = Arrays.stream(attemptStatuses)
-        .map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null))
+    final List<Attempt> attempts = IntStream.range(0, attemptStatuses.length)
+        .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, idx, 0L, null))
         .collect(Collectors.toList());
     return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
   }
@@ -60,6 +60,17 @@ void testGetSuccessfulAttempt() {
     assertEquals(job.getAttempts().get(1), job.getSuccessfulAttempt().get());
   }
 
+  @Test
+  void testGetLastFailedAttempt() {
+    assertTrue(jobWithAttemptWithStatus().getLastFailedAttempt().isEmpty());
+    assertTrue(jobWithAttemptWithStatus(AttemptStatus.SUCCEEDED).getLastFailedAttempt().isEmpty());
+    assertTrue(jobWithAttemptWithStatus(AttemptStatus.FAILED).getLastFailedAttempt().isPresent());
+
+    final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED);
+    assertTrue(job.getLastFailedAttempt().isPresent());
+    assertEquals(2, job.getLastFailedAttempt().get().getId());
+  }
+
   @Test
   void testValidateStatusTransitionFromPending() {
     final Job pendingJob = jobWithStatus(JobStatus.PENDING);
diff --git a/airbyte-scheduler/scheduler-persistence/build.gradle b/airbyte-scheduler/scheduler-persistence/build.gradle
index ef970f1890299..c40c4355a6ae9 100644
--- a/airbyte-scheduler/scheduler-persistence/build.gradle
+++ b/airbyte-scheduler/scheduler-persistence/build.gradle
@@ -3,6 +3,8 @@ plugins {
 }
 
 dependencies {
+    implementation 'io.sentry:sentry:6.1.0'
+
     implementation project(':airbyte-analytics')
     implementation project(':airbyte-commons-docker')
     implementation project(':airbyte-config:config-models')
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java
new file mode 100644
index 0000000000000..c82cae5dcd955
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.config.AttemptFailureSummary;
+import io.airbyte.config.Configs.DeploymentMode;
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.FailureReason.FailureOrigin;
+import io.airbyte.config.JobSyncConfig;
+import io.airbyte.config.StandardDestinationDefinition;
+import io.airbyte.config.StandardSourceDefinition;
+import io.airbyte.config.StandardWorkspace;
+import io.airbyte.config.persistence.ConfigRepository;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobErrorReporter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobErrorReporter.class);
+
+  private static final String FROM_TRACE_MESSAGE = "from_trace_message";
+  private static final String DEPLOYMENT_MODE_META_KEY = "deployment_mode";
+  private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version";
+  private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
+  private static final String FAILURE_TYPE_META_KEY = "failure_type";
+  private static final String CONNECTION_ID_META_KEY = "connection_id";
+  private static final String CONNECTOR_NAME_META_KEY = "connector_name";
+  private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
+  private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
+
+  private final ConfigRepository configRepository;
+  private final DeploymentMode deploymentMode;
+  private final String airbyteVersion;
+  private final JobErrorReportingClient jobErrorReportingClient;
+
+  public JobErrorReporter(final ConfigRepository configRepository,
+                          final DeploymentMode deploymentMode,
+                          final String airbyteVersion,
+                          final JobErrorReportingClient jobErrorReportingClient) {
+
+    this.configRepository = configRepository;
+    this.deploymentMode = deploymentMode;
+    this.airbyteVersion = airbyteVersion;
+    this.jobErrorReportingClient = jobErrorReportingClient;
+  }
+
+  /**
+   * Reports a Sync Job's connector-caused FailureReasons to the JobErrorReportingClient
+   *
+   * @param connectionId - connection that had the failure
+   * @param failureSummary - final attempt failure summary
+   * @param jobSyncConfig - config for the sync job
+   */
+  public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) {
+    final List<FailureReason> traceMessageFailures = failureSummary.getFailures().stream()
+        .filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE))
+        .toList();
+
+    final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true);
+
+    for (final FailureReason failureReason : traceMessageFailures) {
+      final FailureOrigin failureOrigin = failureReason.getFailureOrigin();
+
+      final HashMap<String, String> metadata = new HashMap<>();
+      metadata.put(CONNECTION_ID_META_KEY, connectionId.toString());
+      metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion);
+      metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name());
+      metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value());
+      metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());
+
+      try {
+        if (failureOrigin == FailureOrigin.SOURCE) {
+          final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
+          final String dockerImage = jobSyncConfig.getSourceDockerImage();
+
+          metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString());
+          metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName());
+          metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value());
+
+          jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
+        } else if (failureOrigin == FailureOrigin.DESTINATION) {
+          final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
+          final String dockerImage = jobSyncConfig.getDestinationDockerImage();
+
+          metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString());
+          metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName());
+          metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value());
+
+          jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
+        }
+      } catch (final Exception e) {
+        LOGGER.error("Error when reporting job failure reason: {}", failureReason, e);
+      }
+    }
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java
new file mode 100644
index 0000000000000..3d52f558b667e
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.StandardWorkspace;
+import java.util.Map;
+
+/**
+ * A generic interface for a client that reports errors
+ */
+public interface JobErrorReportingClient {
+
+  /**
+   * Report a job failure reason
+   */
+  void reportJobFailureReason(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map<String, String> metadata);
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java
new file mode 100644
index 0000000000000..e24586781fc75
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.config.Configs;
+import io.airbyte.config.Configs.JobErrorReportingStrategy;
+
+public class JobErrorReportingClientFactory {
+
+  /**
+   * Creates an error reporting client based on the desired strategy to use
+   *
+   * @param strategy - which type of error reporting client should be created
+   * @return JobErrorReportingClient
+   */
+  public static JobErrorReportingClient getClient(final JobErrorReportingStrategy strategy, final Configs configs) {
+    return switch (strategy) {
+      case SENTRY -> new SentryJobErrorReportingClient(configs.getJobErrorReportingSentryDSN(), new SentryExceptionHelper());
+      case LOGGING -> new LoggingJobErrorReportingClient();
+    };
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java
new file mode 100644
index 0000000000000..cf1cebf1404b7
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.StandardWorkspace;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingJobErrorReportingClient implements JobErrorReportingClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingJobErrorReportingClient.class);
+
+  @Override
+  public void reportJobFailureReason(final StandardWorkspace workspace,
+                                     final FailureReason reason,
+                                     final String dockerImage,
+                                     final Map<String, String> metadata) {
+    LOGGER.info("Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}",
+        workspace.getWorkspaceId(),
+        dockerImage,
+        reason,
+        metadata);
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java
new file mode 100644
index 0000000000000..1fe083490c120
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.commons.lang.Exceptions;
+import io.sentry.protocol.SentryException;
+import io.sentry.protocol.SentryStackFrame;
+import io.sentry.protocol.SentryStackTrace;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SentryExceptionHelper {
+
+  /**
+   * Processes a raw stacktrace string into structured SentryExceptions
+   * <p>
+   * Currently, Java and Python stacktraces are supported. If an unsupported stacktrace format is
+   * encountered, an empty optional will be returned, in which case we can fall back to alternate
+   * grouping.
+   */
+  public Optional<List<SentryException>> buildSentryExceptions(final String stacktrace) {
+    return Exceptions.swallowWithDefault(() -> {
+      if (stacktrace.startsWith("Traceback (most recent call last):")) {
+        return buildPythonSentryExceptions(stacktrace);
+      }
+      if (stacktrace.contains("\tat ") && stacktrace.contains(".java")) {
+        return buildJavaSentryExceptions(stacktrace);
+      }
+
+      return Optional.empty();
+    }, Optional.empty());
+  }
+
+  private static Optional<List<SentryException>> buildPythonSentryExceptions(final String stacktrace) {
+    final List<SentryException> sentryExceptions = new ArrayList<>();
+
+    // separate chained exceptions
+    // e.g "\n\nThe above exception was the direct cause of the following exception:\n\n"
+    // "\n\nDuring handling of the above exception, another exception occurred:\n\n"
+    final String exceptionSeparator = "\n\n[\\w ,]+:\n\n";
+    final String[] exceptions = stacktrace.split(exceptionSeparator);
+
+    for (final String exceptionStr : exceptions) {
+      final SentryStackTrace stackTrace = new SentryStackTrace();
+      final List<SentryStackFrame> stackFrames = new ArrayList<>();
+
+      // Use a regex to grab stack trace frame information
+      final Pattern framePattern = Pattern.compile("File \"(?<absPath>.+)\", line (?<lineno>\\d+), in (?<function>.+)\\n {4}(?<contextLine>.+)\\n");
+      final Matcher matcher = framePattern.matcher(exceptionStr);
+      int lastMatchIdx = -1;
+
+      while (matcher.find()) {
+        final String absPath = matcher.group("absPath");
+        final String lineno = matcher.group("lineno");
+        final String function = matcher.group("function");
+        final String contextLine = matcher.group("contextLine");
+
+        final SentryStackFrame stackFrame = new SentryStackFrame();
+        stackFrame.setAbsPath(absPath);
+        stackFrame.setLineno(Integer.valueOf(lineno));
+        stackFrame.setFunction(function);
+        stackFrame.setContextLine(contextLine);
+        stackFrames.add(stackFrame);
+
+        lastMatchIdx = matcher.end();
+      }
+
+      if (stackFrames.size() > 0) {
+        stackTrace.setFrames(stackFrames);
+
+        final SentryException sentryException = new SentryException();
+        sentryException.setStacktrace(stackTrace);
+
+        // The final part of our stack trace has the exception type and (optionally) a value
+        // (e.g. "RuntimeError: This is the value")
+        final String remaining = exceptionStr.substring(lastMatchIdx);
+        final String[] parts = remaining.split(":", 2);
+
+        if (parts.length > 0) {
+          sentryException.setType(parts[0].trim());
+          if (parts.length == 2) {
+            sentryException.setValue(parts[1].trim());
+          }
+
+          sentryExceptions.add(sentryException);
+        }
+      }
+    }
+
+    if (sentryExceptions.size() == 0)
+      return Optional.empty();
+
+    return Optional.of(sentryExceptions);
+  }
+
+  private static Optional<List<SentryException>> buildJavaSentryExceptions(final String stacktrace) {
+    final List<SentryException> sentryExceptions = new ArrayList<>();
+
+    // separate chained exceptions
+    // e.g "\nCaused By: "
+    final String exceptionSeparator = "\n[\\w ]+: ";
+    final String[] exceptions = stacktrace.split(exceptionSeparator);
+
+    for (final String exceptionStr : exceptions) {
+      final SentryStackTrace stackTrace = new SentryStackTrace();
+      final List<SentryStackFrame> stackFrames = new ArrayList<>();
+
+      // Use a regex to grab stack trace frame information
+      final Pattern framePattern = Pattern.compile(
+          "\n\tat (?:[\\w.$/]+/)?(?<module>[\\w$.]+)\\.(?<function>[\\w<>$]+)\\((?:(?<filename>[\\w]+\\.java):(?<lineno>\\d+)\\)|(?<desc>[\\w\\s]*))");
+      final Matcher matcher = framePattern.matcher(exceptionStr);
+
+      while (matcher.find()) {
+        final String module = matcher.group("module");
+        final String filename = matcher.group("filename");
+        final String lineno = matcher.group("lineno");
+        final String function = matcher.group("function");
+        final String sourceDescription = matcher.group("desc");
+
+        final SentryStackFrame stackFrame = new SentryStackFrame();
+        stackFrame.setModule(module);
+        stackFrame.setFunction(function);
+        stackFrame.setFilename(filename);
+
+        if (lineno != null) {
+          stackFrame.setLineno(Integer.valueOf(lineno));
+        }
+        if (sourceDescription != null && sourceDescription.equals("Native Method")) {
+          stackFrame.setNative(true);
+        }
+
+        stackFrames.add(stackFrame);
+      }
+
+      if (stackFrames.size() > 0) {
+        Collections.reverse(stackFrames);
+        stackTrace.setFrames(stackFrames);
+
+        final SentryException sentryException = new SentryException();
+        sentryException.setStacktrace(stackTrace);
+
+        // The first section of our stacktrace before the first frame has exception type and value
+        final String[] sections = exceptionStr.split("\n\tat ", 2);
+        final String[] headerParts = sections[0].split(": ", 2);
+
+        if (headerParts.length > 0) {
+          sentryException.setType(headerParts[0].trim());
+          if (headerParts.length == 2) {
+            sentryException.setValue(headerParts[1].trim());
+          }
+
+          sentryExceptions.add(sentryException);
+        }
+      }
+    }
+
+    if (sentryExceptions.size() == 0)
+      return Optional.empty();
+
+    return Optional.of(sentryExceptions);
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java
new file mode 100644
index 0000000000000..ff509b7ce254e
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.Metadata;
+import io.airbyte.config.StandardWorkspace;
+import io.sentry.Hub;
+import io.sentry.IHub;
+import io.sentry.NoOpHub;
+import io.sentry.SentryEvent;
+import io.sentry.SentryOptions;
+import io.sentry.protocol.Message;
+import io.sentry.protocol.SentryException;
+import io.sentry.protocol.User;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class SentryJobErrorReportingClient implements JobErrorReportingClient {
+
+  static final String STACKTRACE_PARSE_ERROR_TAG_KEY = "stacktrace_parse_error";
+  private final IHub sentryHub;
+  private final SentryExceptionHelper exceptionHelper;
+
+  SentryJobErrorReportingClient(final IHub sentryHub, final SentryExceptionHelper exceptionHelper) {
+    this.sentryHub = sentryHub;
+    this.exceptionHelper = exceptionHelper;
+  }
+
+  public SentryJobErrorReportingClient(final String sentryDSN, final SentryExceptionHelper exceptionHelper) {
+    this(createSentryHubWithDSN(sentryDSN), exceptionHelper);
+  }
+
+  static IHub createSentryHubWithDSN(final String sentryDSN) {
+    if (sentryDSN == null || sentryDSN.isEmpty()) {
+      return NoOpHub.getInstance();
+    }
+
+    final SentryOptions options = new SentryOptions();
+    options.setDsn(sentryDSN);
+    options.setAttachStacktrace(false);
+    options.setEnableUncaughtExceptionHandler(false);
+    return new Hub(options);
+  }
+
+  /**
+   * Reports a Connector Job FailureReason to Sentry
+   *
+   * @param workspace - Workspace where this failure occurred
+   * @param failureReason - FailureReason to report
+   * @param dockerImage - Tagged docker image that represents the release where this failure occurred
+   * @param metadata - Extra metadata to set as tags on the event
+   */
+  @Override
+  public void reportJobFailureReason(final StandardWorkspace workspace,
+                                     final FailureReason failureReason,
+                                     final String dockerImage,
+                                     final Map<String, String> metadata) {
+    final SentryEvent event = new SentryEvent();
+
+    // Remove invalid characters from the release name, use @ so sentry knows how to grab the tag
+    // e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0
+    // More info at https://docs.sentry.io/product/cli/releases/#creating-releases
+    final String release = dockerImage.replace("/", "-").replace(":", "@");
+    event.setRelease(release);
+
+    // enhance event fingerprint to ensure separate grouping per connector
+    final String[] releaseParts = release.split("@");
+    if (releaseParts.length > 0) {
+      event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
+    }
+
+    // set workspace as the user in sentry to get impact and priority
+    final User sentryUser = new User();
+    sentryUser.setId(String.valueOf(workspace.getWorkspaceId()));
+    sentryUser.setUsername(workspace.getName());
+    event.setUser(sentryUser);
+
+    // set metadata as tags
+    event.setTags(metadata);
+
+    // set failure reason's internalMessage as event message
+    // Sentry will use this to fuzzy-group if no stacktrace information is available
+    final Message message = new Message();
+    message.setFormatted(failureReason.getInternalMessage());
+    event.setMessage(message);
+
+    // events can come from any platform
+    event.setPlatform("other");
+
+    // attach failure reason stack trace
+    final String failureStackTrace = failureReason.getStacktrace();
+    if (failureStackTrace != null && !failureStackTrace.isBlank()) {
+      final Optional<List<SentryException>> parsedExceptions = exceptionHelper.buildSentryExceptions(failureStackTrace);
+      if (parsedExceptions.isPresent()) {
+        event.setExceptions(parsedExceptions.get());
+      } else {
+        event.setTag(STACKTRACE_PARSE_ERROR_TAG_KEY, "1");
+
+        // We couldn't parse the stacktrace, but we can still give it to Sentry for (less accurate) grouping
+        final String normalizedStacktrace = failureStackTrace
+            .replace("\n", ", ")
+            .replace(failureReason.getInternalMessage(), "");
+
+        final SentryException sentryException = new SentryException();
+        sentryException.setValue(normalizedStacktrace);
+        event.setExceptions(List.of(sentryException));
+      }
+    }
+
+    sentryHub.configureScope(scope -> {
+      final Map<String, String> failureReasonContext = new HashMap<>();
+      failureReasonContext.put("internalMessage", failureReason.getInternalMessage());
+      failureReasonContext.put("externalMessage", failureReason.getExternalMessage());
+      failureReasonContext.put("stacktrace", failureReason.getStacktrace());
+      failureReasonContext.put("timestamp", failureReason.getTimestamp().toString());
+
+      final Metadata failureReasonMeta = failureReason.getMetadata();
+      if (failureReasonMeta != null) {
+        failureReasonContext.put("metadata", failureReasonMeta.toString());
+      }
+
+      scope.setContexts("Failure Reason", failureReasonContext);
+    });
+
+    sentryHub.captureEvent(event);
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java
new file mode 100644
index 0000000000000..ae99ad02ad537
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import static org.mockito.Mockito.mock;
+
+import io.airbyte.config.AttemptFailureSummary;
+import io.airbyte.config.Configs.DeploymentMode;
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.FailureReason.FailureOrigin;
+import io.airbyte.config.FailureReason.FailureType;
+import io.airbyte.config.JobSyncConfig;
+import io.airbyte.config.Metadata;
+import io.airbyte.config.StandardDestinationDefinition;
+import io.airbyte.config.StandardSourceDefinition;
+import io.airbyte.config.StandardWorkspace;
+import io.airbyte.config.persistence.ConfigRepository;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class JobErrorReporterTest {
+
+  private static final UUID CONNECTION_ID = UUID.randomUUID();
+  private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
+  private static final String AIRBYTE_VERSION = "0.1.40";
+  private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
+  private static final String SOURCE_DEFINITION_NAME = "stripe";
+  private static final String SOURCE_DOCKER_IMAGE = "airbyte/source-stripe:1.2.3";
+  private static final StandardSourceDefinition.ReleaseStage SOURCE_RELEASE_STAGE = StandardSourceDefinition.ReleaseStage.BETA;
+  private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
+  private static final String DESTINATION_DEFINITION_NAME = "snowflake";
+  private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA;
+  private static final String DESTINATION_DOCKER_IMAGE = "airbyte/destination-snowflake:1.2.3";
+
+  private ConfigRepository configRepository;
+  private JobErrorReportingClient jobErrorReportingClient;
+  private JobErrorReporter jobErrorReporter;
+
+  @BeforeEach
+  void setup() {
+    configRepository = mock(ConfigRepository.class);
+    jobErrorReportingClient = mock(JobErrorReportingClient.class);
+    jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, jobErrorReportingClient);
+  }
+
+  @Test
+  void testReportSyncJobFailure() {
+    final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class);
+
+    final FailureReason sourceFailureReason = new FailureReason()
+        .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true))
+        .withFailureOrigin(FailureOrigin.SOURCE)
+        .withFailureType(FailureType.SYSTEM_ERROR);
+
+    final FailureReason destinationFailureReason = new FailureReason()
+        .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true))
+        .withFailureOrigin(FailureOrigin.DESTINATION)
+        .withFailureType(FailureType.SYSTEM_ERROR);
+
+    final FailureReason nonTraceMessageFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE);
+    final FailureReason replicationFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.REPLICATION);
+
+    Mockito.when(mFailureSummary.getFailures())
+        .thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));
+
+    final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class);
+    Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(SOURCE_DOCKER_IMAGE);
+    Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DESTINATION_DOCKER_IMAGE);
+
+    Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID))
+        .thenReturn(new StandardSourceDefinition()
+            .withReleaseStage(SOURCE_RELEASE_STAGE)
+            .withSourceDefinitionId(SOURCE_DEFINITION_ID)
+            .withName(SOURCE_DEFINITION_NAME));
+
+    Mockito.when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID))
+        .thenReturn(new StandardDestinationDefinition()
+            .withReleaseStage(DESTINATION_RELEASE_STAGE)
+            .withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
+            .withName(DESTINATION_DEFINITION_NAME));
+
+    final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class);
+    Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace);
+
+    jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig);
+
+    final Map<String, String> expectedSourceMetadata = Map.of(
+        "connection_id", CONNECTION_ID.toString(),
+        "deployment_mode", DEPLOYMENT_MODE.name(),
+        "airbyte_version", AIRBYTE_VERSION,
+        "failure_origin", "source",
+        "failure_type", "system_error",
+        "connector_definition_id", SOURCE_DEFINITION_ID.toString(),
+        "connector_name", SOURCE_DEFINITION_NAME,
+        "connector_release_stage", SOURCE_RELEASE_STAGE.toString());
+
+    final Map<String, String> expectedDestinationMetadata = Map.of(
+        "connection_id", CONNECTION_ID.toString(),
+        "deployment_mode", DEPLOYMENT_MODE.name(),
+        "airbyte_version", AIRBYTE_VERSION,
+        "failure_origin", "destination",
+        "failure_type", "system_error",
+        "connector_definition_id", DESTINATION_DEFINITION_ID.toString(),
+        "connector_name", DESTINATION_DEFINITION_NAME,
+        "connector_release_stage", DESTINATION_RELEASE_STAGE.toString());
+
+    Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata);
+    Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE,
+        expectedDestinationMetadata);
+    Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
+  }
+
+  @Test
+  void testReportSyncJobFailureDoesNotThrow() {
+    final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class);
+    final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class);
+
+    final FailureReason sourceFailureReason = new FailureReason()
+        .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true))
+        .withFailureOrigin(FailureOrigin.SOURCE)
+        .withFailureType(FailureType.SYSTEM_ERROR);
+
+    Mockito.when(mFailureSummary.getFailures()).thenReturn(List.of(sourceFailureReason));
+
+    Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID))
+        .thenReturn(new StandardSourceDefinition()
+            .withReleaseStage(SOURCE_RELEASE_STAGE)
+            .withSourceDefinitionId(SOURCE_DEFINITION_ID)
+            .withName(SOURCE_DEFINITION_NAME));
+
+    Mockito.doThrow(new RuntimeException("some exception"))
+        .when(jobErrorReportingClient)
+        .reportJobFailureReason(Mockito.any(), Mockito.eq(sourceFailureReason), Mockito.any(), Mockito.any());
+
+    Assertions.assertDoesNotThrow(() -> jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig));
+    Mockito.verify(jobErrorReportingClient, Mockito.times(1))
+        .reportJobFailureReason(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java
new file mode 100644
index 0000000000000..b6ebd65ad6a50
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.airbyte.config.Configs;
+import io.airbyte.config.Configs.JobErrorReportingStrategy;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class JobErrorReportingClientFactoryTest {
+
+  @Test
+  void testCreateErrorReportingClientLogging() {
+    assertTrue(
+        JobErrorReportingClientFactory.getClient(
+            JobErrorReportingStrategy.LOGGING, Mockito.mock(Configs.class)) instanceof LoggingJobErrorReportingClient);
+  }
+
+  @Test
+  void testCreateErrorReportingClientSentry() {
+    final Configs configsMock = Mockito.mock(Configs.class);
+    Mockito.when(configsMock.getJobErrorReportingSentryDSN()).thenReturn("");
+
+    assertTrue(
+        JobErrorReportingClientFactory.getClient(
+            JobErrorReportingStrategy.SENTRY, configsMock) instanceof SentryJobErrorReportingClient);
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java
new file mode 100644
index 0000000000000..55aa7dc2c385b
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import io.sentry.protocol.SentryException;
+import io.sentry.protocol.SentryStackFrame;
+import io.sentry.protocol.SentryStackTrace;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SentryExceptionHelperTest {
+
+  final SentryExceptionHelper exceptionHelper = new SentryExceptionHelper();
+
+  @Test
+  void testBuildSentryExceptionsInvalid() {
+    final String stacktrace = "this is not a stacktrace";
+    final Optional<List<SentryException>> exceptionList = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(exceptionList.isEmpty());
+  }
+
+  @Test
+  void testBuildSentryExceptionsPartiallyInvalid() {
+    final String stacktrace = "Traceback (most recent call last):\n  Oops!";
+    final Optional<List<SentryException>> exceptionList = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(exceptionList.isEmpty());
+  }
+
+  @Test
+  void testBuildSentryExceptionsPythonChained() {
+    final String stacktrace =
+        """
+        Traceback (most recent call last):
+          File "/airbyte/connector-errors/error.py", line 31, in read_records
+            failing_method()
+          File "/airbyte/connector-errors/error.py", line 36, in failing_method
+            raise HTTPError(http_error_msg, response=self)
+        requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://airbyte.com
+
+        The above exception was the direct cause of the following exception:
+
+        Traceback (most recent call last):
+          File "/airbyte/connector-errors/error.py", line 39, in <module>
+            main()
+          File "/airbyte/connector-errors/error.py", line 13, in main
+            sync_mode("incremental")
+          File "/airbyte/connector-errors/error.py", line 17, in sync_mode
+            incremental()
+          File "/airbyte/connector-errors/error.py", line 33, in incremental
+            raise RuntimeError("My other error") from err
+        RuntimeError: My other error
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(2, exceptionList.size());
+
+    assertExceptionContent(exceptionList.get(0), "requests.exceptions.HTTPError", "400 Client Error: Bad Request for url: https://airbyte.com",
+        List.of(
+            Map.of(
+                "abspath", "/airbyte/connector-errors/error.py",
+                "lineno", 31,
+                "function", "read_records",
+                "context_line", "failing_method()"),
+            Map.of(
+                "abspath", "/airbyte/connector-errors/error.py",
+                "lineno", 36,
+                "function", "failing_method",
+                "context_line", "raise HTTPError(http_error_msg, response=self)")));
+
+    assertExceptionContent(exceptionList.get(1), "RuntimeError", "My other error", List.of(
+        Map.of(
+            "abspath", "/airbyte/connector-errors/error.py",
+            "lineno", 39,
+            "function", "<module>",
+            "context_line", "main()"),
+        Map.of(
+            "abspath", "/airbyte/connector-errors/error.py",
+            "lineno", 13,
+            "function", "main",
+            "context_line", "sync_mode(\"incremental\")"),
+        Map.of(
+            "abspath", "/airbyte/connector-errors/error.py",
+            "lineno", 17,
+            "function", "sync_mode",
+            "context_line", "incremental()"),
+        Map.of(
+            "abspath", "/airbyte/connector-errors/error.py",
+            "lineno", 33,
+            "function", "incremental",
+            "context_line", "raise RuntimeError(\"My other error\") from err")));
+
+  }
+
+  @Test
+  void testBuildSentryExceptionsPythonNoValue() {
+    final String stacktrace =
+        """
+        Traceback (most recent call last):
+          File "/airbyte/connector-errors/error.py", line 33, in incremental
+            raise RuntimeError()
+        RuntimeError
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(1, exceptionList.size());
+
+    assertExceptionContent(exceptionList.get(0), "RuntimeError", null, List.of(
+        Map.of(
+            "abspath", "/airbyte/connector-errors/error.py",
+            "lineno", 33,
+            "function", "incremental",
+            "context_line", "raise RuntimeError()")));
+  }
+
+  @Test
+  void testBuildSentryExceptionsPythonMultilineValue() {
+    final String stacktrace =
+        """
+        Traceback (most recent call last):
+          File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
+            raise _InactiveRpcError(state)
+        grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
+          status = StatusCode.INTERNAL
+          details = "Internal error encountered."
+        >
+
+        During handling of the above exception, another exception occurred:
+
+        Traceback (most recent call last):
+          File "/usr/local/lib/python3.9/site-packages/google/api_core/exceptions.py", line 553, in _parse_grpc_error_details
+            status = rpc_status.from_call(rpc_exc)
+        AttributeError: 'NoneType' object has no attribute 'from_call'
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(2, exceptionList.size());
+
+    final String expectedValue =
+        """
+        <_InactiveRpcError of RPC that terminated with:
+          status = StatusCode.INTERNAL
+          details = "Internal error encountered."
+        >""";
+
+    assertExceptionContent(exceptionList.get(0), "grpc._channel._InactiveRpcError", expectedValue, List.of(
+        Map.of(
+            "abspath", "/usr/local/lib/python3.9/site-packages/grpc/_channel.py",
+            "lineno", 849,
+            "function", "_end_unary_response_blocking",
+            "context_line", "raise _InactiveRpcError(state)")));
+
+    assertExceptionContent(exceptionList.get(1), "AttributeError", "'NoneType' object has no attribute 'from_call'", List.of(
+        Map.of(
+            "abspath", "/usr/local/lib/python3.9/site-packages/google/api_core/exceptions.py",
+            "lineno", 553,
+            "function", "_parse_grpc_error_details",
+            "context_line", "status = rpc_status.from_call(rpc_exc)")));
+  }
+
+  @Test
+  void testBuildSentryExceptionsJava() {
+    final String stacktrace =
+        """
+        java.lang.ArithmeticException: / by zero
+        	at io.airbyte.integrations.base.AirbyteTraceMessageUtilityTest.testCorrectStacktraceFormat(AirbyteTraceMessageUtilityTest.java:61)
+        	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+        	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
+        	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
+        	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
+        	at jdk.proxy2/jdk.proxy2.$Proxy5.stop(Unknown Source)
+        	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(1, exceptionList.size());
+
+    assertExceptionContent(exceptionList.get(0), "java.lang.ArithmeticException", "/ by zero",
+        List.of(
+            Map.of(
+                "filename", "GradleWorkerMain.java",
+                "lineno", 74,
+                "module", "worker.org.gradle.process.internal.worker.GradleWorkerMain",
+                "function", "main"),
+            Map.of(
+                "module", "jdk.proxy2.$Proxy5",
+                "function", "stop"),
+            Map.of(
+                "filename", "ThrowableCollector.java",
+                "lineno", 73,
+                "module", "org.junit.platform.engine.support.hierarchical.ThrowableCollector",
+                "function", "execute"),
+            Map.of(
+                "filename", "NodeTestTask.java",
+                "lineno", 141,
+                "module", "org.junit.platform.engine.support.hierarchical.NodeTestTask",
+                "function", "lambda$executeRecursively$8"),
+            Map.of(
+                "filename", "ExecutableInvoker.java",
+                "lineno", 115,
+                "module", "org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall",
+                "function", "lambda$ofVoidMethod$0"),
+            Map.of(
+                "isNative", true,
+                "module", "jdk.internal.reflect.NativeMethodAccessorImpl",
+                "function", "invoke0"),
+            Map.of(
+                "filename", "AirbyteTraceMessageUtilityTest.java",
+                "lineno", 61,
+                "module", "io.airbyte.integrations.base.AirbyteTraceMessageUtilityTest",
+                "function", "testCorrectStacktraceFormat")));
+  }
+
+  @Test
+  void testBuildSentryExceptionsJavaChained() {
+    final String stacktrace =
+        """
+        java.util.concurrent.CompletionException: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
+        	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
+        	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
+        	at java.base/java.lang.Thread.run(Thread.java:833)
+        		Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
+        				at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136)
+        				at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137)
+        				at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65)
+        				at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158)
+        				at java.lang.Thread.run(Thread.java:833)
+        Caused by: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1
+        	at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$7(DefaultReplicationWorker.java:397)
+        	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
+        	... 3 more
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(2, exceptionList.size());
+
+    assertExceptionContent(exceptionList.get(0), "java.util.concurrent.CompletionException",
+        "io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1",
+        List.of(
+            Map.of(
+                "filename", "Thread.java",
+                "lineno", 833,
+                "module", "java.lang.Thread",
+                "function", "run"),
+            Map.of(
+                "filename", "ThreadPoolExecutor.java",
+                "lineno", 635,
+                "module", "java.util.concurrent.ThreadPoolExecutor$Worker",
+                "function", "run"),
+            Map.of(
+                "filename", "CompletableFuture.java",
+                "lineno", 315,
+                "module", "java.util.concurrent.CompletableFuture",
+                "function", "encodeThrowable")));
+
+    assertExceptionContent(exceptionList.get(1), "io.airbyte.workers.DefaultReplicationWorker$DestinationException",
+        "Destination process exited with non-zero exit code 1", List.of(
+            Map.of(
+                "filename", "CompletableFuture.java",
+                "lineno", 1804,
+                "module", "java.util.concurrent.CompletableFuture$AsyncRun",
+                "function", "run"),
+            Map.of(
+                "filename", "DefaultReplicationWorker.java",
+                "lineno", 397,
+                "module", "io.airbyte.workers.DefaultReplicationWorker",
+                "function", "lambda$getDestinationOutputRunnable$7")));
+  }
+
+  @Test
+  void testBuildSentryExceptionsJavaMultilineValue() {
+    final String stacktrace =
+        """
+        io.temporal.failure.ApplicationFailure: GET https://storage.googleapis.com/
+        {
+          "code" : 401,
+          "message" : "Invalid Credentials"
+        }
+        	at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
+          ... 22 more
+        """;
+
+    final Optional<List<SentryException>> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace);
+    Assertions.assertTrue(optionalSentryExceptions.isPresent());
+    final List<SentryException> exceptionList = optionalSentryExceptions.get();
+    Assertions.assertEquals(1, exceptionList.size());
+
+    final String expectedValue =
+        """
+        GET https://storage.googleapis.com/
+        {
+          "code" : 401,
+          "message" : "Invalid Credentials"
+        }""";
+
+    assertExceptionContent(exceptionList.get(0), "io.temporal.failure.ApplicationFailure",
+        expectedValue, List.of(
+            Map.of(
+                "filename", "GoogleJsonResponseException.java",
+                "lineno", 146,
+                "module", "com.google.api.client.googleapis.json.GoogleJsonResponseException",
+                "function", "from")));
+  }
+
+  private void assertExceptionContent(final SentryException exception,
+                                      final String type,
+                                      final String value,
+                                      final List<Map<String, Object>> frames) {
+    Assertions.assertEquals(type, exception.getType());
+    Assertions.assertEquals(value, exception.getValue());
+
+    final SentryStackTrace stackTrace = exception.getStacktrace();
+    Assertions.assertNotNull(stackTrace);
+    final List<SentryStackFrame> sentryFrames = stackTrace.getFrames();
+    Assertions.assertNotNull(sentryFrames);
+    Assertions.assertEquals(frames.size(), sentryFrames.size());
+
+    for (int i = 0; i < frames.size(); i++) {
+      final Map<String, Object> expectedFrame = frames.get(i);
+      final SentryStackFrame sentryFrame = sentryFrames.get(i);
+
+      if (expectedFrame.containsKey("module")) {
+        Assertions.assertEquals(expectedFrame.get("module"), sentryFrame.getModule());
+      }
+
+      if (expectedFrame.containsKey("filename")) {
+        Assertions.assertEquals(expectedFrame.get("filename"), sentryFrame.getFilename());
+      }
+
+      if (expectedFrame.containsKey("abspath")) {
+        Assertions.assertEquals(expectedFrame.get("abspath"), sentryFrame.getAbsPath());
+      }
+
+      if (expectedFrame.containsKey("function")) {
+        Assertions.assertEquals(expectedFrame.get("function"), sentryFrame.getFunction());
+      }
+
+      if (expectedFrame.containsKey("lineno")) {
+        Assertions.assertEquals(expectedFrame.get("lineno"), sentryFrame.getLineno());
+      }
+
+      if (expectedFrame.containsKey("context_line")) {
+        Assertions.assertEquals(expectedFrame.get("context_line"), sentryFrame.getContextLine());
+      }
+
+      if (expectedFrame.containsKey("isNative")) {
+        Assertions.assertEquals(expectedFrame.get("isNative"), sentryFrame.isNative());
+      }
+    }
+  }
+
+}
diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java
new file mode 100644
index 0000000000000..cff663df1b19f
--- /dev/null
+++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.persistence.job_error_reporter;
+
+import static io.airbyte.scheduler.persistence.job_error_reporter.SentryJobErrorReportingClient.STACKTRACE_PARSE_ERROR_TAG_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.FailureReason.FailureOrigin;
+import io.airbyte.config.FailureReason.FailureType;
+import io.airbyte.config.StandardWorkspace;
+import io.sentry.IHub;
+import io.sentry.NoOpHub;
+import io.sentry.SentryEvent;
+import io.sentry.protocol.Message;
+import io.sentry.protocol.SentryException;
+import io.sentry.protocol.User;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class SentryJobErrorReportingClientTest {
+
+  private static final UUID WORKSPACE_ID = UUID.randomUUID();
+  private static final String WORKSPACE_NAME = "My Workspace";
+  private static final String DOCKER_IMAGE = "airbyte/source-stripe:1.2.3";
+
+  private final StandardWorkspace workspace = new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME);
+  private SentryJobErrorReportingClient sentryErrorReportingClient;
+  private IHub mockSentryHub;
+  private SentryExceptionHelper mockSentryExceptionHelper;
+
+  @BeforeEach
+  void setup() {
+    mockSentryHub = mock(IHub.class);
+    mockSentryExceptionHelper = mock(SentryExceptionHelper.class);
+    sentryErrorReportingClient = new SentryJobErrorReportingClient(mockSentryHub, mockSentryExceptionHelper);
+  }
+
+  @Test
+  void testCreateSentryHubWithBlankDSN() {
+    final String sentryDSN = "";
+    final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(sentryDSN);
+    assertEquals(NoOpHub.getInstance(), sentryHub);
+  }
+
+  @Test
+  void testCreateSentryHubWithNullDSN() {
+    final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(null);
+    assertEquals(NoOpHub.getInstance(), sentryHub);
+  }
+
+  @Test
+  void testCreateSentryHubWithDSN() {
+    final String sentryDSN = "https://public@sentry.example.com/1";
+    final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(sentryDSN);
+    assertNotNull(sentryHub);
+    assertEquals(sentryDSN, sentryHub.getOptions().getDsn());
+    assertFalse(sentryHub.getOptions().isAttachStacktrace());
+    assertFalse(sentryHub.getOptions().isEnableUncaughtExceptionHandler());
+  }
+
+  @Test
+  void testReportJobFailureReason() {
+    final ArgumentCaptor<SentryEvent> eventCaptor = ArgumentCaptor.forClass(SentryEvent.class);
+
+    final FailureReason failureReason = new FailureReason()
+        .withFailureOrigin(FailureOrigin.SOURCE)
+        .withFailureType(FailureType.SYSTEM_ERROR)
+        .withInternalMessage("RuntimeError: Something went wrong");
+    final Map<String, String> metadata = Map.of("some_metadata", "some_metadata_value");
+
+    sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, metadata);
+
+    verify(mockSentryHub).captureEvent(eventCaptor.capture());
+    final SentryEvent actualEvent = eventCaptor.getValue();
+    assertEquals("other", actualEvent.getPlatform());
+    assertEquals("airbyte-source-stripe@1.2.3", actualEvent.getRelease());
+    assertEquals(List.of("{{ default }}", "airbyte-source-stripe"), actualEvent.getFingerprints());
+    assertEquals("some_metadata_value", actualEvent.getTag("some_metadata"));
+    assertNull(actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY));
+    assertNull(actualEvent.getExceptions());
+
+    final User sentryUser = actualEvent.getUser();
+    assertNotNull(sentryUser);
+    assertEquals(WORKSPACE_ID.toString(), sentryUser.getId());
+    assertEquals(WORKSPACE_NAME, sentryUser.getUsername());
+
+    final Message message = actualEvent.getMessage();
+    assertNotNull(message);
+    assertEquals("RuntimeError: Something went wrong", message.getFormatted());
+  }
+
+  @Test
+  void testReportJobFailureReasonWithStacktrace() {
+    final ArgumentCaptor<SentryEvent> eventCaptor = ArgumentCaptor.forClass(SentryEvent.class);
+
+    final List<SentryException> exceptions = new ArrayList<>();
+    final SentryException exception = new SentryException();
+    exception.setType("RuntimeError");
+    exception.setValue("Something went wrong");
+    exceptions.add(exception);
+
+    when(mockSentryExceptionHelper.buildSentryExceptions("Some valid stacktrace")).thenReturn(Optional.of(exceptions));
+
+    final FailureReason failureReason = new FailureReason()
+        .withInternalMessage("RuntimeError: Something went wrong")
+        .withStacktrace("Some valid stacktrace");
+
+    sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, Map.of());
+
+    verify(mockSentryHub).captureEvent(eventCaptor.capture());
+    final SentryEvent actualEvent = eventCaptor.getValue();
+    assertEquals(exceptions, actualEvent.getExceptions());
+    assertNull(actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY));
+  }
+
+  @Test
+  void testReportJobFailureReasonWithInvalidStacktrace() {
+    final ArgumentCaptor<SentryEvent> eventCaptor = ArgumentCaptor.forClass(SentryEvent.class);
+    final String invalidStacktrace = "Invalid stacktrace\nRuntimeError: Something went wrong";
+
+    when(mockSentryExceptionHelper.buildSentryExceptions(invalidStacktrace)).thenReturn(Optional.empty());
+
+    final FailureReason failureReason = new FailureReason()
+        .withInternalMessage("Something went wrong")
+        .withStacktrace(invalidStacktrace);
+
+    sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, Map.of());
+
+    verify(mockSentryHub).captureEvent(eventCaptor.capture());
+    final SentryEvent actualEvent = eventCaptor.getValue();
+    assertEquals("1", actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY));
+    final List<SentryException> exceptions = actualEvent.getExceptions();
+    assertNotNull(exceptions);
+    assertEquals(1, exceptions.size());
+    assertEquals("Invalid stacktrace, RuntimeError: ", exceptions.get(0).getValue());
+  }
+
+}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
index 28153a05a47cd..f0a4f96a2804d 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -41,6 +41,9 @@
 import io.airbyte.scheduler.persistence.JobNotifier;
 import io.airbyte.scheduler.persistence.JobPersistence;
 import io.airbyte.scheduler.persistence.WorkspaceHelper;
+import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
+import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClient;
+import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClientFactory;
 import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory;
 import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
 import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
@@ -134,6 +137,7 @@ public class WorkerApp {
   private final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig;
   private final JobNotifier jobNotifier;
   private final JobTracker jobTracker;
+  private final JobErrorReporter jobErrorReporter;
   private final StreamResetPersistence streamResetPersistence;
 
   public void start() {
@@ -193,7 +197,8 @@ private void registerConnectionManager(final WorkerFactory factory) {
             jobTracker,
             configRepository,
             jobCreator,
-            streamResetPersistence),
+            streamResetPersistence,
+            jobErrorReporter),
         new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
         new ConnectionDeletionActivityImpl(connectionHelper),
         new CheckConnectionActivityImpl(
@@ -435,8 +440,11 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
 
     final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);
 
-    final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);
+    final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs);
+    final JobErrorReporter jobErrorReporter =
+        new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient);
 
+    final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);
     new WorkerApp(
         workspaceRoot,
         defaultProcessFactory,
@@ -464,6 +472,7 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
         containerOrchestratorConfig,
         jobNotifier,
         jobTracker,
+        jobErrorReporter,
         streamResetPersistence).start();
   }
 
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
index dc37eb4a731fa..7f548778f5be9 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
@@ -30,6 +30,7 @@
 import io.airbyte.scheduler.persistence.JobCreator;
 import io.airbyte.scheduler.persistence.JobNotifier;
 import io.airbyte.scheduler.persistence.JobPersistence;
+import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
 import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
 import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
 import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
@@ -61,6 +62,7 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta
   private final ConfigRepository configRepository;
   private final JobCreator jobCreator;
   private final StreamResetPersistence streamResetPersistence;
+  private final JobErrorReporter jobErrorReporter;
 
   @Override
   public JobCreationOutput createNewJob(final JobCreationInput input) {
@@ -199,6 +201,10 @@ public void jobFailure(final JobFailureInput input) {
       jobNotifier.failJob(input.getReason(), job);
       emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId);
       trackCompletion(job, JobStatus.FAILED);
+
+      final UUID connectionId = UUID.fromString(job.getScope());
+      job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary)
+          .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, job.getConfig().getSync()));
     } catch (final IOException e) {
       throw new RetryableException(e);
     }
@@ -224,6 +230,7 @@ public void attemptFailure(final AttemptFailureInput input) {
         MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
             MetricTags.getFailureOrigin(reason.getFailureOrigin()));
       }
+
     } catch (final IOException e) {
       throw new RetryableException(e);
     }
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
index 1c9c3da9275f1..211734d0d674a 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
@@ -35,6 +35,7 @@
 import io.airbyte.scheduler.persistence.JobCreator;
 import io.airbyte.scheduler.persistence.JobNotifier;
 import io.airbyte.scheduler.persistence.JobPersistence;
+import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
 import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
 import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
 import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
@@ -94,6 +95,9 @@ public class JobCreationAndStatusUpdateActivityTest {
   @Mock
   private JobTracker mJobtracker;
 
+  @Mock
+  private JobErrorReporter mJobErrorReporter;
+
   @Mock
   private ConfigRepository mConfigRepository;
 
@@ -293,10 +297,22 @@ public void setJobSuccessWrapException() throws IOException {
 
     @Test
     public void setJobFailure() throws IOException {
+      final Attempt mAttempt = Mockito.mock(Attempt.class);
+      Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary));
+
+      final Job mJob = Mockito.mock(Job.class);
+      Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString());
+      Mockito.when(mJob.getConfig()).thenReturn(new JobConfig());
+      Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt));
+
+      Mockito.when(mJobPersistence.getJob(JOB_ID))
+          .thenReturn(mJob);
+
       jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason"));
 
       Mockito.verify(mJobPersistence).failJob(JOB_ID);
       Mockito.verify(mJobNotifier).failJob(eq("reason"), Mockito.any());
+      Mockito.verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any());
     }
 
     @Test
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 79a53b4d1d268..eeaa49fb2bb34 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -88,6 +88,8 @@ services:
       - WORKSPACE_ROOT=${WORKSPACE_ROOT}
       - METRIC_CLIENT=${METRIC_CLIENT}
       - OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT}
+      - JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY}
+      - JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN}
       - ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT}
       - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS}
       - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS}