diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java index 0fe70a947d402..8e002e39c21a3 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.java @@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; +import io.airbyte.metrics.lib.MetricClientFactory; +import io.airbyte.metrics.lib.OssMetricsRegistry; import io.airbyte.protocol.models.AirbyteLogMessage; import io.airbyte.protocol.models.AirbyteMessage; import java.io.BufferedReader; @@ -42,7 +44,9 @@ public DefaultAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder this(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder); } - DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger, final MdcScope.Builder containerLogMdcBuilder) { + DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, + final Logger logger, + final MdcScope.Builder containerLogMdcBuilder) { protocolValidator = protocolPredicate; this.logger = logger; this.containerLogMdcBuilder = containerLogMdcBuilder; @@ -50,9 +54,12 @@ public DefaultAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder @Override public Stream create(final BufferedReader bufferedReader) { + final var metricClient = MetricClientFactory.getMetricClient(); return bufferedReader .lines() + .peek(str -> metricClient.distribution(OssMetricsRegistry.JSON_STRING_LENGTH, str.length())) .flatMap(this::parseJson) + .peek(json -> metricClient.distribution(OssMetricsRegistry.JSON_SIZE, json.size())) .filter(this::validate) .flatMap(this::toAirbyteMessage) .filter(this::filterLog); @@ -99,7 +106,8 @@ protected boolean filterLog(final AirbyteMessage message) { protected void internalLog(final AirbyteLogMessage logMessage) { final String combinedMessage = - logMessage.getMessage() + (logMessage.getStackTrace() != null ? (System.lineSeparator() + "Stack Trace: " + logMessage.getStackTrace()) : ""); + logMessage.getMessage() + (logMessage.getStackTrace() != null ? (System.lineSeparator() + + "Stack Trace: " + logMessage.getStackTrace()) : ""); switch (logMessage.getLevel()) { case FATAL, ERROR -> logger.error(combinedMessage); diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index 75e30ebf335dc..3c5f81b0296c7 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -71,6 +71,14 @@ public enum OssMetricsRegistry implements MetricsRegistry { MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", "time taken to create a new kube pod process"), + JSON_STRING_LENGTH( + MetricEmittingApps.WORKER, + "json_string_length", + "string length of a raw json string"), + JSON_SIZE( + MetricEmittingApps.WORKER, + "json_size", + "size of the json object"), NUM_PENDING_JOBS( MetricEmittingApps.METRICS_REPORTER, "num_pending_jobs", @@ -124,7 +132,9 @@ public enum OssMetricsRegistry implements MetricsRegistry { private final String metricName; private final String metricDescription; - OssMetricsRegistry(final MetricEmittingApp application, final String metricName, final String metricDescription) { + OssMetricsRegistry(final MetricEmittingApp application, + final String metricName, + final String metricDescription) { Preconditions.checkNotNull(metricDescription); Preconditions.checkNotNull(application);