Skip to content

Commit

Permalink
Integrates soft-deletes into Elasticsearch (elastic#33222)
Browse files Browse the repository at this point in the history
This PR integrates Lucene soft-deletes(LUCENE-8200) into Elasticsearch.
Highlight works in this PR include:

- Replace hard-deletes by soft-deletes in InternalEngine
- Use _recovery_source if _source is disabled or modified (elastic#31106)
- Soft-deletes retention policy based on the global checkpoint (elastic#30335)
- Read operation history from Lucene instead of translog (elastic#30120)
- Use Lucene history in peer-recovery (elastic#30522)

Relates elastic#30086
Closes elastic#29530

---
These works have been done by the whole team; however, these individuals
(lexical order) have significant contribution in coding and reviewing:

Co-authored-by: Adrien Grand <jpountz@gmail.com>
Co-authored-by: Boaz Leskes <b.leskes@gmail.com>
Co-authored-by: Jason Tedor <jason@tedor.me>
Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
Co-authored-by: Nhat Nguyen <nhat.nguyen@elastic.co>
Co-authored-by: Simon Willnauer <simonw@apache.org>
  • Loading branch information
6 people committed Aug 31, 2018
1 parent 547de71 commit ad4dd08
Show file tree
Hide file tree
Showing 63 changed files with 3,432 additions and 499 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -87,6 +88,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -1109,7 +1111,11 @@ private void duelRun(PercolateQuery.QueryStore queryStore, MemoryIndex memoryInd
}

private void addQuery(Query query, List<ParseContext.Document> docs) {
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document queryDocument = parseContext.doc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -58,6 +59,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperParser;
import org.elasticsearch.index.mapper.MapperParsingException;
Expand Down Expand Up @@ -182,7 +184,11 @@ public void testExtractTerms() throws Exception {

DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -204,7 +210,7 @@ public void testExtractTerms() throws Exception {
bq.add(termQuery1, Occur.MUST);
bq.add(termQuery2, Occur.MUST);

parseContext = new ParseContext.InternalParseContext(Settings.EMPTY, mapperService.documentMapperParser(),
parseContext = new ParseContext.InternalParseContext(settings, mapperService.documentMapperParser(),
documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
Expand Down Expand Up @@ -232,8 +238,12 @@ public void testExtractRanges() throws Exception {
bq.add(rangeQuery2, Occur.MUST);

DocumentMapper documentMapper = mapperService.documentMapper("doc");
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -259,7 +269,7 @@ public void testExtractRanges() throws Exception {
.rangeQuery(15, 20, true, true, null, null, null, null);
bq.add(rangeQuery2, Occur.MUST);

parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(bq.build(), parseContext);
document = parseContext.doc();
Expand All @@ -283,7 +293,11 @@ public void testExtractTermsAndRanges_failed() throws Exception {
TermRangeQuery query = new TermRangeQuery("field1", new BytesRef("a"), new BytesRef("z"), true, true);
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(query, parseContext);
ParseContext.Document document = parseContext.doc();
Expand All @@ -298,7 +312,11 @@ public void testExtractTermsAndRanges_partial() throws Exception {
PhraseQuery phraseQuery = new PhraseQuery("field", "term");
DocumentMapper documentMapper = mapperService.documentMapper("doc");
PercolatorFieldMapper fieldMapper = (PercolatorFieldMapper) documentMapper.mappers().getMapper(fieldName);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(Settings.EMPTY,
IndexMetaData build = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(1).numberOfReplicas(0).build();
IndexSettings settings = new IndexSettings(build, Settings.EMPTY);
ParseContext.InternalParseContext parseContext = new ParseContext.InternalParseContext(settings,
mapperService.documentMapperParser(), documentMapper, null, null);
fieldMapper.processQuery(phraseQuery, parseContext);
ParseContext.Document document = parseContext.doc();
Expand Down
86 changes: 85 additions & 1 deletion server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
Expand Down Expand Up @@ -96,6 +98,8 @@ public class Lucene {
assert annotation == null : "DocValuesFormat " + LATEST_DOC_VALUES_FORMAT + " is deprecated" ;
}

public static final String SOFT_DELETES_FIELD = "__soft_deletes";

public static final NamedAnalyzer STANDARD_ANALYZER = new NamedAnalyzer("_standard", AnalyzerScope.GLOBAL, new StandardAnalyzer());
public static final NamedAnalyzer KEYWORD_ANALYZER = new NamedAnalyzer("_keyword", AnalyzerScope.GLOBAL, new KeywordAnalyzer());

Expand Down Expand Up @@ -140,7 +144,7 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
public static int getNumDocs(SegmentInfos info) {
int numDocs = 0;
for (SegmentCommitInfo si : info) {
numDocs += si.info.maxDoc() - si.getDelCount();
numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
}
return numDocs;
}
Expand Down Expand Up @@ -197,6 +201,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
}
final CommitPoint cp = new CommitPoint(si, directory);
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setIndexCommit(cp)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
Expand All @@ -220,6 +225,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
}
}
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(NoMergePolicy.INSTANCE) // no merges
.setCommitOnClose(false) // no commits
.setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
Expand Down Expand Up @@ -829,4 +835,82 @@ public int length() {
}
};
}

/**
* Wraps a directory reader to make all documents live except those were rolled back
* or hard-deleted due to non-aborting exceptions during indexing.
* The wrapped reader can be used to query all documents.
*
* @param in the input directory reader
* @return the wrapped reader
*/
public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException {
return new DirectoryReaderWithAllLiveDocs(in);
}

private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader {
static final class LeafReaderWithLiveDocs extends FilterLeafReader {
final Bits liveDocs;
final int numDocs;
LeafReaderWithLiveDocs(LeafReader in, Bits liveDocs, int numDocs) {
super(in);
this.liveDocs = liveDocs;
this.numDocs = numDocs;
}
@Override
public Bits getLiveDocs() {
return liveDocs;
}
@Override
public int numDocs() {
return numDocs;
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}

DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader leaf) {
SegmentReader segmentReader = segmentReader(leaf);
Bits hardLiveDocs = segmentReader.getHardLiveDocs();
if (hardLiveDocs == null) {
return new LeafReaderWithLiveDocs(leaf, null, leaf.maxDoc());
}
// TODO: Can we avoid calculate numDocs by using SegmentReader#getSegmentInfo with LUCENE-8458?
int numDocs = 0;
for (int i = 0; i < hardLiveDocs.length(); i++) {
if (hardLiveDocs.get(i)) {
numDocs++;
}
}
return new LeafReaderWithLiveDocs(segmentReader, hardLiveDocs, numDocs);
}
});
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return wrapAllDocsLive(in);
}

@Override
public CacheHelper getReaderCacheHelper() {
return null; // Modifying liveDocs
}
}

/**
* Returns a numeric docvalues which can be used to soft-delete documents.
*/
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -66,15 +67,22 @@ final class PerThreadIDVersionAndSeqNoLookup {
*/
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
this.uidField = uidField;
Terms terms = reader.terms(uidField);
final Terms terms = reader.terms(uidField);
if (terms == null) {
throw new IllegalArgumentException("reader misses the [" + uidField + "] field");
// If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields.
final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD);
final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
if (softDeletesDV == null || tombstoneDV == null) {
throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " +
"_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]");
}
termsEnum = null;
} else {
termsEnum = terms.iterator();
}
termsEnum = terms.iterator();
if (reader.getNumericDocValues(VersionFieldMapper.NAME) == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field; _uid terms [" + terms + "]");
}

Object readerKey = null;
assert (readerKey = reader.getCoreCacheHelper().getKey()) != null;
this.readerKey = readerKey;
Expand Down Expand Up @@ -111,7 +119,8 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
if (termsEnum.seekExact(id)) {
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.MAX_REGEX_LENGTH_SETTING,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
Expand Down
Loading

0 comments on commit ad4dd08

Please sign in to comment.