Skip to content

Commit

Permalink
Remove event-listener.max-concurrent-query-completed-events
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Aug 14, 2024
1 parent 3c95ec1 commit 8653030
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.validation.FileExists;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import java.io.File;
Expand All @@ -27,7 +26,6 @@
public class EventListenerConfig
{
private List<File> eventListenerFiles = ImmutableList.of();
private int maxConcurrentQueryCompletedEvents = 100;

@NotNull
public List<@FileExists File> getEventListenerFiles()
Expand All @@ -43,17 +41,4 @@ public EventListenerConfig setEventListenerFiles(List<String> eventListenerFiles
.collect(toImmutableList());
return this;
}

@Min(1)
public int getMaxConcurrentQueryCompletedEvents()
{
return maxConcurrentQueryCompletedEvents;
}

@Config("event-listener.max-concurrent-query-completed-events")
public EventListenerConfig setMaxConcurrentQueryCompletedEvents(int maxConcurrentQueryCompletedEvents)
{
this.maxConcurrentQueryCompletedEvents = maxConcurrentQueryCompletedEvents;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.inject.Inject;
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.stats.TimeStat;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -58,13 +57,11 @@ public class EventListenerManager
private static final File CONFIG_FILE = new File("etc/event-listener.properties");
private static final String EVENT_LISTENER_NAME_PROPERTY = "event-listener.name";
private final List<File> configFiles;
private final int maxConcurrentQueryCompletedEvents;
private final Map<String, EventListenerFactory> eventListenerFactories = new ConcurrentHashMap<>();
private final List<EventListener> providedEventListeners = Collections.synchronizedList(new ArrayList<>());
private final AtomicReference<List<EventListener>> configuredEventListeners = new AtomicReference<>(ImmutableList.of());
private final AtomicBoolean loading = new AtomicBoolean(false);
private final AtomicInteger concurrentQueryCompletedEvents = new AtomicInteger();
private final CounterStat skippedQueryCompletedEvents = new CounterStat();

private final TimeStat queryCreatedTime = new TimeStat(MILLISECONDS);
private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS);
Expand All @@ -75,7 +72,6 @@ public class EventListenerManager
public EventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
this.maxConcurrentQueryCompletedEvents = config.getMaxConcurrentQueryCompletedEvents();
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
}

Expand Down Expand Up @@ -153,11 +149,7 @@ private static Map<String, String> loadEventListenerProperties(File configFile)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
try (TimeStat.BlockTimer _ = queryCompletedTime.time()) {
if (concurrentQueryCompletedEvents.incrementAndGet() > maxConcurrentQueryCompletedEvents) {
concurrentQueryCompletedEvents.decrementAndGet();
skippedQueryCompletedEvents.update(1);
return;
}
concurrentQueryCompletedEvents.incrementAndGet();
doQueryCompleted(queryCompletedEventProvider);
concurrentQueryCompletedEvents.decrementAndGet();
}
Expand Down Expand Up @@ -241,13 +233,6 @@ public int getConcurrentQueryCompletedEvents()
return concurrentQueryCompletedEvents.get();
}

@Managed
@Nested
public CounterStat getSkippedQueryCompletedEvents()
{
return skippedQueryCompletedEvents;
}

@PreDestroy
public void shutdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class TestEventListenerConfig
public void testDefaults()
{
assertRecordedDefaults(ConfigAssertions.recordDefaults(EventListenerConfig.class)
.setMaxConcurrentQueryCompletedEvents(100)
.setEventListenerFiles(ImmutableList.of()));
}

Expand All @@ -44,12 +43,10 @@ public void testExplicitPropertyMappings()
Path config2 = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.of(
"event-listener.config-files", config1.toString() + "," + config2.toString(),
"event-listener.max-concurrent-query-completed-events", "1");
"event-listener.config-files", config1.toString() + "," + config2.toString());

EventListenerConfig expected = new EventListenerConfig()
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()))
.setMaxConcurrentQueryCompletedEvents(1);
.setEventListenerFiles(ImmutableList.of(config1.toFile().getPath(), config2.toFile().getPath()));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,138 +17,14 @@
import io.airlift.configuration.secrets.SecretsResolver;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
import io.trino.spi.eventlistener.QueryIOMetadata;
import io.trino.spi.eventlistener.QueryMetadata;
import io.trino.spi.eventlistener.QueryStatistics;
import io.trino.spi.session.ResourceEstimates;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;

class TestEventListenerManager
{
private static final QueryMetadata QUERY_METADATA = new QueryMetadata(
"minimal_query",
Optional.empty(),
"query",
Optional.empty(),
Optional.empty(),
"queryState",
// not stored
List.of(),
// not stored
List.of(),
URI.create("http://localhost"),
Optional.empty(),
Optional.empty(),
Optional.empty());

private static final QueryStatistics QUERY_STATISTICS = new QueryStatistics(
ofMillis(101),
ofMillis(102),
ofMillis(103),
ofMillis(104),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
115L,
116L,
117L,
118L,
119L,
1191L,
1192L,
120L,
121L,
122L,
123L,
124L,
125L,
126L,
127L,
1271L,
128.0,
129.0,
// not stored
Collections.emptyList(),
130,
false,
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
// not stored
Optional.empty());

private static final QueryContext QUERY_CONTEXT = new QueryContext(
"user",
"originalUser",
Optional.empty(),
Set.of(),
Set.of(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Set.of(),
// not stored
Set.of(),
Optional.empty(),
UTC_KEY.getId(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Map.of(),
// not stored
new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.empty()),
"serverAddress",
"serverVersion",
"environment",
Optional.empty(),
"NONE");

private static final QueryIOMetadata QUERY_IO_METADATA = new QueryIOMetadata(List.of(), Optional.empty());

private static final QueryCompletedEvent QUERY_COMPLETED_EVENT = new QueryCompletedEvent(
QUERY_METADATA,
QUERY_STATISTICS,
QUERY_CONTEXT,
QUERY_IO_METADATA,
Optional.empty(),
List.of(),
Instant.now(),
Instant.now(),
Instant.now());

@Test
public void testShutdownIsForwardedToListeners()
{
Expand All @@ -170,33 +46,6 @@ public void shutdown()
assertThat(wasCalled.get()).isTrue();
}

@Test
public void testMaxConcurrentQueryCompletedEvents()
throws InterruptedException
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig().setMaxConcurrentQueryCompletedEvents(1), new SecretsResolver(ImmutableMap.of()));
eventListenerManager.addEventListener(new BlockingEventListener());
eventListenerManager.loadEventListeners();
ExecutorService executor = newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
Runnable queryCompletedEvent = () -> {
eventListenerManager.queryCompleted(_ -> QUERY_COMPLETED_EVENT);
countDownLatch.countDown();
};
executor.submit(queryCompletedEvent);
executor.submit(queryCompletedEvent);

countDownLatch.await();
assertThat(eventListenerManager.getSkippedQueryCompletedEvents().getTotalCount()).isEqualTo(1);
assertThat(eventListenerManager.getConcurrentQueryCompletedEvents()).isEqualTo(1);
}
finally {
executor.shutdownNow();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
}
}

private static final class BlockingEventListener
implements EventListener
{
Expand Down
4 changes: 0 additions & 4 deletions docs/src/main/sphinx/develop/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ custom-property1=custom-value1
custom-property2=custom-value2
```

Maximum number of concurrent query completed events
can be configured using `event-listener.max-concurrent-query-completed-events` property
(`100` by default). Excessive events are dropped.

(multiple-listeners)=
## Multiple event listeners

Expand Down

0 comments on commit 8653030

Please sign in to comment.