Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core,jdbc): add message protection metric #5624

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 54 additions & 52 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,60 @@
@Singleton
@Slf4j
public class MetricRegistry {
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public final static String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public final static String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public final static String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public final static String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public final static String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public final static String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public final static String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public final static String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public final static String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public final static String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public final static String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public final static String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public final static String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public final static String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public final static String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public final static String STREAMS_STATE_COUNT = "stream.state.count";


public final static String JDBC_QUERY_DURATION = "jdbc.query.duration";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_TRIGGER_TYPE = "trigger_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";
public final static String TAG_TENANT_ID = "tenant_id";
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public static final String STREAMS_STATE_COUNT = "stream.state.count";

public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";

public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";

public static final String TAG_TASK_TYPE = "task_type";
public static final String TAG_TRIGGER_TYPE = "trigger_type";
public static final String TAG_FLOW_ID = "flow_id";
public static final String TAG_NAMESPACE_ID = "namespace_id";
public static final String TAG_STATE = "state";
public static final String TAG_ATTEMPT_COUNT = "attempt_count";
public static final String TAG_WORKER_GROUP = "worker_group";
public static final String TAG_TENANT_ID = "tenant_id";
public static final String TAG_CLASS_NAME = "class_name";

@Inject
private MeterRegistry meterRegistry;
Expand Down
8 changes: 8 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -64,6 +65,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {

protected final MessageProtectionConfiguration messageProtectionConfiguration;

private final MetricRegistry metricRegistry;

protected final Table<Record> table;

protected final JdbcQueueIndexer jdbcQueueIndexer;
Expand All @@ -80,6 +83,7 @@ public JdbcQueue(Class<T> cls, ApplicationContext applicationContext) {
this.dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
this.configuration = applicationContext.getBean(Configuration.class);
this.messageProtectionConfiguration = applicationContext.getBean(MessageProtectionConfiguration.class);
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);

JdbcTableConfigs jdbcTableConfigs = applicationContext.getBean(JdbcTableConfigs.class);

Expand All @@ -97,6 +101,10 @@ protected Map<Field<Object>, Object> produceFields(String consumerGroup, String
}

if (messageProtectionConfiguration.enabled && bytes.length >= messageProtectionConfiguration.limit) {
metricRegistry
.counter(MetricRegistry.QUEUE_BIG_MESSAGE_COUNT, MetricRegistry.TAG_CLASS_NAME, cls.getName())
.increment();

// we let terminated execution messages to go through anyway
if (!(message instanceof Execution execution) || !execution.getState().isTerminated()) {
throw new MessageTooBigException("Message of size " + bytes.length + " has exceeded the configured limit of " + messageProtectionConfiguration.limit);
Expand Down