diff --git a/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java b/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java index 06f4b728c8c1f..4156fd206722d 100644 --- a/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java +++ b/plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.TextFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.termvectors.TermVectorsService; import org.elasticsearch.indices.IndicesService; @@ -130,7 +131,7 @@ public void testAnnotationInjection() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -185,7 +186,7 @@ public void testToleranceForBadAnnotationMarkup() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -384,7 +385,7 @@ public void testDefaultPositionIncrementGap() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -426,7 +427,7 @@ public void testPositionIncrementGap() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index d2929a2dbc564..1eb60263843ea 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; @@ -77,6 +78,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); private static final ParseField PIPELINE = new ParseField("pipeline"); private static final ParseField SOURCE = new ParseField("_source"); + private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match"); + private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match"); /** * Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and @@ -347,6 +350,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null String opType = null; long version = Versions.MATCH_ANY; VersionType versionType = VersionType.INTERNAL; + long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; + long ifPrimaryTermMatch = 0; int retryOnConflict = 0; String pipeline = valueOrDefault(defaultPipeline, globalPipeline); @@ -377,6 +382,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null version = parser.longValue(); } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { versionType = VersionType.fromString(parser.text()); + } else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) { + ifSeqNoMatch = parser.longValue(); + } else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) { + ifPrimaryTermMatch = parser.longValue(); } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) { retryOnConflict = parser.intValue(); } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { @@ -404,7 +413,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } if ("delete".equals(action)) { - add(new DeleteRequest(index, type, id).routing(routing).version(version).versionType(versionType), payload); + add(new DeleteRequest(index, type, id).routing(routing) + .version(version).versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload); } else { nextMarker = findNextMarker(marker, from, data, length); if (nextMarker == -1) { @@ -417,16 +427,16 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null if ("index".equals(action)) { if (opType == null) { internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .setPipeline(pipeline) + .setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload); } else { internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .create("create".equals(opType)).setPipeline(pipeline) + .create("create".equals(opType)).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } } else if ("create".equals(action)) { internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .create(true).setPipeline(pipeline) + .create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } else if ("update".equals(action)) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index c65f516c39594..4f782f2b13599 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.action.bulk; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteRequest; @@ -460,7 +460,7 @@ private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext con executeOnPrimaryWhileHandlingMappingUpdates(context, () -> primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, - request.getAutoGeneratedTimestamp(), request.isRetry()), + request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()), e -> primary.getFailedIndexResult(e, request.version()), context::markOperationAsExecuted, mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); @@ -471,7 +471,8 @@ private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext co final DeleteRequest request = context.getRequestToExecute(); final IndexShard primary = context.getPrimary(); executeOnPrimaryWhileHandlingMappingUpdates(context, - () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()), + () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(), + request.ifSeqNoMatch(), request.ifPrimaryTermMatch()), e -> primary.getFailedDeleteResult(e, request.version()), context::markOperationAsExecuted, mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index ae1271537f755..c9f2df5633351 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -57,6 +58,8 @@ public class DeleteRequest extends ReplicatedWriteRequest private String routing; private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; + private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTermMatch = 0; public DeleteRequest() { } @@ -112,6 +115,12 @@ public ActionRequestValidationException validate() { if (versionType == VersionType.FORCE) { validationException = addValidationError("version type [force] may no longer be used", validationException); } + + if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY + )) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); + } return validationException; } @@ -194,6 +203,32 @@ public DeleteRequest versionType(VersionType versionType) { return this; } + public long ifSeqNoMatch() { + return ifSeqNoMatch; + } + + public long ifPrimaryTermMatch() { + return ifPrimaryTermMatch; + } + + public DeleteRequest setIfMatch(long seqNo, long term) { + if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("seqNo is set, but primary term is [0]"); + } + if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]"); + } + if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifSeqNoMatch = seqNo; + ifPrimaryTermMatch = term; + return this; + } + @Override public VersionType versionType() { return this.versionType; @@ -215,6 +250,13 @@ public void readFrom(StreamInput in) throws IOException { } version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNoMatch = in.readZLong(); + ifPrimaryTermMatch = in.readVLong(); + } else { + ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; + ifPrimaryTermMatch = 0; + } } @Override @@ -228,6 +270,15 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNoMatch); + out.writeVLong(ifPrimaryTermMatch); + } else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) { + assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]"; + throw new IllegalStateException( + "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " + + "Stream version [" + out.getVersion() + "]"); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java index 9060af8e17c8c..f0df2d3558bca 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java @@ -80,4 +80,14 @@ public DeleteRequestBuilder setVersionType(VersionType versionType) { request.versionType(versionType); return this; } + + /** + * only performs this delete request if the document was last modification was assigned the given + * sequence number and primary term + */ + public DeleteRequestBuilder setIfMatch(long seqNo, long term) { + request.setIfMatch(seqNo, term); + return this; + } + } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 710ae331b99f4..fd80139d5ce85 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -104,6 +105,8 @@ public class IndexRequest extends ReplicatedWriteRequest implement private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP; private boolean isRetry = false; + private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTermMatch = 0; public IndexRequest() { @@ -164,6 +167,12 @@ public ActionRequestValidationException validate() { validationException); return validationException; } + + if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) { + validationException = addValidationError("create operations do not support compare and set. use index instead", + validationException); + return validationException; + } } if (opType() != OpType.INDEX && id == null) { @@ -192,6 +201,12 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } + if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY + )) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); + } + return validationException; } @@ -471,6 +486,33 @@ public IndexRequest versionType(VersionType versionType) { return this; } + public IndexRequest ifMatch(long seqNo, long term) { + if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("seqNo is set, but primary term is [0]"); + } + + if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]"); + } + if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifSeqNoMatch = seqNo; + ifPrimaryTermMatch = term; + return this; + } + + public long ifSeqNoMatch() { + return ifSeqNoMatch; + } + + public long ifPrimaryTermMatch() { + return ifPrimaryTermMatch; + } + @Override public VersionType versionType() { return this.versionType; @@ -492,6 +534,8 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi // generate id if not already provided if (id == null) { assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!"; + assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO; + assert ifPrimaryTermMatch == 0; autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia String uid; if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) { @@ -533,6 +577,13 @@ public void readFrom(StreamInput in) throws IOException { } else { contentType = null; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNoMatch = in.readZLong(); + ifPrimaryTermMatch = in.readVLong(); + } else { + ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO; + ifPrimaryTermMatch = 0; + } } @Override @@ -564,6 +615,15 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNoMatch); + out.writeVLong(ifPrimaryTermMatch); + } else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) { + assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]"; + throw new IllegalStateException( + "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " + + "Stream version [" + out.getVersion() + "]"); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java index b81d09abda3ab..8ca32d40e8c8c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java @@ -199,6 +199,15 @@ public IndexRequestBuilder setVersionType(VersionType versionType) { return this; } + /** + * only performs this indexing request if the document was last modification was assigned the given + * sequence number and primary term + */ + public IndexRequestBuilder setIfMatch(long seqNo, long term) { + request.ifMatch(seqNo, term); + return this; + } + /** * Sets the ingest pipeline to be executed before indexing the document */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 29490a7997a7e..217c2b70f8d77 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -510,7 +510,7 @@ public void updateShardState(final ShardRouting newRouting, * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); - if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { // If the old primary was on an old version that did not replicate the msu, // we need to bootstrap it manually from its local history. assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); @@ -686,30 +686,33 @@ private IndexShardState changeState(IndexShardState newState, String reason) { } public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, - long autoGeneratedTimestamp, boolean isRetry) throws IOException { + long ifSeqNoMatch, long ifPrimaryTermMatch, long autoGeneratedTimestamp, + boolean isRetry) + throws IOException { assert versionType.validateVersionForWrites(version); - return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp, - isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); + return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNoMatch, + ifPrimaryTermMatch, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry, - Engine.Operation.Origin.REPLICA, sourceToParse); + return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0, + autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, - @Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry, - Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { + @Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch, + long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, + SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; ensureWriteAllowed(origin); Engine.Index operation; try { operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo, - opPrimaryTerm, version, versionType, origin, - autoGeneratedTimeStamp, isRetry); + opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, + ifSeqNoMatch, ifPrimaryTermMatch); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { return new Engine.IndexResult(update); @@ -727,8 +730,9 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o } public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo, - long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin, long autoGeneratedIdTimestamp, - boolean isRetry) { + long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin, + long autoGeneratedIdTimestamp, boolean isRetry, + long ifSeqNoMatch, long ifPrimaryTermMatch) { long startTime = System.nanoTime(); ParsedDocument doc = docMapper.getDocumentMapper().parse(source); if (docMapper.getMapping() != null) { @@ -736,7 +740,7 @@ public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version } Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id())); return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, - UNASSIGNED_SEQ_NO, 0); + ifSeqNoMatch, ifPrimaryTermMatch); } private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException { @@ -787,19 +791,22 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) { return new Engine.DeleteResult(e, version, operationPrimaryTerm); } - public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) + public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType, + long ifSeqNoMatch, long ifPrimaryTermMatch) throws IOException { assert versionType.validateVersionForWrites(version); return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType, - Engine.Operation.Origin.PRIMARY); + ifSeqNoMatch, ifPrimaryTermMatch, Engine.Operation.Origin.PRIMARY); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { - return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); + return applyDeleteOperation( + getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id, - @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { + @Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch, + Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm + "]"; ensureWriteAllowed(origin); @@ -828,15 +835,16 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long } final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version, - versionType, origin); + versionType, origin, ifSeqNoMatch, ifPrimaryTermMatch); return delete(engine, delete); } private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, - VersionType versionType, Engine.Operation.Origin origin) { + VersionType versionType, Engine.Operation.Origin origin, + long ifSeqNoMatch, long ifPrimaryTermMatch) { long startTime = System.nanoTime(); return new Engine.Delete(resolveType(type), id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, - UNASSIGNED_SEQ_NO, 0); + ifSeqNoMatch, ifPrimaryTermMatch); } private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { @@ -1283,14 +1291,14 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(), - versionType, index.getAutoGeneratedIdTimestamp(), true, origin, + versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), - versionType, origin); + versionType, UNASSIGNED_SEQ_NO, 0, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; @@ -1997,7 +2005,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex - if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { // If the old primary was on an old version that did not replicate the msu, // we need to bootstrap it manually from its local history. assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); @@ -2916,8 +2924,8 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { - assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO - || getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : + assert seqNo != UNASSIGNED_SEQ_NO + || getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO : "replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not"; getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index ca19dcc250948..37e82884c5133 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -268,7 +268,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(mappingUpdate); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(mappingUpdate); // Pretend the mappings haven't made it to the node yet BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); @@ -285,9 +286,10 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); // Verify that the shard "executed" the operation twice - verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(success); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); @@ -295,7 +297,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { // Verify that the shard "executed" the operation only once (2 for previous invocations plus // 1 for this execution) - verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); @@ -488,7 +490,8 @@ public void testUpdateRequestWithFailure() throws Exception { Exception err = new ElasticsearchException("I'm dead <(x.x)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); @@ -536,7 +539,8 @@ public void testUpdateRequestWithConflictFailure() throws Exception { "I'm conflicted <(;_;)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); @@ -581,7 +585,8 @@ public void testUpdateRequestWithSuccess() throws Exception { Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult indexResult = new FakeIndexResult(1, 1, 13, created, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); @@ -626,7 +631,7 @@ public void testUpdateWithDelete() throws Exception { final long resultSeqNo = 13; Engine.DeleteResult deleteResult = new FakeDeleteResult(1, 1, resultSeqNo, found, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any())).thenReturn(deleteResult); + when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any(), anyLong(), anyLong())).thenReturn(deleteResult); when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); @@ -769,7 +774,7 @@ public void testRetries() throws Exception { Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenAnswer(ir -> { + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(ir -> { if (randomBoolean()) { return conflictedResult; } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index f4856d51a2d4e..56caa94466c56 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -57,6 +57,7 @@ import org.elasticsearch.index.query.MatchPhraseQueryBuilder; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.search.MatchQuery; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -251,7 +252,7 @@ public void testDefaultPositionIncrementGap() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -293,7 +294,7 @@ public void testPositionIncrementGap() throws IOException { IndexShard shard = indexService.getShard(0); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index f4c29800cdbd3..747d951d5a8cc 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; @@ -210,7 +211,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source("index", "type", "primary", new BytesArray("{}"), XContentType.JSON), - randomNonNegativeLong(), + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, randomNonNegativeLong(), false); } final IndexShard recoveredReplica = diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index dd14f5f854491..6035a81a1b99f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -356,7 +357,7 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); @@ -406,7 +407,7 @@ public void testMaybeRollTranslogGeneration() throws Exception { assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); final Translog.Location location = result.getTranslogLocation(); shard.afterWriteOperation(); if (location.translogLocation + location.size > generationThreshold) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7b73901d2cfbf..7679595a7fa3b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -172,6 +172,7 @@ import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -323,10 +324,10 @@ public void testClosesPreventsNewOperations() throws Exception { expectThrows(IndexShardClosedException.class, () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L))); expectThrows(IndexShardClosedException.class, - () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); expectThrows(IndexShardClosedException.class, - () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L))); } @@ -334,7 +335,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, "")); + UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, "")); closeShards(indexShard); } @@ -828,7 +829,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint); } final long expectedLocalCheckpoint; - if (newGlobalCheckPoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (newGlobalCheckPoint == UNASSIGNED_SEQ_NO) { expectedLocalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; } else { expectedLocalCheckpoint = newGlobalCheckPoint; @@ -1039,10 +1040,10 @@ public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); final Set docsBeforeRollback = getShardDocUIDs(indexShard); @@ -1104,9 +1105,9 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt final int operations = 1024 - scaledRandomIntBetween(0, 1024); indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED)); - final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); - final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); + final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); Set docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream() .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(1); @@ -1132,7 +1133,7 @@ public void onFailure(final Exception e) { }, ""); latch.await(); - if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (globalCheckpointOnReplica == UNASSIGNED_SEQ_NO && globalCheckpoint == UNASSIGNED_SEQ_NO) { assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); } else { assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); @@ -3711,10 +3712,11 @@ public void testTypelessDelete() throws IOException { Engine.IndexResult indexResult = indexDoc(shard, "some_type", "id", "{}"); assertTrue(indexResult.isCreated()); - DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL); + DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL, + UNASSIGNED_SEQ_NO, 0); assertFalse(deleteResult.isFound()); - deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL); + deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 0); assertTrue(deleteResult.isFound()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index b65d682fa57ba..282404de9a45e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -81,6 +81,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true); } @@ -150,7 +151,7 @@ public void testSyncerOnClosingShard() throws Exception { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } String allocationId = shard.routingEntry().allocationId().getId(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 2a53c79448d15..694032bd9887a 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -314,7 +314,7 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { Engine.IndexResult result = primaryShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(primaryShard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); if (randomBoolean()) { globalCheckpoint = randomLongBetween(globalCheckpoint, i); diff --git a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java index c0ee228a456a7..9de70f4339fa1 100644 --- a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -281,6 +281,74 @@ public void testInternalVersioning() throws Exception { assertThat(deleteResponse.getVersion(), equalTo(4L)); } + public void testCompareAndSet() { + createIndex("test"); + ensureGreen(); + + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); + assertThat(indexResponse.getSeqNo(), equalTo(0L)); + assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); + + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); + assertThat(indexResponse.getSeqNo(), equalTo(1L)); + assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); + + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 1).execute(), + VersionConflictEngineException.class); + + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 2).execute(), + VersionConflictEngineException.class); + + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(1, 2).execute(), + VersionConflictEngineException.class); + + + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 2).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class); + + client().admin().indices().prepareRefresh().execute().actionGet(); + // TODO: Enable once get response returns seqNo +// for (int i = 0; i < 10; i++) { +// final GetResponse response = client().prepareGet("test", "type", "1").get(); +// assertThat(response.getSeqNo(), equalTo(1L)); +// assertThat(response.getPrimaryTerm(), equalTo(1L)); +// } + + // search with versioning + for (int i = 0; i < 10; i++) { + // TODO: ADD SEQ NO! + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet(); + assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L)); + } + + // search without versioning + for (int i = 0; i < 10; i++) { + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); + assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND)); + } + + DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute().actionGet(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + assertThat(deleteResponse.getSeqNo(), equalTo(2L)); + assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L)); + + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(3, 2).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class); + + + // This is intricate - the object was deleted but a delete transaction was with the right version. We add another one + // and thus the transaction is increased. + deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(2, 1).execute().actionGet(); + assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult()); + assertThat(deleteResponse.getSeqNo(), equalTo(3L)); + assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L)); + } + public void testSimpleVersioningWithFlush() throws Exception { createIndex("test"); ensureGreen(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index d2b9a259298fa..830fcec3726a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -125,7 +125,7 @@ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine. source(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())) .routing(index.routing()), index.seqNo(), index.primaryTerm(), - index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true); + index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0); return engineIndex; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 104e7dcb75847..0a007f2a18e2c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -702,12 +702,12 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, Engine.IndexResult result; if (shard.routingEntry().primary()) { result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { updateMappings(shard, IndexMetaData.builder(shard.indexSettings().getIndexMetaData()) .putMapping(type, result.getRequiredMappingUpdate().toString()).build()); result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint()); @@ -731,7 +731,8 @@ protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) { protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException { final Engine.DeleteResult result; if (shard.routingEntry().primary()) { - result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL); + result = shard.applyDeleteOperationOnPrimary( + Versions.MATCH_ANY, type, id, VersionType.INTERNAL, SequenceNumbers.UNASSIGNED_SEQ_NO, 0); shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint()); } else { final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index fbfd0fda22857..1b4856487afe4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -318,12 +318,11 @@ private Engine.Result applyOperation(Engine engine, Engine.Operation op, Engine.Index index = (Engine.Index) op; result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(), versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), - SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch())); } else if (op instanceof Engine.Delete) { Engine.Delete delete = (Engine.Delete) op; result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, - delete.version(), versionType, origin, delete.startTime(), - SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + delete.version(), versionType, origin, delete.startTime(), delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch())); } else { Engine.NoOp noOp = (Engine.NoOp) op; result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason())); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 5229f78bdc9e9..f1de51c95b6e9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -294,7 +295,7 @@ public IndexShard reindex(DirectoryReader reader, MappingMetaData mapping) throw assert source != null : "_source is null but should have been filtered out at snapshot time"; Engine.Result result = targetShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, source (index, uid.type(), uid.id(), source, XContentHelper.xContentType(source)) - .routing(rootFieldsVisitor.routing()), 1, false); + .routing(rootFieldsVisitor.routing()), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 1, false); if (result.getResultType() != Engine.Result.Type.SUCCESS) { throw new IllegalStateException("failed applying post restore operation result: " + result .getResultType(), result.getFailure());