Skip to content

Commit

Permalink
Add EventListenerContext to factory
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Dec 5, 2024
1 parent cdb3337 commit e46e56b
Show file tree
Hide file tree
Showing 31 changed files with 259 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.eventlistener;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.eventlistener.EventListenerFactory;

import static java.util.Objects.requireNonNull;

public class EventListenerContextInstance
implements EventListenerFactory.EventListenerContext
{
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final String version;

public EventListenerContextInstance(String version, OpenTelemetry openTelemetry, Tracer tracer)
{
this.version = requireNonNull(version, "version is null");
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
}

@Override
public String getVersion()
{
return version;
}

@Override
public OpenTelemetry getOpenTelemetry()
{
return openTelemetry;
}

@Override
public Tracer getTracer()
{
return tracer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.client.NodeVersion;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
Expand Down Expand Up @@ -67,12 +70,14 @@ public class EventListenerManager
private final TimeStat queryCompletedTime = new TimeStat(MILLISECONDS);
private final TimeStat splitCompletedTime = new TimeStat(MILLISECONDS);
private final SecretsResolver secretsResolver;
private final EventListenerContextInstance context;

@Inject
public EventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver)
public EventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver, OpenTelemetry openTelemetry, Tracer tracer, NodeVersion version)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.context = new EventListenerContextInstance(version.toString(), openTelemetry, tracer);
}

public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
Expand Down Expand Up @@ -129,7 +134,7 @@ private EventListener createEventListener(File configFile)

EventListener eventListener;
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
eventListener = factory.create(secretsResolver.getResolvedConfiguration(properties));
eventListener = factory.create(secretsResolver.getResolvedConfiguration(properties), context);
}

log.info("-- Loaded event listener %s --", configFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
this.joinCompiler = new JoinCompiler(typeOperators);
this.hashStrategyCompiler = new FlatHashStrategyCompiler(typeOperators);
PageIndexerFactory pageIndexerFactory = new GroupByHashPageIndexerFactory(hashStrategyCompiler);
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), secretsResolver);
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), secretsResolver, noop(), tracer, nodeManager.getCurrentNode().getNodeVersion());
this.accessControl = new TestingAccessControlManager(transactionManager, eventListenerManager, secretsResolver);
accessControl.loadSystemAccessControl(AllowAllSystemAccessControl.NAME, ImmutableMap.of());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.testing;

import io.airlift.tracing.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.eventlistener.EventListenerFactory;

public class TestingEventListenerContext
implements EventListenerFactory.EventListenerContext
{
@Override
public String getVersion()
{
return "test-version";
}

@Override
public OpenTelemetry getOpenTelemetry()
{
return OpenTelemetry.noop();
}

@Override
public Tracer getTracer()
{
return Tracing.noopTracer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.tracing.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.client.NodeVersion;
import io.trino.eventlistener.EventListenerConfig;
import io.trino.eventlistener.EventListenerManager;
import io.trino.spi.eventlistener.EventListener;
Expand All @@ -43,13 +46,13 @@ public static TestingEventListenerManager emptyEventListenerManager()
@Inject
public TestingEventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver)
{
super(config, secretsResolver);
super(config, secretsResolver, OpenTelemetry.noop(), Tracing.noopTracer(), new NodeVersion("test-version"));
}

@Override
public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
{
configuredEventListeners.add(eventListenerFactory.create(ImmutableMap.of()));
configuredEventListeners.add(eventListenerFactory.create(ImmutableMap.of(), new TestingEventListenerContext()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.airlift.json.JsonCodec;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.connector.ConnectorCatalogServiceProvider;
Expand Down Expand Up @@ -76,6 +75,8 @@

import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.tracing.Tracing.noopTracer;
import static io.opentelemetry.api.OpenTelemetry.noop;
import static io.trino.SessionTestUtils.TEST_SESSION;
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static io.trino.metadata.TestMetadataManager.createTestMetadataManager;
Expand All @@ -102,7 +103,7 @@ public void testSubmittedForDispatchedQuery()
transactionManager,
emptyEventListenerManager(),
new AccessControlConfig(),
OpenTelemetry.noop(),
noop(),
new SecretsResolver(ImmutableMap.of()),
DefaultSystemAccessControl.NAME);
accessControl.setSystemAccessControls(List.of(AllowAllSystemAccessControl.INSTANCE));
Expand All @@ -128,7 +129,7 @@ public void testSubmittedForDispatchedQuery()
JsonCodec.jsonCodec(OperatorStats.class),
JsonCodec.jsonCodec(ExecutionFailureInfo.class),
JsonCodec.jsonCodec(StatsAndCosts.class),
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of())),
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new NodeVersion("test")),
new NodeInfo("node"),
new NodeVersion("version"),
new SessionPropertyManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@

import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.secrets.SecretsResolver;
import io.trino.client.NodeVersion;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static io.airlift.tracing.Tracing.noopTracer;
import static io.opentelemetry.api.OpenTelemetry.noop;
import static org.assertj.core.api.Assertions.assertThat;

class TestEventListenerManager
{
@Test
public void testShutdownIsForwardedToListeners()
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()));
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new NodeVersion("test-version"));
AtomicBoolean wasCalled = new AtomicBoolean(false);
EventListener listener = new EventListener()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.tracing.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogServiceProvider;
Expand Down Expand Up @@ -73,6 +71,8 @@
import java.util.Optional;
import java.util.Set;

import static io.airlift.tracing.Tracing.noopTracer;
import static io.opentelemetry.api.OpenTelemetry.noop;
import static io.trino.SessionTestUtils.TEST_SESSION;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
Expand Down Expand Up @@ -195,7 +195,7 @@ public static LocalExecutionPlanner createTestingPlanner()
blockTypeOperators,
PLANNER_CONTEXT.getTypeOperators(),
new TableExecuteContextManager(),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())),
new NodeVersion("test"),
new CompilerConfig());
}
Expand All @@ -208,7 +208,7 @@ public static TaskInfo updateTask(SqlTask sqlTask, List<SplitAssignment> splitAs
public static SplitMonitor createTestSplitMonitor()
{
return new SplitMonitor(
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of())),
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new NodeVersion("test")),
new ObjectMapperProvider().get());
}
}
7 changes: 7 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@
<new>method io.trino.spi.security.SystemAccessControl io.trino.spi.security.SystemAccessControlFactory::create(java.util.Map&lt;java.lang.String, java.lang.String&gt;, io.trino.spi.security.SystemAccessControlFactory.SystemAccessControlContext)</new>
<justification>Old variant was removed and this becomes the new non-default one</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.numberOfParametersChanged</code>
<old>method io.trino.spi.eventlistener.EventListener io.trino.spi.eventlistener.EventListenerFactory::create(java.util.Map&lt;java.lang.String, java.lang.String&gt;)</old>
<new>method io.trino.spi.eventlistener.EventListener io.trino.spi.eventlistener.EventListenerFactory::create(java.util.Map&lt;java.lang.String, java.lang.String&gt;, io.trino.spi.eventlistener.EventListenerFactory.EventListenerContext)</new>
<justification>Added EventListenerContext</justification>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,23 @@
*/
package io.trino.spi.eventlistener;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;

import java.util.Map;

public interface EventListenerFactory
{
String getName();

EventListener create(Map<String, String> config);
EventListener create(Map<String, String> config, EventListenerContext context);

interface EventListenerContext
{
String getVersion();

OpenTelemetry getOpenTelemetry();

Tracer getTracer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.base.evenlistener;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.eventlistener.EventListenerFactory;

import static java.util.Objects.requireNonNull;

public class TestingEventListenerContext
implements EventListenerFactory.EventListenerContext
{
private final String trinoVersion;

public TestingEventListenerContext()
{
this("trino-version");
}

public TestingEventListenerContext(String version)
{
this.trinoVersion = requireNonNull(version, "version is null");
}

@Override
public String getVersion()
{
return this.trinoVersion;
}

@Override
public OpenTelemetry getOpenTelemetry()
{
return OpenTelemetry.noop();
}

@Override
public Tracer getTracer()
{
return OpenTelemetry.noop().getTracer("TEST_TRACER");
}
}
7 changes: 7 additions & 0 deletions plugin/trino-http-event-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getName()
}

@Override
public EventListener create(Map<String, String> config)
public EventListener create(Map<String, String> config, EventListenerContext context)
{
Bootstrap app = new Bootstrap(
new JsonModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.trino.operator.RetryPolicy;
import io.trino.plugin.base.evenlistener.TestingEventListenerContext;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.spi.eventlistener.QueryCompletedEvent;
Expand Down Expand Up @@ -520,6 +521,6 @@ private EventListener createEventListener(Map<String, String> config)
return factory.create(ImmutableMap.<String, String>builder()
.putAll(config)
.put("bootstrap.quiet", "true")
.buildOrThrow());
.buildOrThrow(), new TestingEventListenerContext());
}
}
Loading

0 comments on commit e46e56b

Please sign in to comment.