diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index eddb6b5758452..1bf1f2487cd29 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -68,9 +68,9 @@ import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestDeleteAction; import org.elasticsearch.rest.action.document.RestGetAction; +import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.rest.action.document.RestMultiGetAction; import org.elasticsearch.rest.action.document.RestUpdateAction; -import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; @@ -90,8 +90,10 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; @@ -606,22 +608,46 @@ public void testUpdate() throws IOException { IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT); assertEquals(RestStatus.CREATED, indexResponse.status()); - UpdateRequest updateRequest = new UpdateRequest("index", "id"); - updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values())); - UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); - assertEquals(RestStatus.OK, updateResponse.status()); - assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); - - UpdateRequest updateRequestConflict = new UpdateRequest("index", "id"); - updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values())); - updateRequestConflict.version(indexResponse.getVersion()); + long lastUpdateSeqNo; + long lastUpdatePrimaryTerm; + { + UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values())); + final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); + assertEquals(RestStatus.OK, updateResponse.status()); + assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); + lastUpdateSeqNo = updateResponse.getSeqNo(); + lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm(); + assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L)); + assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L)); + } - ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> - execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync)); - assertEquals(RestStatus.CONFLICT, exception.status()); - assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " + - "current version [2] is different than the one provided [1]]", exception.getMessage()); + { + final UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values())); + if (randomBoolean()) { + updateRequest.setIfSeqNo(lastUpdateSeqNo + 1); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm); + } else { + updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1)); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1); + } + ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> + execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync)); + assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status()); + assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception")); + } + { + final UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values())); + updateRequest.setIfSeqNo(lastUpdateSeqNo); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm); + final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); + assertEquals(RestStatus.OK, updateResponse.status()); + assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo()); + assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm()); + } } { IndexRequest indexRequest = new IndexRequest("index").id("with_script"); diff --git a/docs/reference/docs/delete.asciidoc b/docs/reference/docs/delete.asciidoc index bc6f7b840048d..22301b98f1031 100644 --- a/docs/reference/docs/delete.asciidoc +++ b/docs/reference/docs/delete.asciidoc @@ -39,7 +39,7 @@ The result of the above delete operation is: [[optimistic-concurrency-control-delete]] === Optimistic concurrency control -Delete operations can be made optional and only be performed if the last +Delete operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number and primary term specified by the `if_seq_no` and `if_primary_term` parameters. If a mismatch is detected, the operation will result in a `VersionConflictException` diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index c7ca42bfaf4c4..257b88289d87a 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -185,7 +185,7 @@ The result of the above index operation is: [[optimistic-concurrency-control-index]] === Optimistic concurrency control -Index operations can be made optional and only be performed if the last +Index operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number and primary term specified by the `if_seq_no` and `if_primary_term` parameters. If a mismatch is detected, the operation will result in a `VersionConflictException` diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 1cfc122bee402..42840b1b0a5ec 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -349,3 +349,11 @@ version numbers being out of sync with the external system. Use the <> instead. ===================================================== + +`if_seq_no` and `if_primary_term`:: + +Update operations can be made conditional and only be performed if the last +modification to the document was assigned the sequence number and primary +term specified by the `if_seq_no` and `if_primary_term` parameters. If a +mismatch is detected, the operation will result in a `VersionConflictException` +and a status code of 409. See <> for more details. \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json index 37710cf07a10b..92f1013a317c3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json @@ -63,6 +63,14 @@ "type": "time", "description": "Explicit operation timeout" }, + "if_seq_no" : { + "type" : "number", + "description" : "only perform the update operation if the last operation that has changed the document has the specified sequence number" + }, + "if_primary_term" : { + "type" : "number", + "description" : "only perform the update operation if the last operation that has changed the document has the specified primary term" + }, "version": { "type": "number", "description": "Explicit version number for concurrency control" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml new file mode 100644 index 0000000000000..dbc569104cf4c --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -0,0 +1,64 @@ +--- +"Update with if_seq_no": + + - skip: + version: " - 6.99.99" + reason: if_seq_no was added in 7.0 + + - do: + catch: missing + update: + index: test_1 + id: 1 + if_seq_no: 1 + if_primary_term: 1 + body: + doc: { foo: baz } + + - do: + index: + index: test_1 + id: 1 + body: + foo: baz + + - do: + catch: conflict + update: + index: test_1 + id: 1 + if_seq_no: 234 + if_primary_term: 1 + body: + doc: { foo: baz } + + - do: + update: + index: test_1 + id: 1 + if_seq_no: 0 + if_primary_term: 1 + body: + doc: { foo: bar } + + - do: + get: + index: test_1 + id: 1 + + - match: { _source: { foo: bar } } + + - do: + bulk: + body: + - update: + _index: test_1 + _id: 1 + if_seq_no: 100 + if_primary_term: 200 + - doc: + foo: baz + + - match: { errors: true } + - match: { items.0.update.status: 409 } + diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index fdf62e951a517..d8a9a3503a617 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -24,11 +24,16 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; import java.io.IOException; import java.util.Locale; +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} @@ -117,6 +122,39 @@ public interface DocWriteRequest extends IndicesRequest { */ T versionType(VersionType versionType); + /** + * only perform this request if the document was last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + T setIfSeqNo(long seqNo); + + /** + * only performs this request if the document was last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + T setIfPrimaryTerm(long term); + + /** + * If set, only perform this request if the document was last modification was assigned this sequence number. + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + long ifSeqNo(); + + /** + * If set, only perform this request if the document was last modification was assigned this primary term. + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + long ifPrimaryTerm(); + /** * Get the requested document operation type of the request * @return the operation type {@link OpType} @@ -216,4 +254,30 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); } } + + static ActionRequestValidationException validateSeqNoBasedCASParams( + DocWriteRequest request, ActionRequestValidationException validationException) { + if (request.versionType().validateVersionForWrites(request.version()) == false) { + validationException = addValidationError("illegal version value [" + request.version() + "] for version type [" + + request.versionType().name() + "]", validationException); + } + if (request.versionType() == VersionType.FORCE) { + validationException = addValidationError("version type [force] may no longer be used", validationException); + } + + if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && ( + request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY + )) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); + } + if (request.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() != UNASSIGNED_SEQ_NO) { + validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); + } + if (request.ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() == UNASSIGNED_SEQ_NO) { + validationException = + addValidationError("ifSeqNo is unassigned, but primary term is [" + request.ifPrimaryTerm() + "]", validationException); + } + + return validationException; + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index e58b0dfbffbd3..b5c786ab2df6d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -503,6 +503,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } else if ("update".equals(action)) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) .version(version).versionType(versionType) + .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .routing(routing); // EMPTY is safe here because we never call namedObject try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 8d2967fd28ba4..a033bf3cb000f 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -110,27 +110,8 @@ public ActionRequestValidationException validate() { if (Strings.isEmpty(id)) { validationException = addValidationError("id is missing", validationException); } - if (versionType.validateVersionForWrites(version) == false) { - validationException = addValidationError("illegal version value [" + version + "] for version type [" - + versionType.name() + "]", validationException); - } - if (versionType == VersionType.FORCE) { - validationException = addValidationError("version type [force] may no longer be used", validationException); - } - - if (ifSeqNo != UNASSIGNED_SEQ_NO && ( - versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY - )) { - validationException = addValidationError("compare and write operations can not use versioning", validationException); - } - if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { - validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); - } - if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { - validationException = - addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); - } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); return validationException; } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index a9aac3025de1e..37d960831776d 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -188,14 +188,7 @@ public ActionRequestValidationException validate() { addValidationError("an id is required for a " + opType() + " operation", validationException); } - if (!versionType.validateVersionForWrites(resolvedVersion)) { - validationException = addValidationError("illegal version value [" + resolvedVersion + "] for version type [" - + versionType.name() + "]", validationException); - } - - if (versionType == VersionType.FORCE) { - validationException = addValidationError("version type [force] may no longer be used", validationException); - } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) { validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " + @@ -210,18 +203,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - if (ifSeqNo != UNASSIGNED_SEQ_NO && ( - versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY - )) { - validationException = addValidationError("compare and write operations can not use versioning", validationException); - } - if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { - validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); - } - if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { - validationException = - addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); - } return validationException; } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index a8a5fb8f72f30..8cd6146768fff 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -69,8 +69,8 @@ public UpdateHelper(ScriptService scriptService) { * Prepares an update request by converting it into an index or delete request or an update response (no action). */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(), - request.versionType()); + final GetResult getResult = indexShard.getService().getForUpdate( + request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index a7805b4cbdbad..2a1865aa80818 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -54,6 +54,8 @@ import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class UpdateRequest extends InstanceShardOperationRequest implements DocWriteRequest, WriteRequest, ToXContentObject { @@ -66,6 +68,8 @@ public class UpdateRequest extends InstanceShardOperationRequest private static final ParseField DOC_AS_UPSERT_FIELD = new ParseField("doc_as_upsert"); private static final ParseField DETECT_NOOP_FIELD = new ParseField("detect_noop"); private static final ParseField SOURCE_FIELD = new ParseField("_source"); + private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); + private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); static { PARSER = new ObjectParser<>(UpdateRequest.class.getSimpleName()); @@ -89,6 +93,8 @@ public class UpdateRequest extends InstanceShardOperationRequest PARSER.declareField(UpdateRequest::fetchSource, (parser, context) -> FetchSourceContext.fromXContent(parser), SOURCE_FIELD, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + PARSER.declareLong(UpdateRequest::setIfSeqNo, IF_SEQ_NO); + PARSER.declareLong(UpdateRequest::setIfPrimaryTerm, IF_PRIMARY_TERM); } // Set to null initially so we can know to override in bulk requests that have a default type. @@ -105,6 +111,9 @@ public class UpdateRequest extends InstanceShardOperationRequest private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private int retryOnConflict = 0; + private long ifSeqNo = UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; @@ -170,6 +179,16 @@ public ActionRequestValidationException validate() { } } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); + + if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) { + validationException = addValidationError("compare and write operations can not be retried", validationException); + } + + if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) { + validationException = addValidationError("compare and write operations can not be used with upsert", validationException); + } + if (script == null && doc == null) { validationException = addValidationError("script or doc is missing", validationException); } @@ -531,6 +550,55 @@ public VersionType versionType() { return this.versionType; } + /** + * only perform this update request if the document's modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this update request if the document's last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this update request if the document was last modification was assigned this sequence number. + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this update request if the document was last modification was assigned this primary term. + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifPrimaryTerm() { + return ifPrimaryTerm; + } + @Override public OpType opType() { return OpType.UPDATE; @@ -811,6 +879,10 @@ public void readFrom(StreamInput in) throws IOException { docAsUpsert = in.readBoolean(); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } detectNoop = in.readBoolean(); scriptedUpsert = in.readBoolean(); } @@ -862,6 +934,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(docAsUpsert); out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNo); + out.writeVLong(ifPrimaryTerm); + } out.writeBoolean(detectNoop); out.writeBoolean(scriptedUpsert); } @@ -880,6 +956,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.copyCurrentStructure(parser); } } + + if (ifSeqNo != UNASSIGNED_SEQ_NO) { + builder.field(IF_SEQ_NO.getPreferredName(), ifSeqNo); + builder.field(IF_PRIMARY_TERM.getPreferredName(), ifPrimaryTerm); + } + if (script != null) { builder.field("script", script); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 181dba6a10734..919f460e8c07b 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -150,6 +150,30 @@ public UpdateRequestBuilder setVersionType(VersionType versionType) { return this; } + /** + * only perform this update request if the document was last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequestBuilder setIfSeqNo(long seqNo) { + request.setIfSeqNo(seqNo); + return this; + } + + /** + * only perform this update request if the document was last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequestBuilder setIfPrimaryTerm(long term) { + request.setIfPrimaryTerm(term); + return this; + } + /** * Sets the number of shard copies that must be active before proceeding with the write. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e1df104d338df..e450e93e9d397 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -622,6 +623,13 @@ protected final GetResult getFromSearcher(Get get, BiFunction search throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } + if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term + )) { + throw new VersionConflictEngineException(shardId, get.type(), get.id(), + get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term); + } if (get.isReadFromTranslog()) { // this is only used for updates - API _GET calls will always read form a reader for consistency // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 6d58b981ddc53..9fb1cb804946f 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -45,7 +45,6 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; @@ -56,6 +55,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + public final class ShardGetService extends AbstractIndexShardComponent { private final MapperService mapperService; private final MeanMetric existsMetric = new MeanMetric(); @@ -77,15 +79,17 @@ public GetStats stats() { public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) { - return get(type, id, gFields, realtime, version, versionType, fetchSourceContext, false); + return + get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext, false); } private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, - FetchSourceContext fetchSourceContext, boolean readFromTranslog) { + long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) { currentMetric.inc(); try { long now = System.nanoTime(); - GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext, readFromTranslog); + GetResult getResult = + innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext, readFromTranslog); if (getResult.isExists()) { existsMetric.inc(System.nanoTime() - now); @@ -98,8 +102,8 @@ private GetResult get(String type, String id, String[] gFields, boolean realtime } } - public GetResult getForUpdate(String type, String id, long version, VersionType versionType) { - return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, version, versionType, + public GetResult getForUpdate(String type, String id, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) { + return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, version, versionType, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true); } @@ -113,7 +117,7 @@ public GetResult getForUpdate(String type, String id, long version, VersionType public GetResult get(Engine.GetResult engineGetResult, String id, String type, String[] fields, FetchSourceContext fetchSourceContext) { if (!engineGetResult.exists()) { - return new GetResult(shardId.getIndexName(), type, id, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, -1, false, null, null); + return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null); } currentMetric.inc(); @@ -151,7 +155,7 @@ private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceCont } private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, - FetchSourceContext fetchSourceContext, boolean readFromTranslog) { + long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) { fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields); if (type == null || type.equals("_all")) { DocumentMapper mapper = mapperService.documentMapper(); @@ -162,14 +166,14 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea if (type != null) { Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm) - .version(version).versionType(versionType)); + .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)); if (get.exists() == false) { get.close(); } } if (get == null || get.exists() == false) { - return new GetResult(shardId.getIndexName(), type, id, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, -1, false, null, null); + return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null); } try { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 033176c4a7300..463a18ea6b802 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -86,6 +86,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC updateRequest.version(RestActions.parseVersion(request)); updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); + updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); request.applyContentParser(parser -> { updateRequest.fromXContent(parser); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index edf1925fdd798..f57bff72fc57c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1378,6 +1378,38 @@ public void testVersionedUpdate() throws IOException { } + public void testGetIfSeqNoIfPrimaryTerm() throws IOException { + final BiFunction searcherFactory = engine::acquireSearcher; + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); + Engine.IndexResult indexResult = engine.index(create); + if (randomBoolean()) { + engine.refresh("test"); + } + if (randomBoolean()) { + engine.flush(); + } + try (Engine.GetResult get = engine.get( + new Engine.Get(true, true, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get()), + searcherFactory)) { + assertEquals(indexResult.getSeqNo(), get.docIdAndVersion().seqNo); + } + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get()), + searcherFactory)); + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get() + 1), + searcherFactory)); + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get() + 1), + searcherFactory)); + } + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 7db904f89dfa8..14e513ff89cfe 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -20,17 +20,21 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.RoutingFieldMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; +import static org.elasticsearch.common.lucene.uid.Versions.MATCH_ANY; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + public class ShardGetServiceTests extends IndexShardTestCase { public void testGetForUpdate() throws IOException { @@ -47,7 +51,8 @@ public void testGetForUpdate() throws IOException { recoverShardFromStore(primary); Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + GetResult testGet = primary.getService().getForUpdate( + "test", "0", test.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -56,7 +61,8 @@ public void testGetForUpdate() throws IOException { Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + GetResult testGet1 = primary.getService().getForUpdate( + "test", "1", test1.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); @@ -69,13 +75,22 @@ public void testGetForUpdate() throws IOException { } // now again from the reader - test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + Engine.IndexResult test2 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + testGet1 = primary.getService().getForUpdate("test", "1", test2.getVersion(), VersionType.INTERNAL, + UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + final long primaryTerm = primary.operationPrimaryTerm; + testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + + expectThrows(VersionConflictEngineException.class, () -> + primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo() + 1, primaryTerm)); + expectThrows(VersionConflictEngineException.class, () -> + primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm + 1)); closeShards(primary); } @@ -93,13 +108,16 @@ public void testTypelessGetForUpdate() throws IOException { Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}"); assertTrue(indexResult.isCreated()); - GetResult getResult = shard.getService().getForUpdate("some_type", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + GetResult getResult = shard.getService().getForUpdate( + "some_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); - getResult = shard.getService().getForUpdate("some_other_type", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + getResult = shard.getService().getForUpdate( + "some_other_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(getResult.isExists()); - getResult = shard.getService().getForUpdate("_doc", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + getResult = shard.getService().getForUpdate( + "_doc", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/update/UpdateIT.java b/server/src/test/java/org/elasticsearch/update/UpdateIT.java index 05b27758ee434..7652c503450ae 100644 --- a/server/src/test/java/org/elasticsearch/update/UpdateIT.java +++ b/server/src/test/java/org/elasticsearch/update/UpdateIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; @@ -36,7 +37,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -411,6 +414,45 @@ public void testUpdate() throws Exception { } } + public void testUpdateWithIfSeqNo() throws Exception { + createTestIndex(); + ensureGreen(); + + IndexResponse result = client().prepareIndex("test", "type1", "1").setSource("field", 1).get(); + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo() + 1) + .setIfPrimaryTerm(result.getPrimaryTerm()) + .get() + ); + + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo()) + .setIfPrimaryTerm(result.getPrimaryTerm() + 1) + .get() + ); + + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo() + 1) + .setIfPrimaryTerm(result.getPrimaryTerm() + 1) + .get() + ); + + UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo()) + .setIfPrimaryTerm(result.getPrimaryTerm()) + .get(); + + assertThat(updateResponse.status(), equalTo(RestStatus.OK)); + assertThat(updateResponse.getSeqNo(), equalTo(result.getSeqNo() + 1)); + } + public void testUpdateRequestWithBothScriptAndDoc() throws Exception { createTestIndex(); ensureGreen();