Skip to content

Commit

Permalink
feat(core,jdbc): add message protection metric
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Oct 23, 2024
1 parent 9598eac commit c461466
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 52 deletions.
106 changes: 54 additions & 52 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,60 @@
@Singleton
@Slf4j
public class MetricRegistry {
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public final static String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public final static String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public final static String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public final static String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public final static String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public final static String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public final static String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public final static String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public final static String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public final static String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public final static String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public final static String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public final static String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public final static String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public final static String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public final static String STREAMS_STATE_COUNT = "stream.state.count";


public final static String JDBC_QUERY_DURATION = "jdbc.query.duration";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_TRIGGER_TYPE = "trigger_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";
public final static String TAG_TENANT_ID = "tenant_id";
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public static final String STREAMS_STATE_COUNT = "stream.state.count";

public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";

public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";

public static final String TAG_TASK_TYPE = "task_type";
public static final String TAG_TRIGGER_TYPE = "trigger_type";
public static final String TAG_FLOW_ID = "flow_id";
public static final String TAG_NAMESPACE_ID = "namespace_id";
public static final String TAG_STATE = "state";
public static final String TAG_ATTEMPT_COUNT = "attempt_count";
public static final String TAG_WORKER_GROUP = "worker_group";
public static final String TAG_TENANT_ID = "tenant_id";
public static final String TAG_CLASS_NAME = "class_name";

@Inject
private MeterRegistry meterRegistry;
Expand Down
8 changes: 8 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -64,6 +65,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {

protected final MessageProtectionConfiguration messageProtectionConfiguration;

private final MetricRegistry metricRegistry;

protected final Table<Record> table;

protected final JdbcQueueIndexer jdbcQueueIndexer;
Expand All @@ -80,6 +83,7 @@ public JdbcQueue(Class<T> cls, ApplicationContext applicationContext) {
this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
this.configuration = applicationContext.getBean(Configuration.class);
this.messageProtectionConfiguration = applicationContext.getBean(MessageProtectionConfiguration.class);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);

JdbcTableConfigs jdbcTableConfigs = applicationContext.getBean(JdbcTableConfigs.class);

Expand All @@ -97,6 +101,10 @@ protected Map<Field<Object>, Object> produceFields(String consumerGroup, String
}

if (messageProtectionConfiguration.enabled && bytes.length >= messageProtectionConfiguration.limit) {
metricRegistry
.counter(MetricRegistry.QUEUE_BIG_MESSAGE_COUNT, MetricRegistry.TAG_CLASS_NAME, cls.getName())
.increment();

// we let terminated execution messages to go through anyway
if (!(message instanceof Execution execution) || !execution.getState().isTerminated()) {
throw new MessageTooBigException("Message of size " + bytes.length + " has exceeded the configured limit of " + messageProtectionConfiguration.limit);
Expand Down

0 comments on commit c461466

Please sign in to comment.