Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various processing buffer leaks and simplify BlockingPool. #9928

Merged
merged 3 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions core/src/main/java/org/apache/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,12 @@

package org.apache.druid.collections;

import javax.annotation.Nullable;
import java.util.List;

public interface BlockingPool<T>
{
int maxSize();

/**
* Take a resource from the pool, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeoutMs maximum time to wait for a resource, in milliseconds.
*
* @return a resource, or null if the timeout was reached
*/
@Nullable
ReferenceCountingResourceHolder<T> take(long timeoutMs);

/**
* Take a resource from the pool, waiting if necessary until an element becomes available.
*
* @return a resource
*/
ReferenceCountingResourceHolder<T> take();

/**
* Take resources from the pool, waiting up to the
* specified wait time if necessary for elements of the given number to become available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,6 @@ public int getPoolSize()
return objects.size();
}

/**
 * Takes a resource from the pool, waiting up to the specified time for one to become available.
 *
 * @param timeoutMs maximum time to wait for a resource, in milliseconds; must be non-negative.
 *                  A value of 0 means "don't wait": poll once and return immediately.
 *
 * @return a reference-counted holder for the resource, or null if the timeout was reached
 */
@Override
@Nullable
public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
{
  Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
  checkInitialized();
  try {
    // timeoutMs == 0 polls without blocking; otherwise wait up to timeoutMs.
    return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
  }
  catch (InterruptedException e) {
    // Restore the interrupt status so callers up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}

/**
 * Takes a resource from the pool, blocking indefinitely until one becomes available.
 *
 * @return a reference-counted holder for the resource; never null
 */
@Override
public ReferenceCountingResourceHolder<T> take()
{
  checkInitialized();
  try {
    return wrapObject(takeObject());
  }
  catch (InterruptedException e) {
    // Restore the interrupt status so callers up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}

@Nullable
private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
Expand Down Expand Up @@ -144,21 +118,6 @@ private T pollObject(long timeoutMs) throws InterruptedException
}
}

/**
 * Removes and returns one pooled object, blocking until the pool is non-empty.
 *
 * @return an object taken from the pool
 *
 * @throws InterruptedException if the thread is interrupted while acquiring the lock or awaiting
 */
private T takeObject() throws InterruptedException
{
  final ReentrantLock poolLock = this.lock;
  poolLock.lockInterruptibly();
  try {
    // Block until another thread returns an object and signals the condition.
    while (objects.isEmpty()) {
      notEnough.await();
    }
    return objects.pop();
  }
  finally {
    poolLock.unlock();
  }
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@ public int maxSize()
return 0;
}

/**
 * Unsupported: this dummy pool holds no resources, so taking a single element is not implemented.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ReferenceCountingResourceHolder<T> take(long timeoutMs)
{
  throw new UnsupportedOperationException();
}

/**
 * Unsupported: this dummy pool holds no resources, so the blocking take is not implemented.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ReferenceCountingResourceHolder<T> take()
{
  throw new UnsupportedOperationException();
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs)
{
Expand Down
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/druid/collections/StupidPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicReference;

/**
*
*/
public class StupidPool<T> implements NonBlockingPool<T>
{
Expand Down Expand Up @@ -61,6 +62,7 @@ public class StupidPool<T> implements NonBlockingPool<T>
private final String name;
private final Supplier<T> generator;

private final AtomicLong createdObjectsCounter = new AtomicLong(0);
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);

//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
Expand Down Expand Up @@ -114,6 +116,7 @@ public ResourceHolder<T> take()
private ObjectResourceHolder makeObjectWithHandler()
{
T object = generator.get();
createdObjectsCounter.incrementAndGet();
ObjectId objectId = new ObjectId();
ObjectLeakNotifier notifier = new ObjectLeakNotifier(this);
// Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken
Expand All @@ -122,7 +125,7 @@ private ObjectResourceHolder makeObjectWithHandler()
}

@VisibleForTesting
long poolSize()
public long poolSize()
{
return poolSize.get();
}
Expand All @@ -133,6 +136,12 @@ long leakedObjectsCount()
return leakedObjectsCounter.get();
}

@VisibleForTesting
public long objectsCreatedCount()
{
return createdObjectsCounter.get();
}

private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
{
long currentPoolSize;
Expand Down Expand Up @@ -160,7 +169,12 @@ private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cle
* This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} doesn't even have a `return false;` in
* its body in OpenJDK 8.
*/
private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
private void impossibleOffsetFailed(
T object,
ObjectId objectId,
Cleaners.Cleanable cleanable,
ObjectLeakNotifier notifier
)
{
poolSize.decrementAndGet();
notifier.disable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.collections;

import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -66,7 +67,7 @@ public void testTakeFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
emptyPool.take(0);
emptyPool.takeBatch(1, 0);
}

@Test
Expand All @@ -80,7 +81,7 @@ public void testDrainFromEmptyPool()
@Test(timeout = 60_000L)
public void testTake()
{
final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
final ReferenceCountingResourceHolder<Integer> holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null);
Assert.assertNotNull(holder);
Assert.assertEquals(9, pool.getPoolSize());
holder.close();
Expand All @@ -91,7 +92,7 @@ public void testTake()
public void testTakeTimeout()
{
final List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 100L);
final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
final ReferenceCountingResourceHolder<Integer> holder = Iterables.getOnlyElement(pool.takeBatch(1, 100), null);
Assert.assertNull(holder);
batchHolder.forEach(ReferenceCountingResourceHolder::close);
}
Expand Down Expand Up @@ -147,7 +148,7 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException
() -> {
List<ReferenceCountingResourceHolder<Integer>> result = new ArrayList<>();
for (int i = 0; i < limit1; i++) {
result.add(pool.take(10));
result.add(Iterables.getOnlyElement(pool.takeBatch(1, 10), null));
}
return result;
}
Expand All @@ -156,7 +157,7 @@ public void testConcurrentTake() throws ExecutionException, InterruptedException
() -> {
List<ReferenceCountingResourceHolder<Integer>> result = new ArrayList<>();
for (int i = 0; i < limit2; i++) {
result.add(pool.take(10));
result.add(Iterables.getOnlyElement(pool.takeBatch(1, 10), null));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,47 +131,53 @@ public static Sequence<ResultRow> process(

final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();

final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded(
query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, null)
);

final DateTime fudgeTimestamp = fudgeTimestampString == null
? null
: DateTimes.utc(Long.parseLong(fudgeTimestampString));
try {
final String fudgeTimestampString = NullHandling.emptyToNullIfNeeded(
query.getContextValue(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, null)
);

final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());
final DateTime fudgeTimestamp = fudgeTimestampString == null
? null
: DateTimes.utc(Long.parseLong(fudgeTimestampString));

final boolean doVectorize = queryConfig.getVectorize().shouldVectorize(
VectorGroupByEngine.canVectorize(query, storageAdapter, filter)
);
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter()));
final Interval interval = Iterables.getOnlyElement(query.getIntervals());

final Sequence<ResultRow> result;

if (doVectorize) {
result = VectorGroupByEngine.process(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
filter,
interval,
querySpecificConfig,
queryConfig
final boolean doVectorize = queryConfig.getVectorize().shouldVectorize(
VectorGroupByEngine.canVectorize(query, storageAdapter, filter)
);
} else {
result = processNonVectorized(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
querySpecificConfig,
filter,
interval
);
}

return result.withBaggage(bufferHolder);
final Sequence<ResultRow> result;

if (doVectorize) {
result = VectorGroupByEngine.process(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
filter,
interval,
querySpecificConfig,
queryConfig
);
} else {
result = processNonVectorized(
query,
storageAdapter,
bufferHolder.get(),
fudgeTimestamp,
querySpecificConfig,
filter,
interval
);
}

return result.withBaggage(bufferHolder);
}
catch (Throwable e) {
bufferHolder.close();
throw e;
}
}

private static Sequence<ResultRow> processNonVectorized(
Expand Down Expand Up @@ -965,13 +971,13 @@ public Grouper.BufferComparator bufferComparatorWithAggregators(
DefaultLimitSpec limitSpec = (DefaultLimitSpec) query.getLimitSpec();

return GrouperBufferComparatorUtils.bufferComparatorWithAggregators(
query.getAggregatorSpecs().toArray(new AggregatorFactory[0]),
aggregatorOffsets,
limitSpec,
query.getDimensions(),
getDimensionComparators(limitSpec),
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst()
query.getAggregatorSpecs().toArray(new AggregatorFactory[0]),
aggregatorOffsets,
limitSpec,
query.getDimensions(),
getDimensionComparators(limitSpec),
query.getResultRowHasTimestamp(),
query.getContextSortByDimsFirst()
);
}

Expand Down
Loading