Do not expose hard-deleted docs in Lucene history (#32333)
Today, when reading operation history in Lucene, we read all documents. However, if indexing a document is aborted, IndexWriter hard-deletes it; we therefore need to exclude such documents from Lucene history.

This commit makes sure that we exclude aborted documents by using the hard liveDocs of a SegmentReader whenever it has deletes.
dnhatn committed Aug 2, 2018
1 parent 5f9492d commit 133f56c
Showing 3 changed files with 118 additions and 33 deletions.
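For context, the wrapper at the heart of this change lets history readers see soft-deleted documents (old versions retained by the soft-deletes machinery) while still hiding documents that IndexWriter hard-deleted after an aborted indexing operation. Below is a minimal sketch of the intended usage, not taken from this commit: Lucene.SOFT_DELETE_FIELD, Lucene.newSoftDeleteField(), and Lucene.wrapAllDocsLive are real APIs shown in the diffs, while RAMDirectory and StandardAnalyzer are illustrative shortcuts assuming Lucene 7.x-era APIs.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.lucene.Lucene;

public class WrapAllDocsLiveSketch {
    public static void main(String[] args) throws Exception {
        try (RAMDirectory dir = new RAMDirectory()) {
            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                Document v1 = new Document();
                v1.add(new StringField("id", "1", Store.YES));
                writer.addDocument(v1);

                Document v2 = new Document();
                v2.add(new StringField("id", "1", Store.YES));
                // Soft update: v1 is only marked deleted via the soft-deletes field.
                writer.softUpdateDocument(new Term("id", "1"), v2, Lucene.newSoftDeleteField());

                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    DirectoryReader history = Lucene.wrapAllDocsLive(reader);
                    // A normal reader hides the soft-deleted v1; the wrapped one keeps it,
                    // so history sees both versions of the document.
                    System.out.println(reader.numDocs());  // 1
                    System.out.println(history.numDocs()); // 2
                }
            }
        }
    }
}

In production, soft-deleted versions are only guaranteed to survive merges when a SoftDeletesRetentionMergePolicy is configured, as the new tests below do.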
33 changes: 26 additions & 7 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -837,7 +837,8 @@ public int length() {
     }

     /**
-     * Wraps a directory reader to include all live docs.
+     * Wraps a directory reader to make all documents live except those that were rolled back
+     * or hard-deleted due to non-aborting exceptions during indexing.
      * The wrapped reader can be used to query all documents.
      *
      * @param in the input directory reader
@@ -848,17 +849,21 @@ public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOExcep
     }

     private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
-        static final class SubReaderWithAllLiveDocs extends FilterLeafReader {
-            SubReaderWithAllLiveDocs(LeafReader in) {
+        static final class LeafReaderWithLiveDocs extends FilterLeafReader {
+            final Bits liveDocs;
+            final int numDocs;
+            LeafReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) {
                 super(in);
+                this.liveDocs = liveDocs;
+                this.numDocs = numDocs;
             }
             @Override
             public Bits getLiveDocs() {
-                return null;
+                return liveDocs;
             }
             @Override
             public int numDocs() {
-                return maxDoc();
+                return numDocs;
             }
             @Override
             public CacheHelper getCoreCacheHelper() {
@@ -869,14 +874,28 @@ public CacheHelper getReaderCacheHelper() {
                 return null; // Modifying liveDocs
             }
         }
+
         DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
-            super(in, new FilterDirectoryReader.SubReaderWrapper() {
+            super(in, new SubReaderWrapper() {
                 @Override
                 public LeafReader wrap(LeafReader leaf) {
-                    return new SubReaderWithAllLiveDocs(leaf);
+                    SegmentReader segmentReader = segmentReader(leaf);
+                    Bits hardLiveDocs = segmentReader.getHardLiveDocs();
+                    if (hardLiveDocs == null) {
+                        return new LeafReaderWithLiveDocs(leaf, null, leaf.maxDoc());
+                    }
+                    // TODO: avoid recalculating numDocs every time.
+                    int numDocs = 0;
+                    for (int i = 0; i < hardLiveDocs.length(); i++) {
+                        if (hardLiveDocs.get(i)) {
+                            numDocs++;
+                        }
+                    }
+                    return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
                 }
             });
        }
+
         @Override
         protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
             return wrapAllDocsLive(in);
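One note on the TODO above: the wrapper recounts the hard live docs on every wrap. A purely hypothetical refinement, not part of this commit, would short-circuit through FixedBitSet#cardinality, which counts set bits a word at a time, falling back to the linear scan for other Bits implementations:

import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;

// Hypothetical helper for the TODO: count live docs with a word-wise popcount
// when the hard live docs happen to be backed by a FixedBitSet.
private static int countLiveDocs(Bits liveDocs) {
    if (liveDocs instanceof FixedBitSet) {
        return ((FixedBitSet) liveDocs).cardinality();
    }
    int numDocs = 0;
    for (int i = 0; i < liveDocs.length(); i++) {
        if (liveDocs.get(i)) {
            numDocs++;
        }
    }
    return numDocs;
}

Caching the count per segment (for example, keyed by the core cache key) would avoid even that cost on repeated wraps.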
91 changes: 91 additions & 0 deletions server/src/test/java/org/elasticsearch/common/lucene/LuceneTests.java

@@ -33,18 +33,23 @@
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MMapDirectory;
 import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.Bits;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -53,6 +58,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;

+import static org.hamcrest.Matchers.equalTo;
+
 public class LuceneTests extends ESTestCase {
     public void testWaitForIndex() throws Exception {
         final MockDirectoryWrapper dir = newMockDirectory();
@@ -406,4 +413,88 @@ public void testMMapHackSupported() throws Exception {
         // add assume's here if needed for certain platforms, but we should know if it does not work.
         assertTrue("MMapDirectory does not support unmapping: " + MMapDirectory.UNMAP_NOT_SUPPORTED_REASON, MMapDirectory.UNMAP_SUPPORTED);
     }
+
+    public void testWrapAllDocsLive() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
+            .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
+        IndexWriter writer = new IndexWriter(dir, config);
+        int numDocs = between(1, 10);
+        Set<String> liveDocs = new HashSet<>();
+        for (int i = 0; i < numDocs; i++) {
+            String id = Integer.toString(i);
+            Document doc = new Document();
+            doc.add(new StringField("id", id, Store.YES));
+            writer.addDocument(doc);
+            liveDocs.add(id);
+        }
+        for (int i = 0; i < numDocs; i++) {
+            if (randomBoolean()) {
+                String id = Integer.toString(i);
+                Document doc = new Document();
+                doc.add(new StringField("id", "v2-" + id, Store.YES));
+                if (randomBoolean()) {
+                    doc.add(Lucene.newSoftDeleteField());
+                }
+                writer.softUpdateDocument(new Term("id", id), doc, Lucene.newSoftDeleteField());
+                liveDocs.add("v2-" + id);
+            }
+        }
+        try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
+            DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
+            assertThat(reader.numDocs(), equalTo(liveDocs.size()));
+            IndexSearcher searcher = new IndexSearcher(reader);
+            Set<String> actualDocs = new HashSet<>();
+            TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                actualDocs.add(reader.document(scoreDoc.doc).get("id"));
+            }
+            assertThat(actualDocs, equalTo(liveDocs));
+        }
+        IOUtils.close(writer, dir);
+    }
+
+    public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
+            .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
+        IndexWriter writer = new IndexWriter(dir, config);
+        int numDocs = between(1, 10);
+        List<String> liveDocs = new ArrayList<>();
+        for (int i = 0; i < numDocs; i++) {
+            String id = Integer.toString(i);
+            Document doc = new Document();
+            doc.add(new StringField("id", id, Store.YES));
+            if (randomBoolean()) {
+                doc.add(Lucene.newSoftDeleteField());
+            }
+            writer.addDocument(doc);
+            liveDocs.add(id);
+        }
+        int abortedDocs = between(1, 10);
+        for (int i = 0; i < abortedDocs; i++) {
+            try {
+                Document doc = new Document();
+                doc.add(new StringField("id", "aborted-" + i, Store.YES));
+                StringReader reader = new StringReader("");
+                doc.add(new TextField("other", reader));
+                reader.close(); // make indexing this document hit a non-aborting exception
+                writer.addDocument(doc);
+                fail("indexing should have failed");
+            } catch (Exception ignored) { }
+        }
+        try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
+            DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
+            assertThat(reader.maxDoc(), equalTo(numDocs + abortedDocs));
+            assertThat(reader.numDocs(), equalTo(liveDocs.size()));
+            IndexSearcher searcher = new IndexSearcher(reader);
+            List<String> actualDocs = new ArrayList<>();
+            TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                actualDocs.add(reader.document(scoreDoc.doc).get("id"));
+            }
+            assertThat(actualDocs, equalTo(liveDocs));
+        }
+        IOUtils.close(writer, dir);
+    }
 }
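The closed StringReader in testWrapLiveDocsNotExposeAbortedDocuments is what forces the abort: analysis of the TextField fails mid-document, IndexWriter hard-deletes the half-indexed document, and the writer itself stays open. A standalone sketch of that behavior, again assuming Lucene 7.x-era APIs (RAMDirectory and StandardAnalyzer are shortcuts for illustration, not part of this commit):

import java.io.StringReader;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

public class AbortedDocSketch {
    public static void main(String[] args) throws Exception {
        try (RAMDirectory dir = new RAMDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document ok = new Document();
            ok.add(new StringField("id", "ok", Store.YES));
            writer.addDocument(ok);

            Document bad = new Document();
            bad.add(new StringField("id", "bad", Store.YES));
            StringReader tokens = new StringReader("boom");
            tokens.close(); // reading this field during analysis now throws
            bad.add(new TextField("other", tokens));
            try {
                writer.addDocument(bad); // throws; IndexWriter hard-deletes the half-indexed doc
            } catch (Exception expected) {
                // non-aborting for the writer: it remains open and usable
            }

            try (DirectoryReader reader = DirectoryReader.open(writer)) {
                // The aborted doc still occupies a doc ID until merged away,
                // but it is never visible through numDocs or searches.
                System.out.println(reader.maxDoc());  // typically 2
                System.out.println(reader.numDocs()); // 1
            }
        }
    }
}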
27 changes: 1 addition & 26 deletions server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -65,6 +65,7 @@
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -3087,30 +3088,4 @@ public void testSupplyTombstoneDoc() throws Exception {

         closeShards(shard);
     }
-
-    public void testSearcherIncludesSoftDeletes() throws Exception {
-        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
-            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
-            .build();
-        IndexMetaData metaData = IndexMetaData.builder("test")
-            .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
-            .settings(settings)
-            .primaryTerm(0, 1).build();
-        IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
-        recoverShardFromStore(shard);
-        indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
-        indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}");
-        deleteDoc(shard, "test", "0");
-        shard.refresh("test");
-        try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
-            IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
-            assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L));
-            assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L));
-            assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
-            assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L));
-        }
-        closeShards(shard);
-    }
 }
