From c4614664bd74f0caee2d316b60c848a12323e3f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 23 Oct 2024 12:38:12 +0200 Subject: [PATCH] feat(core,jdbc): add message protection metric --- .../kestra/core/metrics/MetricRegistry.java | 106 +++++++++--------- .../java/io/kestra/jdbc/runner/JdbcQueue.java | 8 ++ 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java index cc563b05be..36d7ae35f5 100644 --- a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java +++ b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java @@ -19,58 +19,60 @@ @Singleton @Slf4j public class MetricRegistry { - public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending"; - public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running"; - public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread"; - public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count"; - public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration"; - public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count"; - public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count"; - public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count"; - public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration"; - public final static String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration"; - public final static String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count"; - public final static String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count"; - public final static String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count"; - public final static String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count"; - public final static String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count"; - - public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; - public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; - public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; - public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; - public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; - public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; - public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; - - public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count"; - public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration"; - public final static String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count"; - public final static String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration"; - public final static String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count"; - public final static String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count"; - public final static String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count"; - - public final static String SCHEDULER_LOOP_COUNT = "scheduler.loop.count"; - public final static String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count"; - public final static String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration"; - public final static String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count"; - public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration"; - public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration"; - - public final static String STREAMS_STATE_COUNT = "stream.state.count"; - - - public final static String JDBC_QUERY_DURATION = "jdbc.query.duration"; - - public final static String TAG_TASK_TYPE = "task_type"; - public final static String TAG_TRIGGER_TYPE = "trigger_type"; - public final static String TAG_FLOW_ID = "flow_id"; - public final static String TAG_NAMESPACE_ID = "namespace_id"; - public final static String TAG_STATE = "state"; - public final static String TAG_ATTEMPT_COUNT = "attempt_count"; - public final static String TAG_WORKER_GROUP = "worker_group"; - public final static String TAG_TENANT_ID = "tenant_id"; + public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending"; + public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running"; + public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread"; + public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count"; + public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration"; + public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count"; + public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count"; + public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count"; + public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration"; + public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration"; + public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count"; + public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count"; + public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count"; + public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count"; + public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count"; + + public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; + public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; + public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; + public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; + public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; + public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; + public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; + + public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count"; + public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration"; + public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count"; + public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration"; + public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count"; + public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count"; + public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count"; + + public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count"; + public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count"; + public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration"; + public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count"; + public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration"; + public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration"; + + public static final String STREAMS_STATE_COUNT = "stream.state.count"; + + public static final String JDBC_QUERY_DURATION = "jdbc.query.duration"; + + public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count"; + + public static final String TAG_TASK_TYPE = "task_type"; + public static final String TAG_TRIGGER_TYPE = "trigger_type"; + public static final String TAG_FLOW_ID = "flow_id"; + public static final String TAG_NAMESPACE_ID = "namespace_id"; + public static final String TAG_STATE = "state"; + public static final String TAG_ATTEMPT_COUNT = "attempt_count"; + public static final String TAG_WORKER_GROUP = "worker_group"; + public static final String TAG_TENANT_ID = "tenant_id"; + public static final String TAG_CLASS_NAME = "class_name"; @Inject private MeterRegistry meterRegistry; diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index e28b4b710f..f79c66f7ae 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.CaseFormat; import io.kestra.core.exceptions.DeserializationException; +import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueInterface; @@ -64,6 +65,8 @@ public abstract class JdbcQueue implements QueueInterface { protected final MessageProtectionConfiguration messageProtectionConfiguration; + private final MetricRegistry metricRegistry; + protected final Table table; protected final JdbcQueueIndexer jdbcQueueIndexer; @@ -80,6 +83,7 @@ public JdbcQueue(Class cls, ApplicationContext applicationContext) { this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class); this.configuration = applicationContext.getBean(Configuration.class); this.messageProtectionConfiguration = applicationContext.getBean(MessageProtectionConfiguration.class); + this.metricRegistry = applicationContext.getBean(MetricRegistry.class); JdbcTableConfigs jdbcTableConfigs = applicationContext.getBean(JdbcTableConfigs.class); @@ -97,6 +101,10 @@ protected Map, Object> produceFields(String consumerGroup, String } if (messageProtectionConfiguration.enabled && bytes.length >= messageProtectionConfiguration.limit) { + metricRegistry + .counter(MetricRegistry.QUEUE_BIG_MESSAGE_COUNT, MetricRegistry.TAG_CLASS_NAME, cls.getName()) + .increment(); + // we let terminated execution messages to go through anyway if (!(message instanceof Execution execution) || !execution.getState().isTerminated()) { throw new MessageTooBigException("Message of size " + bytes.length + " has exceeded the configured limit of " + messageProtectionConfiguration.limit);