diff --git a/core/src/main/java/org/apache/druid/collections/BlockingPool.java b/core/src/main/java/org/apache/druid/collections/BlockingPool.java index 91c3b35b6549..c17329917cd2 100644 --- a/core/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/core/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -19,31 +19,12 @@ package org.apache.druid.collections; -import javax.annotation.Nullable; import java.util.List; public interface BlockingPool { int maxSize(); - /** - * Take a resource from the pool, waiting up to the - * specified wait time if necessary for an element to become available. - * - * @param timeoutMs maximum time to wait for a resource, in milliseconds. - * - * @return a resource, or null if the timeout was reached - */ - @Nullable - ReferenceCountingResourceHolder take(long timeoutMs); - - /** - * Take a resource from the pool, waiting if necessary until an element becomes available. - * - * @return a resource - */ - ReferenceCountingResourceHolder take(); - /** * Take resources from the pool, waiting up to the * specified wait time if necessary for elements of the given number to become available. diff --git a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index b1b43fd22477..a32236db20db 100644 --- a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -76,32 +76,6 @@ public int getPoolSize() return objects.size(); } - @Override - @Nullable - public ReferenceCountingResourceHolder take(final long timeoutMs) - { - Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); - checkInitialized(); - try { - return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public ReferenceCountingResourceHolder take() - { - checkInitialized(); - try { - return wrapObject(takeObject()); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - @Nullable private ReferenceCountingResourceHolder wrapObject(T theObject) { @@ -144,21 +118,6 @@ private T pollObject(long timeoutMs) throws InterruptedException } } - private T takeObject() throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.isEmpty()) { - notEnough.await(); - } - return objects.pop(); - } - finally { - lock.unlock(); - } - } - @Override public List> takeBatch(final int elementNum, final long timeoutMs) { diff --git a/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index 037a489e1f19..dcd6cea07aa7 100644 --- a/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -44,18 +44,6 @@ public int maxSize() return 0; } - @Override - public ReferenceCountingResourceHolder take(long timeoutMs) - { - throw new UnsupportedOperationException(); - } - - @Override - public ReferenceCountingResourceHolder take() - { - throw new UnsupportedOperationException(); - } - @Override public List> takeBatch(int elementNum, long timeoutMs) { diff --git a/core/src/main/java/org/apache/druid/collections/StupidPool.java b/core/src/main/java/org/apache/druid/collections/StupidPool.java index a69a8fde30a9..d1d6a9b9b7b1 100644 --- a/core/src/main/java/org/apache/druid/collections/StupidPool.java +++ b/core/src/main/java/org/apache/druid/collections/StupidPool.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; /** + * */ public class StupidPool implements NonBlockingPool { @@ -61,6 +62,7 @@ public class StupidPool implements NonBlockingPool private final String name; private final Supplier generator; + private final AtomicLong createdObjectsCounter = new AtomicLong(0); private final AtomicLong leakedObjectsCounter = new AtomicLong(0); //note that this is just the max entries in the cache, pool can still create as many buffers as needed. @@ -114,6 +116,7 @@ public ResourceHolder take() private ObjectResourceHolder makeObjectWithHandler() { T object = generator.get(); + createdObjectsCounter.incrementAndGet(); ObjectId objectId = new ObjectId(); ObjectLeakNotifier notifier = new ObjectLeakNotifier(this); // Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken @@ -122,7 +125,7 @@ private ObjectResourceHolder makeObjectWithHandler() } @VisibleForTesting - long poolSize() + public long poolSize() { return poolSize.get(); } @@ -133,6 +136,12 @@ long leakedObjectsCount() return leakedObjectsCounter.get(); } + @VisibleForTesting + public long objectsCreatedCount() + { + return createdObjectsCounter.get(); + } + private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier) { long currentPoolSize; @@ -160,7 +169,12 @@ private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cle * This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} event don't have `return false;` in * it's body in OpenJDK 8. */ - private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier) + private void impossibleOffsetFailed( + T object, + ObjectId objectId, + Cleaners.Cleanable cleanable, + ObjectLeakNotifier notifier + ) { poolSize.decrementAndGet(); notifier.disable(); diff --git a/core/src/test/java/org/apache/druid/collections/BlockingPoolTest.java b/core/src/test/java/org/apache/druid/collections/BlockingPoolTest.java index 9efb9b8341ee..cc5b82ba26e0 100644 --- a/core/src/test/java/org/apache/druid/collections/BlockingPoolTest.java +++ b/core/src/test/java/org/apache/druid/collections/BlockingPoolTest.java @@ -20,6 +20,7 @@ package org.apache.druid.collections; import com.google.common.base.Suppliers; +import com.google.common.collect.Iterables; import org.apache.druid.java.util.common.concurrent.Execs; import org.junit.After; import org.junit.Assert; @@ -66,7 +67,7 @@ public void testTakeFromEmptyPool() { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); - emptyPool.take(0); + emptyPool.takeBatch(1, 0); } @Test @@ -80,7 +81,7 @@ public void testDrainFromEmptyPool() @Test(timeout = 60_000L) public void testTake() { - final ReferenceCountingResourceHolder holder = pool.take(100); + final ReferenceCountingResourceHolder holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null); Assert.assertNotNull(holder); Assert.assertEquals(9, pool.getPoolSize()); holder.close(); @@ -91,7 +92,7 @@ public void testTake() public void testTakeTimeout() { final List> batchHolder = pool.takeBatch(10, 100L); - final ReferenceCountingResourceHolder holder = pool.take(100); + final ReferenceCountingResourceHolder holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null); Assert.assertNull(holder); batchHolder.forEach(ReferenceCountingResourceHolder::close); } @@ -147,7 +148,7 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException () -> { List> result = new ArrayList<>(); for (int i = 0; i < limit1; i++) { - result.add(pool.take(10)); + result.add(Iterables.getOnlyElement(pool.takeBatch(1, 10), null)); } return result; } @@ -156,7 +157,7 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException () -> { List> result = new ArrayList<>(); for (int i = 0; i < limit2; i++) { - result.add(pool.take(10)); + result.add(Iterables.getOnlyElement(pool.takeBatch(1, 10), null)); } return result; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 28399e0dbd35..9a417c9d6290 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -131,47 +131,53 @@ public static Sequence process( final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); - final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded( - query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, null) - ); - - final DateTime fudgeTimestamp = fudgeTimestampString == null - ? null - : DateTimes.utc(Long.parseLong(fudgeTimestampString)); + try { + final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded( + query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, null) + ); - final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); - final Interval interval = Iterables.getOnlyElement(query.getIntervals()); + final DateTime fudgeTimestamp = fudgeTimestampString == null + ? null + : DateTimes.utc(Long.parseLong(fudgeTimestampString)); - final boolean doVectorize = queryConfig.getVectorize().shouldVectorize( - VectorGroupByEngine.canVectorize(query, storageAdapter, filter) - ); + final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); + final Interval interval = Iterables.getOnlyElement(query.getIntervals()); - final Sequence result; - - if (doVectorize) { - result = VectorGroupByEngine.process( - query, - storageAdapter, - bufferHolder.get(), - fudgeTimestamp, - filter, - interval, - querySpecificConfig, - queryConfig + final boolean doVectorize = queryConfig.getVectorize().shouldVectorize( + VectorGroupByEngine.canVectorize(query, storageAdapter, filter) ); - } else { - result = processNonVectorized( - query, - storageAdapter, - bufferHolder.get(), - fudgeTimestamp, - querySpecificConfig, - filter, - interval - ); - } - return result.withBaggage(bufferHolder); + final Sequence result; + + if (doVectorize) { + result = VectorGroupByEngine.process( + query, + storageAdapter, + bufferHolder.get(), + fudgeTimestamp, + filter, + interval, + querySpecificConfig, + queryConfig + ); + } else { + result = processNonVectorized( + query, + storageAdapter, + bufferHolder.get(), + fudgeTimestamp, + querySpecificConfig, + filter, + interval + ); + } + + return result.withBaggage(bufferHolder); + } + catch (Throwable e) { + bufferHolder.close(); + throw e; + } } private static Sequence processNonVectorized( @@ -965,13 +971,13 @@ public Grouper.BufferComparator bufferComparatorWithAggregators( DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec(); return GrouperBufferComparatorUtils.bufferComparatorWithAggregators( - query.getAggregatorSpecs().toArray(new AggregatorFactory[0]), - aggregatorOffsets, - limitSpec, - query.getDimensions(), - getDimensionComparators(limitSpec), - query.getResultRowHasTimestamp(), - query.getContextSortByDimsFirst() + query.getAggregatorSpecs().toArray(new AggregatorFactory[0]), + aggregatorOffsets, + limitSpec, + query.getDimensions(), + getDimensionComparators(limitSpec), + query.getResultRowHasTimestamp(), + query.getContextSortByDimsFirst() ); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index c546b6f97c67..7cd93401b1c0 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -49,6 +49,7 @@ import java.util.List; /** + * */ public class PooledTopNAlgorithm extends BaseTopNAlgorithm @@ -211,10 +212,6 @@ public PooledTopNAlgorithm( @Override public PooledTopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor) { - ResourceHolder resultsBufHolder = bufferPool.take(); - ByteBuffer resultsBuf = resultsBufHolder.get(); - resultsBuf.clear(); - final DimensionSelector dimSelector = (DimensionSelector) selectorPlus.getSelector(); final int cardinality = dimSelector.getValueCardinality(); @@ -243,27 +240,38 @@ public int[] build() } }; - final int numBytesToWorkWith = resultsBuf.remaining(); - final int[] aggregatorSizes = new int[query.getAggregatorSpecs().size()]; - int numBytesPerRecord = 0; + final ResourceHolder resultsBufHolder = bufferPool.take(); - for (int i = 0; i < query.getAggregatorSpecs().size(); ++i) { - aggregatorSizes[i] = query.getAggregatorSpecs().get(i).getMaxIntermediateSizeWithNulls(); - numBytesPerRecord += aggregatorSizes[i]; - } + try { + final ByteBuffer resultsBuf = resultsBufHolder.get(); + resultsBuf.clear(); + + final int numBytesToWorkWith = resultsBuf.remaining(); + final int[] aggregatorSizes = new int[query.getAggregatorSpecs().size()]; + int numBytesPerRecord = 0; - final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality; - - return PooledTopNParams.builder() - .withSelectorPlus(selectorPlus) - .withCursor(cursor) - .withResultsBufHolder(resultsBufHolder) - .withResultsBuf(resultsBuf) - .withArrayProvider(arrayProvider) - .withNumBytesPerRecord(numBytesPerRecord) - .withNumValuesPerPass(numValuesPerPass) - .withAggregatorSizes(aggregatorSizes) - .build(); + for (int i = 0; i < query.getAggregatorSpecs().size(); ++i) { + aggregatorSizes[i] = query.getAggregatorSpecs().get(i).getMaxIntermediateSizeWithNulls(); + numBytesPerRecord += aggregatorSizes[i]; + } + + final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality; + + return PooledTopNParams.builder() + .withSelectorPlus(selectorPlus) + .withCursor(cursor) + .withResultsBufHolder(resultsBufHolder) + .withResultsBuf(resultsBuf) + .withArrayProvider(arrayProvider) + .withNumBytesPerRecord(numBytesPerRecord) + .withNumValuesPerPass(numValuesPerPass) + .withAggregatorSizes(aggregatorSizes) + .build(); + } + catch (Throwable e) { + resultsBufHolder.close(); + throw e; + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index 221bc321b4aa..fa18a0abbe05 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -71,17 +71,6 @@ private static class TestBlockingPool extends CloseableDefaultBlockingPool take(final long timeout) - { - final ReferenceCountingResourceHolder holder = super.take(timeout); - final int poolSize = getPoolSize(); - if (minRemainBufferNum > poolSize) { - minRemainBufferNum = poolSize; - } - return holder; - } - @Override public List> takeBatch(final int maxElements, final long timeout) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 1b8519d68b49..0417d4a430ba 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -404,7 +404,11 @@ public ByteBuffer get() ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector); final Closer closer = Closer.create(); - closer.register(bufferPool); + closer.register(() -> { + // Verify that all objects have been returned to the pool. + Assert.assertEquals(bufferPool.poolSize(), bufferPool.objectsCreatedCount()); + bufferPool.close(); + }); closer.register(mergeBufferPool); return Pair.of(new GroupByQueryRunnerFactory(strategySelector, toolChest), closer); } diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 48fd49c0d17d..3f5b41db0a15 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -175,6 +175,15 @@ public static List>> queryRunners() ) ) ); + + RESOURCE_CLOSER.register(() -> { + // Verify that all objects have been returned to the pool. + Assert.assertEquals("defaultPool objects created", defaultPool.poolSize(), defaultPool.objectsCreatedCount()); + Assert.assertEquals("customPool objects created", customPool.poolSize(), customPool.objectsCreatedCount()); + defaultPool.close(); + customPool.close(); + }); + return retVal; }