From d297d875fff4bf1350bf75b13651fcc681a49ead Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 22 Jan 2019 14:39:04 +0100 Subject: [PATCH 01/13] wip --- .../java/org/elasticsearch/client/CrudIT.java | 20 ++++- docs/reference/docs/update.asciidoc | 8 ++ .../resources/rest-api-spec/api/update.json | 8 ++ .../action/bulk/BulkRequest.java | 1 + .../action/update/UpdateRequest.java | 89 +++++++++++++++++++ .../action/update/UpdateRequestBuilder.java | 25 ++++++ .../elasticsearch/index/engine/Engine.java | 29 ++++++ .../index/engine/InternalEngine.java | 6 ++ .../action/document/RestUpdateAction.java | 2 + 9 files changed, 187 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index eddb6b5758452..4f01f3366e977 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -613,7 +613,7 @@ public void testUpdate() throws IOException { assertEquals(RestStatus.OK, updateResponse.status()); assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); - UpdateRequest updateRequestConflict = new UpdateRequest("index", "id"); + final UpdateRequest updateRequestConflict = new UpdateRequest("index", "id"); updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values())); updateRequestConflict.version(indexResponse.getVersion()); @@ -622,6 +622,24 @@ public void testUpdate() throws IOException { assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " + "current version [2] is different than the one provided [1]]", exception.getMessage()); + + final UpdateRequest updateRequestSeqNoConflict = new UpdateRequest("index", "id"); + updateRequestSeqNoConflict.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values())); + if (randomBoolean()) { + updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo() + 1); + updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm()); + } else { + updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo() + (randomBoolean() ? 0 : 1)); + updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm() + 1); + } + + final UpdateRequest updateRequestSeqNo = new UpdateRequest("index", "id"); + updateRequestSeqNo.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values())); + updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo()); + updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm()); + updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); + assertEquals(RestStatus.OK, updateResponse.status()); + assertEquals(updateRequestSeqNo.ifSeqNo() + 1, updateResponse.getSeqNo()); } { IndexRequest indexRequest = new IndexRequest("index").id("with_script"); diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 1cfc122bee402..313cf2be5cdf8 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -349,3 +349,11 @@ version numbers being out of sync with the external system. Use the <> instead. ===================================================== + +`if_seq_no` and `if_primary_term`:: + +Update operations can be made optional and only be performed if the last +modification to the document was assigned the sequence number and primary +term specified by the `if_seq_no` and `if_primary_term` parameters. If a +mismatch is detected, the operation will result in a `VersionConflictException` +and a status code of 409. See <> for more details. \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json index 37710cf07a10b..92f1013a317c3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json @@ -63,6 +63,14 @@ "type": "time", "description": "Explicit operation timeout" }, + "if_seq_no" : { + "type" : "number", + "description" : "only perform the update operation if the last operation that has changed the document has the specified sequence number" + }, + "if_primary_term" : { + "type" : "number", + "description" : "only perform the update operation if the last operation that has changed the document has the specified primary term" + }, "version": { "type": "number", "description": "Explicit version number for concurrency control" diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index e58b0dfbffbd3..b5c786ab2df6d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -503,6 +503,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } else if ("update".equals(action)) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) .version(version).versionType(versionType) + .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .routing(routing); // EMPTY is safe here because we never call namedObject try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index a7805b4cbdbad..ec55abe7dfbb6 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -54,6 +54,8 @@ import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class UpdateRequest extends InstanceShardOperationRequest implements DocWriteRequest, WriteRequest, ToXContentObject { @@ -66,6 +68,8 @@ public class UpdateRequest extends InstanceShardOperationRequest private static final ParseField DOC_AS_UPSERT_FIELD = new ParseField("doc_as_upsert"); private static final ParseField DETECT_NOOP_FIELD = new ParseField("detect_noop"); private static final ParseField SOURCE_FIELD = new ParseField("_source"); + private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); + private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); static { PARSER = new ObjectParser<>(UpdateRequest.class.getSimpleName()); @@ -89,6 +93,8 @@ public class UpdateRequest extends InstanceShardOperationRequest PARSER.declareField(UpdateRequest::fetchSource, (parser, context) -> FetchSourceContext.fromXContent(parser), SOURCE_FIELD, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); + PARSER.declareLong(UpdateRequest::setIfSeqNo, IF_SEQ_NO); + PARSER.declareLong(UpdateRequest::setIfPrimaryTerm, IF_SEQ_NO); } // Set to null initially so we can know to override in bulk requests that have a default type. @@ -105,6 +111,9 @@ public class UpdateRequest extends InstanceShardOperationRequest private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private int retryOnConflict = 0; + private long ifSeqNo = UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; @@ -170,6 +179,27 @@ public ActionRequestValidationException validate() { } } + if (ifSeqNo != UNASSIGNED_SEQ_NO && ( + versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY + )) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); + } + if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { + validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); + } + if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { + validationException = + addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); + } + + if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) { + validationException = addValidationError("compare and write operations can not be retried", validationException); + } + + if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) { + validationException = addValidationError("compare and write operations can not be used with upsert", validationException); + } + if (script == null && doc == null) { validationException = addValidationError("script or doc is missing", validationException); } @@ -531,6 +561,55 @@ public VersionType versionType() { return this.versionType; } + /** + * only perform this update request if the document was last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this update request if the document was last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this update request if the document was last modification was assigned this sequence number. + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this update request if the document was last modification was assigned this primary term. + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifPrimaryTerm() { + return ifPrimaryTerm; + } + @Override public OpType opType() { return OpType.UPDATE; @@ -862,6 +941,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(docAsUpsert); out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNo); + out.writeVLong(ifPrimaryTerm); + } out.writeBoolean(detectNoop); out.writeBoolean(scriptedUpsert); } @@ -880,6 +963,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.copyCurrentStructure(parser); } } + + if (ifSeqNo != UNASSIGNED_SEQ_NO) { + builder.field(IF_SEQ_NO.getPreferredName(), ifSeqNo); + builder.field(IF_PRIMARY_TERM.getPreferredName(), ifPrimaryTerm); + } + if (script != null) { builder.field("script", script); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 181dba6a10734..bfc77717f770c 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.update; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -150,6 +151,30 @@ public UpdateRequestBuilder setVersionType(VersionType versionType) { return this; } + /** + * only perform this update request if the document was last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequestBuilder setIfSeqNo(long seqNo) { + request.setIfSeqNo(seqNo); + return this; + } + + /** + * only perform this update request if the document was last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public UpdateRequestBuilder setIfPrimaryTerm(long term) { + request.setIfPrimaryTerm(term); + return this; + } + /** * Sets the number of shard copies that must be active before proceeding with the write. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e1df104d338df..b4a93f2764163 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -622,6 +623,12 @@ protected final GetResult getFromSearcher(Get get, BiFunction search throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } + if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term + )) { + throw new VersionConflictEngineException(shardId, get.type(), get.id(), + get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term); + } if (get.isReadFromTranslog()) { // this is only used for updates - API _GET calls will always read form a reader for consistency // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 033176c4a7300..463a18ea6b802 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -86,6 +86,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC updateRequest.version(RestActions.parseVersion(request)); updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); + updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); request.applyContentParser(parser -> { updateRequest.fromXContent(parser); From cb585db70e3bc7ac1a80cd85c33f6050dd9470da Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 10:45:21 +0100 Subject: [PATCH 02/13] add rest test --- .../test/update/35_if_seq_no.yml | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml new file mode 100644 index 0000000000000..5c61359c84be7 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -0,0 +1,49 @@ +--- +"Internal version": + + - skip: + version: " - 6.99.99" + reason: if_seq_no was added in 7.0 + + - do: + catch: missing + update: + index: test_1 + id: 1 + if_seq_no: 1 + if_primay_term: 1 + body: + doc: { foo: baz } + + - do: + index: + index: test_1 + id: 1 + body: + doc: { foo: baz } + + - do: + catch: conflict + update: + index: test_1 + id: 1 + if_seq_no: 2 + if_primay_term: 1 + body: + doc: { foo: baz } + + - do: + update: + index: test_1 + id: 1 + if_seq_no: 1 + if_primay_term: 1 + body: + doc: { foo: bar } + + - do: + get: + index: test_1 + id: 1 + + - match: { _source: { foo: bar } } From e7eb0a73e8e4e6c6d23fe42fa6fd412aed99865e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 10:51:47 +0100 Subject: [PATCH 03/13] lint --- .../rest-api-spec/test/update/35_if_seq_no.yml | 16 +++++++++++++++- .../action/update/UpdateRequestBuilder.java | 1 - 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml index 5c61359c84be7..3f1b6ed3f5bf6 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -1,5 +1,5 @@ --- -"Internal version": +"Update with if_seq_no": - skip: version: " - 6.99.99" @@ -47,3 +47,17 @@ id: 1 - match: { _source: { foo: bar } } + + - do: + bulk: + body: + - update: + _index: test + _id: 1 + if_seq_no: 100 + if_primary_term: 200 + - foo: baz + + - match: { errors: true } + - match: { items.0.update.status: 409 } + diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index bfc77717f770c..919f460e8c07b 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.update; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequest; From 11d3986c86f4c5523135c164048874c309da68f9 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 11:09:48 +0100 Subject: [PATCH 04/13] fix write issue --- .../java/org/elasticsearch/action/update/UpdateRequest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index ec55abe7dfbb6..d90cb05fa3818 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -890,6 +890,10 @@ public void readFrom(StreamInput in) throws IOException { docAsUpsert = in.readBoolean(); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } detectNoop = in.readBoolean(); scriptedUpsert = in.readBoolean(); } From 1b4fbf1fc6e87c9a836210bc3ceeb1fc02c1e510 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 11:10:15 +0100 Subject: [PATCH 05/13] sigh --- .../main/resources/rest-api-spec/test/update/35_if_seq_no.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml index 3f1b6ed3f5bf6..61281dc11420d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -11,7 +11,7 @@ index: test_1 id: 1 if_seq_no: 1 - if_primay_term: 1 + if_primary_term: 1 body: doc: { foo: baz } @@ -28,7 +28,7 @@ index: test_1 id: 1 if_seq_no: 2 - if_primay_term: 1 + if_primary_term: 1 body: doc: { foo: baz } From 8d2a1a553e2141856a44c129946a5300cf5cee9a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 12:15:55 +0100 Subject: [PATCH 06/13] wires --- .../action/update/UpdateHelper.java | 4 +- .../elasticsearch/index/engine/Engine.java | 1 + .../index/get/ShardGetService.java | 24 ++++++----- .../index/engine/InternalEngineTests.java | 32 ++++++++++++++ .../index/shard/ShardGetServiceTests.java | 34 +++++++++++---- .../org/elasticsearch/update/UpdateIT.java | 42 +++++++++++++++++++ 6 files changed, 117 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index a8a5fb8f72f30..8cd6146768fff 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -69,8 +69,8 @@ public UpdateHelper(ScriptService scriptService) { * Prepares an update request by converting it into an index or delete request or an update response (no action). */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(), - request.versionType()); + final GetResult getResult = indexShard.getService().getForUpdate( + request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b4a93f2764163..e450e93e9d397 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -626,6 +626,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory = engine::acquireSearcher; + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); + Engine.IndexResult indexResult = engine.index(create); + if (randomBoolean()) { + engine.refresh("test"); + } + if (randomBoolean()) { + engine.flush(); + } + try (Engine.GetResult get = engine.get( + new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get()), + searcherFactory)) { + assertEquals(indexResult.getSeqNo(), get.docIdAndVersion().seqNo); + } + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get()), + searcherFactory)); + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get() + 1), + searcherFactory)); + + expectThrows(VersionConflictEngineException.class, () -> engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + .setIfSeqNo(indexResult.getSeqNo() + 1).setIfPrimaryTerm(primaryTerm.get() + 1), + searcherFactory)); + } + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 7db904f89dfa8..14e513ff89cfe 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -20,17 +20,21 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.RoutingFieldMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; +import static org.elasticsearch.common.lucene.uid.Versions.MATCH_ANY; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + public class ShardGetServiceTests extends IndexShardTestCase { public void testGetForUpdate() throws IOException { @@ -47,7 +51,8 @@ public void testGetForUpdate() throws IOException { recoverShardFromStore(primary); Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + GetResult testGet = primary.getService().getForUpdate( + "test", "0", test.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -56,7 +61,8 @@ public void testGetForUpdate() throws IOException { Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + GetResult testGet1 = primary.getService().getForUpdate( + "test", "1", test1.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); @@ -69,13 +75,22 @@ public void testGetForUpdate() throws IOException { } // now again from the reader - test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + Engine.IndexResult test2 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + testGet1 = primary.getService().getForUpdate("test", "1", test2.getVersion(), VersionType.INTERNAL, + UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + final long primaryTerm = primary.operationPrimaryTerm; + testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + + expectThrows(VersionConflictEngineException.class, () -> + primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo() + 1, primaryTerm)); + expectThrows(VersionConflictEngineException.class, () -> + primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm + 1)); closeShards(primary); } @@ -93,13 +108,16 @@ public void testTypelessGetForUpdate() throws IOException { Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}"); assertTrue(indexResult.isCreated()); - GetResult getResult = shard.getService().getForUpdate("some_type", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + GetResult getResult = shard.getService().getForUpdate( + "some_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); - getResult = shard.getService().getForUpdate("some_other_type", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + getResult = shard.getService().getForUpdate( + "some_other_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(getResult.isExists()); - getResult = shard.getService().getForUpdate("_doc", "0", Versions.MATCH_ANY, VersionType.INTERNAL); + getResult = shard.getService().getForUpdate( + "_doc", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/update/UpdateIT.java b/server/src/test/java/org/elasticsearch/update/UpdateIT.java index 05b27758ee434..7652c503450ae 100644 --- a/server/src/test/java/org/elasticsearch/update/UpdateIT.java +++ b/server/src/test/java/org/elasticsearch/update/UpdateIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; @@ -36,7 +37,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -411,6 +414,45 @@ public void testUpdate() throws Exception { } } + public void testUpdateWithIfSeqNo() throws Exception { + createTestIndex(); + ensureGreen(); + + IndexResponse result = client().prepareIndex("test", "type1", "1").setSource("field", 1).get(); + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo() + 1) + .setIfPrimaryTerm(result.getPrimaryTerm()) + .get() + ); + + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo()) + .setIfPrimaryTerm(result.getPrimaryTerm() + 1) + .get() + ); + + expectThrows(VersionConflictEngineException.class, () -> + client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo() + 1) + .setIfPrimaryTerm(result.getPrimaryTerm() + 1) + .get() + ); + + UpdateResponse updateResponse = client().prepareUpdate(indexOrAlias(), "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 2).endObject()) + .setIfSeqNo(result.getSeqNo()) + .setIfPrimaryTerm(result.getPrimaryTerm()) + .get(); + + assertThat(updateResponse.status(), equalTo(RestStatus.OK)); + assertThat(updateResponse.getSeqNo(), equalTo(result.getSeqNo() + 1)); + } + public void testUpdateRequestWithBothScriptAndDoc() throws Exception { createTestIndex(); ensureGreen(); From 4cb6b60ffb872249cf35d5d24317542cfa13c78a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 12:40:17 +0100 Subject: [PATCH 07/13] sigh --- .../main/resources/rest-api-spec/test/update/35_if_seq_no.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml index 61281dc11420d..06dc79668dd90 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -37,7 +37,7 @@ index: test_1 id: 1 if_seq_no: 1 - if_primay_term: 1 + if_primary_term: 1 body: doc: { foo: bar } From dc89962aa2e73484783db5488a3f2fb44d321f77 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 12:42:23 +0100 Subject: [PATCH 08/13] numbers are hard --- .../main/resources/rest-api-spec/test/update/35_if_seq_no.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml index 06dc79668dd90..e212ed7680cc7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -27,7 +27,7 @@ update: index: test_1 id: 1 - if_seq_no: 2 + if_seq_no: 234 if_primary_term: 1 body: doc: { foo: baz } @@ -36,7 +36,7 @@ update: index: test_1 id: 1 - if_seq_no: 1 + if_seq_no: 0 if_primary_term: 1 body: doc: { foo: bar } From 96dd81ee66b7fe98473a50ef0a731d5e80846e99 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 12:44:01 +0100 Subject: [PATCH 09/13] rest fix --- .../resources/rest-api-spec/test/update/35_if_seq_no.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml index e212ed7680cc7..dbc569104cf4c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_if_seq_no.yml @@ -20,7 +20,7 @@ index: test_1 id: 1 body: - doc: { foo: baz } + foo: baz - do: catch: conflict @@ -52,11 +52,12 @@ bulk: body: - update: - _index: test + _index: test_1 _id: 1 if_seq_no: 100 if_primary_term: 200 - - foo: baz + - doc: + foo: baz - match: { errors: true } - match: { items.0.update.status: 409 } From ff6c34e90c808eed01223bcb86f6a186bab44f12 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 14:21:31 +0100 Subject: [PATCH 10/13] some assertions --- .../src/test/java/org/elasticsearch/client/CrudIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 4f01f3366e977..a978090972278 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -68,9 +68,9 @@ import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestDeleteAction; import org.elasticsearch.rest.action.document.RestGetAction; +import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.rest.action.document.RestMultiGetAction; import org.elasticsearch.rest.action.document.RestUpdateAction; -import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; @@ -92,6 +92,7 @@ import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; @@ -612,6 +613,8 @@ public void testUpdate() throws IOException { UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); assertEquals(RestStatus.OK, updateResponse.status()); assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); + assertThat(updateResponse.getSeqNo(), greaterThanOrEqualTo(0L)); + assertThat(updateResponse.getPrimaryTerm(), greaterThanOrEqualTo(1L)); final UpdateRequest updateRequestConflict = new UpdateRequest("index", "id"); updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values())); From 2b8f9f84578514ca5271fbf597ad528ee7202e38 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 25 Jan 2019 14:27:10 +0100 Subject: [PATCH 11/13] wrong field --- .../java/org/elasticsearch/client/CrudIT.java | 71 ++++++++++--------- .../action/update/UpdateRequest.java | 2 +- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index a978090972278..1bf1f2487cd29 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -90,6 +90,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -607,42 +608,46 @@ public void testUpdate() throws IOException { IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT); assertEquals(RestStatus.CREATED, indexResponse.status()); - UpdateRequest updateRequest = new UpdateRequest("index", "id"); - updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values())); - UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); - assertEquals(RestStatus.OK, updateResponse.status()); - assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); - assertThat(updateResponse.getSeqNo(), greaterThanOrEqualTo(0L)); - assertThat(updateResponse.getPrimaryTerm(), greaterThanOrEqualTo(1L)); - - final UpdateRequest updateRequestConflict = new UpdateRequest("index", "id"); - updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values())); - updateRequestConflict.version(indexResponse.getVersion()); - - ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> - execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync)); - assertEquals(RestStatus.CONFLICT, exception.status()); - assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " + - "current version [2] is different than the one provided [1]]", exception.getMessage()); - - final UpdateRequest updateRequestSeqNoConflict = new UpdateRequest("index", "id"); - updateRequestSeqNoConflict.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values())); - if (randomBoolean()) { - updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo() + 1); - updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm()); - } else { - updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo() + (randomBoolean() ? 0 : 1)); - updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm() + 1); + long lastUpdateSeqNo; + long lastUpdatePrimaryTerm; + { + UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values())); + final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); + assertEquals(RestStatus.OK, updateResponse.status()); + assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion()); + lastUpdateSeqNo = updateResponse.getSeqNo(); + lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm(); + assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L)); + assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L)); } - final UpdateRequest updateRequestSeqNo = new UpdateRequest("index", "id"); - updateRequestSeqNo.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values())); - updateRequestConflict.setIfSeqNo(updateResponse.getSeqNo()); - updateRequestConflict.setIfPrimaryTerm(updateResponse.getPrimaryTerm()); - updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); - assertEquals(RestStatus.OK, updateResponse.status()); - assertEquals(updateRequestSeqNo.ifSeqNo() + 1, updateResponse.getSeqNo()); + { + final UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values())); + if (randomBoolean()) { + updateRequest.setIfSeqNo(lastUpdateSeqNo + 1); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm); + } else { + updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1)); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1); + } + ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> + execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync)); + assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status()); + assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception")); + } + { + final UpdateRequest updateRequest = new UpdateRequest("index", "id"); + updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values())); + updateRequest.setIfSeqNo(lastUpdateSeqNo); + updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm); + final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync); + assertEquals(RestStatus.OK, updateResponse.status()); + assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo()); + assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm()); + } } { IndexRequest indexRequest = new IndexRequest("index").id("with_script"); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index d90cb05fa3818..e7b472347a54e 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -94,7 +94,7 @@ public class UpdateRequest extends InstanceShardOperationRequest (parser, context) -> FetchSourceContext.fromXContent(parser), SOURCE_FIELD, ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); PARSER.declareLong(UpdateRequest::setIfSeqNo, IF_SEQ_NO); - PARSER.declareLong(UpdateRequest::setIfPrimaryTerm, IF_SEQ_NO); + PARSER.declareLong(UpdateRequest::setIfPrimaryTerm, IF_PRIMARY_TERM); } // Set to null initially so we can know to override in bulk requests that have a default type. From a7c75e5853f0047452d48cdeda7b90ec3c261497 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 28 Jan 2019 16:39:12 -0500 Subject: [PATCH 12/13] feedback --- .../elasticsearch/action/DocWriteRequest.java | 64 +++++++++++++++++++ .../action/delete/DeleteRequest.java | 21 +----- .../action/index/IndexRequest.java | 21 +----- .../action/update/UpdateRequest.java | 17 +---- .../index/get/ShardGetService.java | 4 +- .../index/engine/InternalEngineTests.java | 2 +- 6 files changed, 72 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index fdf62e951a517..d8a9a3503a617 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -24,11 +24,16 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; import java.io.IOException; import java.util.Locale; +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} @@ -117,6 +122,39 @@ public interface DocWriteRequest extends IndicesRequest { */ T versionType(VersionType versionType); + /** + * only perform this request if the document was last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + T setIfSeqNo(long seqNo); + + /** + * only performs this request if the document was last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + T setIfPrimaryTerm(long term); + + /** + * If set, only perform this request if the document was last modification was assigned this sequence number. + * If the document last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + long ifSeqNo(); + + /** + * If set, only perform this request if the document was last modification was assigned this primary term. + * + * If the document last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + long ifPrimaryTerm(); + /** * Get the requested document operation type of the request * @return the operation type {@link OpType} @@ -216,4 +254,30 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); } } + + static ActionRequestValidationException validateSeqNoBasedCASParams( + DocWriteRequest request, ActionRequestValidationException validationException) { + if (request.versionType().validateVersionForWrites(request.version()) == false) { + validationException = addValidationError("illegal version value [" + request.version() + "] for version type [" + + request.versionType().name() + "]", validationException); + } + if (request.versionType() == VersionType.FORCE) { + validationException = addValidationError("version type [force] may no longer be used", validationException); + } + + if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && ( + request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY + )) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); + } + if (request.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() != UNASSIGNED_SEQ_NO) { + validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); + } + if (request.ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() == UNASSIGNED_SEQ_NO) { + validationException = + addValidationError("ifSeqNo is unassigned, but primary term is [" + request.ifPrimaryTerm() + "]", validationException); + } + + return validationException; + } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 8d2967fd28ba4..a033bf3cb000f 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -110,27 +110,8 @@ public ActionRequestValidationException validate() { if (Strings.isEmpty(id)) { validationException = addValidationError("id is missing", validationException); } - if (versionType.validateVersionForWrites(version) == false) { - validationException = addValidationError("illegal version value [" + version + "] for version type [" - + versionType.name() + "]", validationException); - } - if (versionType == VersionType.FORCE) { - validationException = addValidationError("version type [force] may no longer be used", validationException); - } - - if (ifSeqNo != UNASSIGNED_SEQ_NO && ( - versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY - )) { - validationException = addValidationError("compare and write operations can not use versioning", validationException); - } - if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { - validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); - } - if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { - validationException = - addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); - } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); return validationException; } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index a9aac3025de1e..37d960831776d 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -188,14 +188,7 @@ public ActionRequestValidationException validate() { addValidationError("an id is required for a " + opType() + " operation", validationException); } - if (!versionType.validateVersionForWrites(resolvedVersion)) { - validationException = addValidationError("illegal version value [" + resolvedVersion + "] for version type [" - + versionType.name() + "]", validationException); - } - - if (versionType == VersionType.FORCE) { - validationException = addValidationError("version type [force] may no longer be used", validationException); - } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) { validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " + @@ -210,18 +203,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - if (ifSeqNo != UNASSIGNED_SEQ_NO && ( - versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY - )) { - validationException = addValidationError("compare and write operations can not use versioning", validationException); - } - if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { - validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); - } - if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { - validationException = - addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); - } return validationException; } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index e7b472347a54e..2a1865aa80818 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -179,18 +179,7 @@ public ActionRequestValidationException validate() { } } - if (ifSeqNo != UNASSIGNED_SEQ_NO && ( - versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY - )) { - validationException = addValidationError("compare and write operations can not use versioning", validationException); - } - if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { - validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); - } - if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { - validationException = - addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); - } + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) { validationException = addValidationError("compare and write operations can not be retried", validationException); @@ -562,7 +551,7 @@ public VersionType versionType() { } /** - * only perform this update request if the document was last modification was assigned the given + * only perform this update request if the document's modification was assigned the given * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} * * If the document last modification was assigned a different sequence number a @@ -577,7 +566,7 @@ public UpdateRequest setIfSeqNo(long seqNo) { } /** - * only performs this update request if the document was last modification was assigned the given + * only performs this update request if the document's last modification was assigned the given * primary term. Must be used in combination with {@link #setIfSeqNo(long)} * * If the document last modification was assigned a different term a diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 76668267a5cf0..9fb1cb804946f 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -117,7 +117,7 @@ public GetResult getForUpdate(String type, String id, long version, VersionType public GetResult get(Engine.GetResult engineGetResult, String id, String type, String[] fields, FetchSourceContext fetchSourceContext) { if (!engineGetResult.exists()) { - return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, 0, -1, false, null, null); + return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null); } currentMetric.inc(); @@ -173,7 +173,7 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea } if (get == null || get.exists() == false) { - return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, 0, -1, false, null, null); + return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null); } try { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 66375697bbdd1..f57bff72fc57c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1391,7 +1391,7 @@ public void testGetIfSeqNoIfPrimaryTerm() throws IOException { engine.flush(); } try (Engine.GetResult get = engine.get( - new Engine.Get(true, false, doc.type(), doc.id(), create.uid()) + new Engine.Get(true, true, doc.type(), doc.id(), create.uid()) .setIfSeqNo(indexResult.getSeqNo()).setIfPrimaryTerm(primaryTerm.get()), searcherFactory)) { assertEquals(indexResult.getSeqNo(), get.docIdAndVersion().seqNo); From 42ad9d43bc6bf13f0522574d6e5431efe1cb0e6d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 29 Jan 2019 06:31:14 -0500 Subject: [PATCH 13/13] conditional --- docs/reference/docs/delete.asciidoc | 2 +- docs/reference/docs/index_.asciidoc | 2 +- docs/reference/docs/update.asciidoc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/docs/delete.asciidoc b/docs/reference/docs/delete.asciidoc index bc6f7b840048d..22301b98f1031 100644 --- a/docs/reference/docs/delete.asciidoc +++ b/docs/reference/docs/delete.asciidoc @@ -39,7 +39,7 @@ The result of the above delete operation is: [[optimistic-concurrency-control-delete]] === Optimistic concurrency control -Delete operations can be made optional and only be performed if the last +Delete operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number and primary term specified by the `if_seq_no` and `if_primary_term` parameters. If a mismatch is detected, the operation will result in a `VersionConflictException` diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index c7ca42bfaf4c4..257b88289d87a 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -185,7 +185,7 @@ The result of the above index operation is: [[optimistic-concurrency-control-index]] === Optimistic concurrency control -Index operations can be made optional and only be performed if the last +Index operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number and primary term specified by the `if_seq_no` and `if_primary_term` parameters. If a mismatch is detected, the operation will result in a `VersionConflictException` diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 313cf2be5cdf8..42840b1b0a5ec 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -352,7 +352,7 @@ version numbers being out of sync with the external system. Use the `if_seq_no` and `if_primary_term`:: -Update operations can be made optional and only be performed if the last +Update operations can be made conditional and only be performed if the last modification to the document was assigned the sequence number and primary term specified by the `if_seq_no` and `if_primary_term` parameters. If a mismatch is detected, the operation will result in a `VersionConflictException`