diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java index 374ea9f45d1a..5f9ef30ef294 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerConfig.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.validation.FileExists; -import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import java.io.File; @@ -27,7 +26,6 @@ public class EventListenerConfig { private List eventListenerFiles = ImmutableList.of(); - private int maxConcurrentQueryCompletedEvents = 100; @NotNull public List<@FileExists File> getEventListenerFiles() @@ -43,17 +41,4 @@ public EventListenerConfig setEventListenerFiles(List eventListenerFiles .collect(toImmutableList()); return this; } - - @Min(1) - public int getMaxConcurrentQueryCompletedEvents() - { - return maxConcurrentQueryCompletedEvents; - } - - @Config("event-listener.max-concurrent-query-completed-events") - public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents) - { - this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents; - return this; - } } diff --git a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java index cbb158b23c52..2be3158a20b9 100644 --- a/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java +++ b/core/trino-main/src/main/java/io/trino/eventlistener/EventListenerManager.java @@ -18,7 +18,6 @@ import com.google.inject.Inject; import io.airlift.configuration.secrets.SecretsResolver; import io.airlift.log.Logger; -import io.airlift.stats.CounterStat; import io.airlift.stats.TimeStat; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.eventlistener.EventListener; @@ -58,13 +57,11 @@ public class EventListenerManager private static final File CONFIG_FILE = new File("etc/event-listener.properties"); private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name"; private final List configFiles; - private final int maxConcurrentQueryCompletedEvents; private final Map eventListenerFactories = new ConcurrentHashMap<>(); private final List providedEventListeners = Collections.synchronizedList(new ArrayList<>()); private final AtomicReference> configuredEventListeners = new AtomicReference<>(ImmutableList.of()); private final AtomicBoolean loading = new AtomicBoolean(false); private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger(); - private final CounterStat skippedQueryCompletedEvents = new CounterStat(); private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS); private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS); @@ -75,7 +72,6 @@ public class EventListenerManager public EventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver) { this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles()); - this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents(); this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null"); } @@ -153,11 +149,7 @@ private static Map loadEventListenerProperties(File configFile) public void queryCompleted(Function queryCompletedEventProvider) { try (TimeStat.BlockTimer _ = queryCompletedTime.time()) { - if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) { - concurrentQueryCompletedEvents.decrementAndGet(); - skippedQueryCompletedEvents.update(1); - return; - } + concurrentQueryCompletedEvents.incrementAndGet(); doQueryCompleted(queryCompletedEventProvider); concurrentQueryCompletedEvents.decrementAndGet(); } @@ -241,13 +233,6 @@ public int getConcurrentQueryCompletedEvents() return concurrentQueryCompletedEvents.get(); } - @Managed - @Nested - public CounterStat getSkippedQueryCompletedEvents() - { - return skippedQueryCompletedEvents; - } - @PreDestroy public void shutdown() { diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java index 32180cdcf0c3..3ebb5a5ac02b 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerConfig.java @@ -32,7 +32,6 @@ public class TestEventListenerConfig public void testDefaults() { assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class) - .setMaxConcurrentQueryCompletedEvents(100) .setEventListenerFiles(ImmutableList.of())); } @@ -44,12 +43,10 @@ public void testExplicitPropertyMappings() Path config2 = Files.createTempFile(null, null); Map properties = ImmutableMap.of( - "event-listener.config-files", config1.toString() + "," + config2.toString(), - "event-listener.max-concurrent-query-completed-events", "1"); + "event-listener.config-files", config1.toString() + "," + config2.toString()); EventListenerConfig expected = new EventListenerConfig() - .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())) - .setMaxConcurrentQueryCompletedEvents(1); + .setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath())); assertFullMapping(properties, expected); } diff --git a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java index 78811900bbb5..c330ff4d17f6 100644 --- a/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java +++ b/core/trino-main/src/test/java/io/trino/eventlistener/TestEventListenerManager.java @@ -17,138 +17,14 @@ import io.airlift.configuration.secrets.SecretsResolver; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.eventlistener.QueryCompletedEvent; -import io.trino.spi.eventlistener.QueryContext; -import io.trino.spi.eventlistener.QueryIOMetadata; -import io.trino.spi.eventlistener.QueryMetadata; -import io.trino.spi.eventlistener.QueryStatistics; -import io.trino.spi.session.ResourceEstimates; import org.junit.jupiter.api.Test; -import java.net.URI; -import java.time.Instant; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static io.trino.spi.type.TimeZoneKey.UTC_KEY; -import static java.time.Duration.ofMillis; -import static java.util.concurrent.Executors.newFixedThreadPool; import static org.assertj.core.api.Assertions.assertThat; class TestEventListenerManager { - private static final QueryMetadata QUERY_METADATA = new QueryMetadata( - "minimal_query", - Optional.empty(), - "query", - Optional.empty(), - Optional.empty(), - "queryState", - // not stored - List.of(), - // not stored - List.of(), - URI.create("http://localhost"), - Optional.empty(), - Optional.empty(), - Optional.empty()); - - private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics( - ofMillis(101), - ofMillis(102), - ofMillis(103), - ofMillis(104), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - 115L, - 116L, - 117L, - 118L, - 119L, - 1191L, - 1192L, - 120L, - 121L, - 122L, - 123L, - 124L, - 125L, - 126L, - 127L, - 1271L, - 128.0, - 129.0, - // not stored - Collections.emptyList(), - 130, - false, - // not stored - Collections.emptyList(), - // not stored - Collections.emptyList(), - // not stored - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - // not stored - Optional.empty()); - - private static final QueryContext QUERY_CONTEXT = new QueryContext( - "user", - "originalUser", - Optional.empty(), - Set.of(), - Set.of(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Set.of(), - // not stored - Set.of(), - Optional.empty(), - UTC_KEY.getId(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Map.of(), - // not stored - new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()), - "serverAddress", - "serverVersion", - "environment", - Optional.empty(), - "NONE"); - - private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty()); - - private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent( - QUERY_METADATA, - QUERY_STATISTICS, - QUERY_CONTEXT, - QUERY_IO_METADATA, - Optional.empty(), - List.of(), - Instant.now(), - Instant.now(), - Instant.now()); - @Test public void testShutdownIsForwardedToListeners() { @@ -170,33 +46,6 @@ public void shutdown() assertThat(wasCalled.get()).isTrue(); } - @Test - public void testMaxConcurrentQueryCompletedEvents() - throws InterruptedException - { - EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1), new SecretsResolver(ImmutableMap.of())); - eventListenerManager.addEventListener(new BlockingEventListener()); - eventListenerManager.loadEventListeners(); - ExecutorService executor = newFixedThreadPool(2); - CountDownLatch countDownLatch = new CountDownLatch(1); - try { - Runnable queryCompletedEvent = () -> { - eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT); - countDownLatch.countDown(); - }; - executor.submit(queryCompletedEvent); - executor.submit(queryCompletedEvent); - - countDownLatch.await(); - assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1); - assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1); - } - finally { - executor.shutdownNow(); - assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); - } - } - private static final class BlockingEventListener implements EventListener { diff --git a/docs/src/main/sphinx/develop/event-listener.md b/docs/src/main/sphinx/develop/event-listener.md index 8d365b605704..3c81931b34de 100644 --- a/docs/src/main/sphinx/develop/event-listener.md +++ b/docs/src/main/sphinx/develop/event-listener.md @@ -45,10 +45,6 @@ custom-property1=custom-value1 custom-property2=custom-value2 ``` -Maximum number of concurrent query completed events -can be configured using `event-listener.max-concurrent-query-completed-events` property -(`100` by default). Excessive events are dropped. - (multiple-listeners)= ## Multiple event listeners