Skip to content

Commit

Permalink
Merge pull request #2301 from metamx/fix-2299_2
Browse files Browse the repository at this point in the history
fix reference counting for segments
  • Loading branch information
fjy committed Jan 20, 2016
2 parents 695f107 + 61aca6f commit 0c5f4b9
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 48 deletions.
14 changes: 14 additions & 0 deletions processing/src/main/java/io/druid/query/QueryRunnerHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
Expand All @@ -33,7 +34,9 @@
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*/
Expand Down Expand Up @@ -81,4 +84,15 @@ public Result<T> apply(Cursor input)
Predicates.<Result<T>>notNull()
);
}

public static <T> QueryRunner<T> makeClosingQueryRunner(final QueryRunner<T> runner, final Closeable closeable){
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,41 @@
import java.util.Map;

/**
*/
*/
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter;
private final SegmentDescriptor descriptor;

public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
ReferenceCountingSegment adapter
ReferenceCountingSegment adapter,
SegmentDescriptor descriptor
)
{
this.factory = factory;
this.adapter = adapter;
this.descriptor = descriptor;
}

@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
if (closeable != null) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);

return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
}
} else {
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(query, responseContext);
}
}
}
32 changes: 23 additions & 9 deletions server/src/main/java/io/druid/segment/realtime/FireHydrant.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package io.druid.segment.realtime;

import com.google.common.base.Throwables;
import com.metamx.common.Pair;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;

import java.io.Closeable;
import java.io.IOException;

/**
Expand All @@ -34,6 +36,7 @@ public class FireHydrant
private final int count;
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
private Object swapLock = new Object();

public FireHydrant(
IncrementalIndex index,
Expand Down Expand Up @@ -61,7 +64,7 @@ public IncrementalIndex getIndex()
return index;
}

public ReferenceCountingSegment getSegment()
public Segment getSegment()
{
return adapter;
}
Expand All @@ -78,16 +81,27 @@ public boolean hasSwapped()

public void swapSegment(Segment adapter)
{
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
synchronized (swapLock) {
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null;
}
}

public Pair<Segment, Closeable> getAndIncrementSegment()
{
// Prevent swapping of index before increment is called
synchronized (swapLock) {
Closeable closeable = adapter.increment();
return new Pair<Segment, Closeable>(adapter, closeable);
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
Expand All @@ -55,6 +56,7 @@
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
Expand Down Expand Up @@ -327,47 +329,38 @@ public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
@Override
public QueryRunner<T> apply(FireHydrant input)
{
// It is possible that we got a query for a segment, and while that query
// is in the jetty queue, the segment is abandoned. Here, we need to retry
// the query for the segment.
if (input == null || input.getSegment() == null) {
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
}

if (skipIncrementalSegment && !input.hasSwapped()) {
return new NoopQueryRunner<T>();
}

// Prevent the underlying segment from closing when its being iterated
final ReferenceCountingSegment segment = input.getSegment();
final Closeable closeable = segment.increment();
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);

if (input.hasSwapped() // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantIdentifier(input, segment),
makeHydrantIdentifier(input, segment.lhs),
descriptor,
objectMapper,
cache,
toolchest,
factory.createRunner(segment),
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return factory.createRunner(input.getSegment());
return baseRunner;
}
}
finally {
try {
if (closeable != null) {
closeable.close();
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
Expand All @@ -385,7 +378,7 @@ public QueryRunner<T> apply(FireHydrant input)
);
}

protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment)
protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
}
Expand All @@ -406,12 +399,12 @@ public void persist(final Committer committer)
final Stopwatch persistStopwatch = Stopwatch.createStarted();

final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);

persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
return toolChest.makeMetricBuilder(input);
}
},
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
),
Expand Down

0 comments on commit 0c5f4b9

Please sign in to comment.