Skip to content

Commit

Permalink
move lastSeenDocId to PerThreadSourceProvider, otherwise multiple thr…
Browse files Browse the repository at this point in the history
…eads overwrite the same lastSeenDocId
  • Loading branch information
martijnvg committed Jan 13, 2025
1 parent 00fa74d commit 1932099
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.search.lookup.SourceProvider;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
Expand All @@ -28,10 +29,6 @@ final class ReinitializingSourceProvider implements SourceProvider {
private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;

// Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized:
// (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards)
private int lastSeenDocId;

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}
Expand All @@ -40,15 +37,25 @@ final class ReinitializingSourceProvider implements SourceProvider {
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
if (provider == null || provider.creatingThread != currentThread || doc < provider.lastSeenDocId) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
lastSeenDocId = doc;
provider.lastSeenDocId = doc;
return provider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {
private static final class PerThreadSourceProvider {
final SourceProvider source;
final Thread creatingThread;
// Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized:
// (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards)
int lastSeenDocId;

private PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {
this.source = source;
this.creatingThread = creatingThread;
}

}
}

0 comments on commit 1932099

Please sign in to comment.