From 9ce424c448d21d939a4fc3cacccabdd7933c6e03 Mon Sep 17 00:00:00 2001 From: Arjun Gupta Date: Wed, 10 Jul 2024 14:14:32 -0700 Subject: [PATCH] Send periodic query progress events This change enables coordinators to push their current state of Queued/Running queries as progress events which can be logged to different sources for analysis purposes. Progress event is sent every 1 minute for each queued/running queries. --- .../facebook/presto/event/QueryMonitor.java | 66 +++++++++++++ .../presto/event/QueryMonitorConfig.java | 18 ++++ .../presto/event/QueryProgressMonitor.java | 86 ++++++++++++++++ .../eventlistener/EventListenerManager.java | 7 ++ .../presto/server/CoordinatorModule.java | 2 + .../testing/TestingEventListenerManager.java | 9 ++ .../dispatcher/TestLocalDispatchQuery.java | 7 ++ .../presto/event/TestQueryMonitorConfig.java | 9 +- .../spi/eventlistener/EventListener.java | 4 + .../spi/eventlistener/QueryProgressEvent.java | 77 +++++++++++++++ .../presto/execution/TestEventListener.java | 98 ++++++++++++++++--- .../execution/TestEventListenerPlugin.java | 7 ++ 12 files changed, 376 insertions(+), 14 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/event/QueryProgressMonitor.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryProgressEvent.java diff --git a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java index 171b57d8913f..1998c567ebb0 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java +++ b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitor.java @@ -46,6 +46,7 @@ import com.facebook.presto.operator.TableFinishInfo; import com.facebook.presto.operator.TaskStats; import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.BasicQueryStats; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.eventlistener.OperatorStatistics; @@ -57,6 +58,7 @@ import com.facebook.presto.spi.eventlistener.QueryInputMetadata; import com.facebook.presto.spi.eventlistener.QueryMetadata; import com.facebook.presto.spi.eventlistener.QueryOutputMetadata; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.QueryStatistics; import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.ResourceDistribution; @@ -173,6 +175,30 @@ public void queryUpdatedEvent(QueryInfo queryInfo) eventListenerManager.queryUpdated(new QueryUpdatedEvent(createQueryMetadata(queryInfo))); } + public void publishQueryProgressEvent(long monotonicallyIncreasingEventId, BasicQueryInfo queryInfo) + { + eventListenerManager.publishQueryProgress(new QueryProgressEvent( + monotonicallyIncreasingEventId, + new QueryMetadata( + queryInfo.getQueryId().toString(), + queryInfo.getSession().getTransactionId().map(TransactionId::toString), + queryInfo.getQuery(), + queryInfo.getQueryHash(), + queryInfo.getPreparedQuery(), + queryInfo.getState().toString(), + queryInfo.getSelf(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + ImmutableList.of(), + queryInfo.getSession().getTraceToken()), + createQueryStatistics(queryInfo), + createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId()), + queryInfo.getQueryType(), + ofEpochMilli(queryInfo.getQueryStats().getCreateTime().getMillis()))); + } + public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure) { eventListenerManager.queryCompleted(new QueryCompletedEvent( @@ -431,6 +457,46 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getRuntimeStats()); } + private QueryStatistics createQueryStatistics(BasicQueryInfo basicQueryInfo) + { + BasicQueryStats queryStats = basicQueryInfo.getQueryStats(); + + return new QueryStatistics( + ofMillis(queryStats.getTotalCpuTime().toMillis()), + ofMillis(0), + ofMillis(queryStats.getElapsedTime().toMillis()), + ofMillis(queryStats.getWaitingForPrerequisitesTime().toMillis()), + ofMillis(queryStats.getQueuedTime().toMillis()), + ofMillis(0), + ofMillis(0), + ofMillis(0), + ofMillis(0), + ofMillis(0), + Optional.of(ofMillis(0)), + ofMillis(queryStats.getExecutionTime().toMillis()), + queryStats.getPeakRunningTasks(), + queryStats.getPeakUserMemoryReservation().toBytes(), + queryStats.getPeakTotalMemoryReservation().toBytes(), + 0, + 0, + 0, + 0, + 0, + queryStats.getRawInputDataSize().toBytes(), + queryStats.getRawInputPositions(), + 0, + 0, + 0, + 0, + 0, + 0, + queryStats.getCumulativeUserMemory(), + queryStats.getCumulativeTotalMemory(), + queryStats.getCompletedDrivers(), + false, + new RuntimeStats()); + } + private QueryContext createQueryContext(SessionRepresentation session, Optional resourceGroup) { return new QueryContext( diff --git a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitorConfig.java b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitorConfig.java index 37995e73367c..c43f208028f9 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/QueryMonitorConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/event/QueryMonitorConfig.java @@ -14,16 +14,21 @@ package com.facebook.presto.event; import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; +import io.airlift.units.Duration; import io.airlift.units.MaxDataSize; import io.airlift.units.MinDataSize; import javax.validation.constraints.NotNull; +import static java.util.concurrent.TimeUnit.MINUTES; + public class QueryMonitorConfig { private DataSize maxOutputStageJsonSize = new DataSize(16, Unit.MEGABYTE); + private Duration queryProgressPublishInterval = new Duration(0, MINUTES); @MinDataSize("1kB") @MaxDataSize("1GB") @@ -39,4 +44,17 @@ public QueryMonitorConfig setMaxOutputStageJsonSize(DataSize maxOutputStageJsonS this.maxOutputStageJsonSize = maxOutputStageJsonSize; return this; } + + public Duration getQueryProgressPublishInterval() + { + return queryProgressPublishInterval; + } + + @Config("event.query-progress-publish-interval") + @ConfigDescription("How frequently to publish query progress events. 0 duration disables the publication of these events.") + public QueryMonitorConfig setQueryProgressPublishInterval(Duration queryProgressPublishInterval) + { + this.queryProgressPublishInterval = queryProgressPublishInterval; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/event/QueryProgressMonitor.java b/presto-main/src/main/java/com/facebook/presto/event/QueryProgressMonitor.java new file mode 100644 index 000000000000..8864a3da8a0f --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/event/QueryProgressMonitor.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.event; + +import com.facebook.presto.dispatcher.DispatchManager; +import com.facebook.presto.server.BasicQueryInfo; +import io.airlift.units.Duration; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; + +public class QueryProgressMonitor +{ + private final AtomicLong monotonicallyIncreasingEventId = new AtomicLong(); + + private final QueryMonitor queryMonitor; + private final DispatchManager dispatchManager; + private final Duration queryProgressPublishInterval; + + @GuardedBy("this") + private ScheduledExecutorService queryProgressMonitorExecutor; + + @Inject + public QueryProgressMonitor( + QueryMonitor queryMonitor, + DispatchManager dispatchManager, + QueryMonitorConfig queryMonitorConfig) + { + this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); + this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); + this.queryProgressPublishInterval = requireNonNull(queryMonitorConfig, "queryMonitorConfig is null").getQueryProgressPublishInterval(); + } + + @PostConstruct + public synchronized void start() + { + if (queryProgressPublishInterval.getValue() > 0) { + if (queryProgressMonitorExecutor == null) { + queryProgressMonitorExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("query-progress-monitor-executor")); + } + + queryProgressMonitorExecutor.scheduleWithFixedDelay( + this::publishQueryProgressEvent, + (long) queryProgressPublishInterval.getValue(), + (long) queryProgressPublishInterval.getValue(), + queryProgressPublishInterval.getUnit()); + } + } + + @PreDestroy + public synchronized void stop() + { + if (queryProgressMonitorExecutor != null) { + queryProgressMonitorExecutor.shutdown(); + } + } + + private void publishQueryProgressEvent() + { + for (BasicQueryInfo basicQueryInfo : dispatchManager.getQueries()) { + if (!basicQueryInfo.getState().isDone()) { + queryMonitor.publishQueryProgressEvent(monotonicallyIncreasingEventId.incrementAndGet(), basicQueryInfo); + } + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java index a5d1ec6cf85b..24a1d04b54bb 100644 --- a/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java +++ b/presto-main/src/main/java/com/facebook/presto/eventlistener/EventListenerManager.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.google.common.annotations.VisibleForTesting; @@ -115,6 +116,12 @@ public void queryUpdated(QueryUpdatedEvent queryUpdatedEvent) .ifPresent(eventListener -> eventListener.queryUpdated(queryUpdatedEvent)); } + public void publishQueryProgress(QueryProgressEvent queryProgressEvent) + { + configuredEventListener.get() + .ifPresent(eventListener -> eventListener.publishQueryProgress(queryProgressEvent)); + } + public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { configuredEventListener.get() diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java index 60489142b810..0306a44c24a5 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java @@ -31,6 +31,7 @@ import com.facebook.presto.dispatcher.LocalDispatchQueryFactory; import com.facebook.presto.event.QueryMonitor; import com.facebook.presto.event.QueryMonitorConfig; +import com.facebook.presto.event.QueryProgressMonitor; import com.facebook.presto.execution.ClusterSizeMonitor; import com.facebook.presto.execution.ExecutionFactoriesManager; import com.facebook.presto.execution.ExplainAnalyzeContext; @@ -163,6 +164,7 @@ protected void setup(Binder binder) jsonCodecBinder(binder).bindJsonCodec(OperatorInfo.class); configBinder(binder).bindConfig(QueryMonitorConfig.class); binder.bind(QueryMonitor.class).in(Scopes.SINGLETON); + binder.bind(QueryProgressMonitor.class).in(Scopes.SINGLETON); // query manager jaxrsBinder(binder).bind(QueryResource.class); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java index 2935a0289383..d0bc6e235917 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingEventListenerManager.java @@ -18,6 +18,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.google.common.collect.ImmutableMap; @@ -60,6 +61,14 @@ public void queryUpdated(QueryUpdatedEvent queryUpdatedEvent) } } + @Override + public void publishQueryProgress(QueryProgressEvent queryProgressEvent) + { + if (configuredEventListener.get().isPresent()) { + configuredEventListener.get().get().publishQueryProgress(queryProgressEvent); + } + } + @Override public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { diff --git a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java index 107f07dd7e5b..eb632eb160f2 100644 --- a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java +++ b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java @@ -39,6 +39,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.facebook.presto.spi.prerequisites.QueryPrerequisites; @@ -522,6 +523,12 @@ public void queryUpdated(QueryUpdatedEvent queryUpdatedEvent) fail("Query update events should not be created in this test"); } + @Override + public void publishQueryProgress(QueryProgressEvent queryProgressEvent) + { + fail("Query Progress events should not be created in this test"); + } + @Override public void queryCompleted(QueryCompletedEvent event) { diff --git a/presto-main/src/test/java/com/facebook/presto/event/TestQueryMonitorConfig.java b/presto-main/src/test/java/com/facebook/presto/event/TestQueryMonitorConfig.java index 4f9987cce3ec..5c1df0ce98c6 100644 --- a/presto-main/src/test/java/com/facebook/presto/event/TestQueryMonitorConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/event/TestQueryMonitorConfig.java @@ -15,9 +15,11 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import org.testng.annotations.Test; import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; @@ -30,7 +32,8 @@ public class TestQueryMonitorConfig public void testDefaults() { assertRecordedDefaults(recordDefaults(QueryMonitorConfig.class) - .setMaxOutputStageJsonSize(new DataSize(16, Unit.MEGABYTE))); + .setMaxOutputStageJsonSize(new DataSize(16, Unit.MEGABYTE)) + .setQueryProgressPublishInterval(new Duration(0, TimeUnit.MINUTES))); } @Test @@ -38,10 +41,12 @@ public void testExplicitPropertyMappings() { Map properties = new ImmutableMap.Builder() .put("event.max-output-stage-size", "512kB") + .put("event.query-progress-publish-interval", "2m") .build(); QueryMonitorConfig expected = new QueryMonitorConfig() - .setMaxOutputStageJsonSize(new DataSize(512, Unit.KILOBYTE)); + .setMaxOutputStageJsonSize(new DataSize(512, Unit.KILOBYTE)) + .setQueryProgressPublishInterval(new Duration(2, TimeUnit.MINUTES)); assertFullMapping(properties, expected); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/EventListener.java b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/EventListener.java index cd5796065d7d..3e7b79be8c5b 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/EventListener.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/EventListener.java @@ -27,6 +27,10 @@ default void queryUpdated(QueryUpdatedEvent queryUpdatedEvent) { } + default void publishQueryProgress(QueryProgressEvent queryProgressEvent) + { + } + default void queryCompleted(QueryCompletedEvent queryCompletedEvent) { } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryProgressEvent.java b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryProgressEvent.java new file mode 100644 index 000000000000..35f7b3a37686 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/eventlistener/QueryProgressEvent.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.eventlistener; + +import com.facebook.presto.common.resourceGroups.QueryType; + +import java.time.Instant; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class QueryProgressEvent +{ + private final long monotonicallyIncreasingEventId; + private final QueryMetadata metadata; + private final QueryStatistics statistics; + private final QueryContext context; + private final Optional queryType; + private final Instant createTime; + + public QueryProgressEvent( + long monotonicallyIncreasingEventId, + QueryMetadata metadata, + QueryStatistics statistics, + QueryContext context, + Optional queryType, + Instant createTime) + { + this.monotonicallyIncreasingEventId = monotonicallyIncreasingEventId; + this.metadata = requireNonNull(metadata, "metadata is null"); + this.statistics = requireNonNull(statistics, "statistics is null"); + this.context = requireNonNull(context, "context is null"); + this.queryType = requireNonNull(queryType, "queryType is null"); + this.createTime = requireNonNull(createTime, "createTime is null"); + } + + public long getMonotonicallyIncreasingEventId() + { + return monotonicallyIncreasingEventId; + } + + public QueryMetadata getMetadata() + { + return metadata; + } + + public QueryStatistics getStatistics() + { + return statistics; + } + + public QueryContext getContext() + { + return context; + } + + public Optional getQueryType() + { + return queryType; + } + + public Instant getCreateTime() + { + return createTime; + } +} diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java index 2fa5244420d5..85892ea69c1b 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListener.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.tests.DistributedQueryRunner; @@ -32,6 +33,7 @@ import org.testng.annotations.Test; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -44,6 +46,7 @@ import static java.util.stream.Collectors.toSet; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @Test(singleThreaded = true) @@ -54,6 +57,7 @@ public class TestEventListener private DistributedQueryRunner queryRunner; private Session session; + private long lastSeenQueryProgressEventId; @BeforeClass private void setUp() @@ -65,7 +69,12 @@ private void setUp() .setSchema("tiny") .setClientInfo("{\"clientVersion\":\"testVersion\"}") .build(); - queryRunner = new DistributedQueryRunner(session, 1); + + Map properties = ImmutableMap.builder() + .put("event.query-progress-publish-interval", "1ms") + .build(); + + queryRunner = new DistributedQueryRunner(session, 1, properties); queryRunner.installPlugin(new TpchPlugin()); queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents)); queryRunner.installPlugin(new ResourceGroupManagerPlugin()); @@ -101,8 +110,8 @@ private MaterializedResult runQueryAndWaitForEvents(@Language("SQL") String sql, public void testConstantQuery() throws Exception { - // QueryCreated: 1, QueryCompleted: 1, Splits: 1 - runQueryAndWaitForEvents("SELECT 1", 3); + // QueryCreated: 1, QueryProgressEvent:1, QueryCompleted: 1, Splits: 1 + runQueryAndWaitForEvents("SELECT 1", 4); QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); @@ -112,6 +121,16 @@ public void testConstantQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT 1"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); + QueryProgressEvent queryProgressEvent = generatedEvents.getQueryProgressEvent(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); + assertTrue(queryProgressEvent.getContext().getResourceGroupId().isPresent()); + assertEquals(queryProgressEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); + assertEquals(queryProgressEvent.getStatistics().getTotalRows(), 0L); + assertEquals(queryProgressEvent.getContext().getClientInfo().get(), "{\"clientVersion\":\"testVersion\"}"); + assertEquals(queryCreatedEvent.getMetadata().getQueryId(), queryProgressEvent.getMetadata().getQueryId()); + QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); @@ -131,8 +150,8 @@ public void testNormalQuery() throws Exception { // We expect the following events - // QueryCreated: 1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; + // QueryCreated: 1, QueryProgressEvent:1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split + int expectedEvents = 1 + 1 + 1 + SPLITS_PER_NODE + 1 + 1; runQueryAndWaitForEvents("SELECT sum(linenumber) FROM lineitem", expectedEvents); QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); @@ -143,6 +162,16 @@ public void testNormalQuery() assertEquals(queryCreatedEvent.getMetadata().getQuery(), "SELECT sum(linenumber) FROM lineitem"); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); + QueryProgressEvent queryProgressEvent = generatedEvents.getQueryProgressEvent(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); + assertTrue(queryProgressEvent.getContext().getResourceGroupId().isPresent()); + assertEquals(queryProgressEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); + assertEquals(queryProgressEvent.getContext().getClientInfo().get(), "{\"clientVersion\":\"testVersion\"}"); + assertEquals(queryCreatedEvent.getMetadata().getQueryId(), queryProgressEvent.getMetadata().getQueryId()); + assertFalse(queryProgressEvent.getMetadata().getPreparedQuery().isPresent()); + QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); @@ -183,8 +212,8 @@ public void testPrepareAndExecute() String selectQuery = "SELECT count(*) FROM lineitem WHERE shipmode = ?"; String prepareQuery = "PREPARE stmt FROM " + selectQuery; - // QueryCreated: 1, QueryCompleted: 1, Splits: 0 - runQueryAndWaitForEvents(prepareQuery, 2); + // QueryCreated: 1, QueryProgressEvent: 1, QueryCompleted: 1, Splits: 0 + runQueryAndWaitForEvents(prepareQuery, 3); QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); assertEquals(queryCreatedEvent.getContext().getServerVersion(), "testversion"); @@ -194,6 +223,16 @@ public void testPrepareAndExecute() assertEquals(queryCreatedEvent.getMetadata().getQuery(), prepareQuery); assertFalse(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); + QueryProgressEvent queryProgressEvent = generatedEvents.getQueryProgressEvent(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); + assertTrue(queryProgressEvent.getContext().getResourceGroupId().isPresent()); + assertEquals(queryProgressEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); + assertEquals(queryProgressEvent.getContext().getClientInfo().get(), "{\"clientVersion\":\"testVersion\"}"); + assertEquals(queryCreatedEvent.getMetadata().getQueryId(), queryProgressEvent.getMetadata().getQueryId()); + assertFalse(queryProgressEvent.getMetadata().getPreparedQuery().isPresent()); + QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); @@ -208,8 +247,8 @@ public void testPrepareAndExecute() Session sessionWithPrepare = Session.builder(session).addPreparedStatement("stmt", selectQuery).build(); // We expect the following events - // QueryCreated: 1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; + // QueryCreated: 1, QueryProgressEvent:1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split + int expectedEvents = 1 + 1 + 1 + SPLITS_PER_NODE + 1 + 1; runQueryAndWaitForEvents("EXECUTE stmt USING 'SHIP'", expectedEvents, sessionWithPrepare); queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); @@ -221,6 +260,17 @@ public void testPrepareAndExecute() assertTrue(queryCreatedEvent.getMetadata().getPreparedQuery().isPresent()); assertEquals(queryCreatedEvent.getMetadata().getPreparedQuery().get(), selectQuery); + queryProgressEvent = generatedEvents.getQueryProgressEvent(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); + assertTrue(queryProgressEvent.getContext().getResourceGroupId().isPresent()); + assertEquals(queryProgressEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); + assertEquals(queryProgressEvent.getContext().getClientInfo().get(), "{\"clientVersion\":\"testVersion\"}"); + assertEquals(queryCreatedEvent.getMetadata().getQueryId(), queryProgressEvent.getMetadata().getQueryId()); + assertTrue(queryProgressEvent.getMetadata().getPreparedQuery().isPresent()); + assertEquals(queryProgressEvent.getMetadata().getPreparedQuery().get(), selectQuery); + queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); assertTrue(queryCompletedEvent.getContext().getResourceGroupId().isPresent()); assertEquals(queryCompletedEvent.getContext().getResourceGroupId().get(), createResourceGroupId("global", "user-user")); @@ -239,13 +289,17 @@ public void testOutputStats() throws Exception { // We expect the following events - // QueryCreated: 1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; + // QueryCreated: 1, QueryProgressEvent:1, QueryCompleted: 1, Splits: SPLITS_PER_NODE (leaf splits) + LocalExchange[SINGLE] split + Aggregation/Output split + int expectedEvents = 1 + 1 + 1 + SPLITS_PER_NODE + 1 + 1; MaterializedResult result = runQueryAndWaitForEvents("SELECT 1 FROM lineitem", expectedEvents); QueryCreatedEvent queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + QueryProgressEvent queryProgressEvent = generatedEvents.getQueryProgressEvent(); QueryCompletedEvent queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); QueryStats queryStats = queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); assertTrue(queryCompletedEvent.getStatistics().getOutputBytes() > 0L); assertEquals(result.getRowCount(), queryStats.getOutputPositions()); @@ -253,9 +307,13 @@ public void testOutputStats() runQueryAndWaitForEvents("SELECT COUNT(1) FROM lineitem", expectedEvents); queryCreatedEvent = getOnlyElement(generatedEvents.getQueryCreatedEvents()); + queryProgressEvent = generatedEvents.getQueryProgressEvent(); queryCompletedEvent = getOnlyElement(generatedEvents.getQueryCompletedEvents()); queryStats = queryRunner.getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(queryCreatedEvent.getMetadata().getQueryId())).getQueryStats(); + assertNotNull(queryProgressEvent); + assertTrue(queryProgressEvent.getMonotonicallyIncreasingEventId() > lastSeenQueryProgressEventId); + lastSeenQueryProgressEventId = queryProgressEvent.getMonotonicallyIncreasingEventId(); assertTrue(queryStats.getOutputDataSize().toBytes() > 0L); assertTrue(queryCompletedEvent.getStatistics().getOutputBytes() > 0L); assertEquals(1L, queryStats.getOutputPositions()); @@ -266,7 +324,7 @@ public void testOutputStats() public void testGraphvizQueryPlanOutput() throws Exception { - int expectedEvents = 1 + 1 + SPLITS_PER_NODE + 1 + 1; + int expectedEvents = 1 + 1 + 1 + SPLITS_PER_NODE + 1 + 1; String query = "EXPLAIN (type distributed, format graphviz) SELECT * FROM LINEITEM limit 1"; Session sessionForEventLoggingWithStats = Session.builder(session) .setSystemProperty("print_stats_for_non_join_query", "true") @@ -282,6 +340,7 @@ static class EventsBuilder private ImmutableList.Builder queryCreatedEvents; private ImmutableList.Builder queryCompletedEvents; private ImmutableList.Builder splitCompletedEvents; + private QueryProgressEvent queryProgressEvent; private CountDownLatch eventsLatch; @@ -290,6 +349,7 @@ public synchronized void initialize(int numEvents) queryCreatedEvents = ImmutableList.builder(); queryCompletedEvents = ImmutableList.builder(); splitCompletedEvents = ImmutableList.builder(); + queryProgressEvent = null; eventsLatch = new CountDownLatch(numEvents); } @@ -318,6 +378,15 @@ public synchronized void addSplitCompleted(SplitCompletedEvent event) eventsLatch.countDown(); } + public synchronized void addQueryProgress(QueryProgressEvent event) + { + // Store only one QueryProgress event + if (queryProgressEvent == null) { + queryProgressEvent = event; + eventsLatch.countDown(); + } + } + public List getQueryCreatedEvents() { return queryCreatedEvents.build(); @@ -332,5 +401,10 @@ public List getSplitCompletedEvents() { return splitCompletedEvents.build(); } + + public QueryProgressEvent getQueryProgressEvent() + { + return queryProgressEvent; + } } } diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java index 613e10fe7f51..bc5798435232 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestEventListenerPlugin.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.QueryProgressEvent; import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; import com.google.common.collect.ImmutableList; @@ -95,5 +96,11 @@ public void splitCompleted(SplitCompletedEvent splitCompletedEvent) { eventsBuilder.addSplitCompleted(splitCompletedEvent); } + + @Override + public void publishQueryProgress(QueryProgressEvent queryProgressEvent) + { + eventsBuilder.addQueryProgress(queryProgressEvent); + } } }