Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Seq# based optimistic concurrency control to UpdateRequest #37872

Merged
merged 20 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestGetAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestMultiGetAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
Expand All @@ -90,8 +90,10 @@
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -606,22 +608,46 @@ public void testUpdate() throws IOException {
IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
assertEquals(RestStatus.CREATED, indexResponse.status());

UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());

UpdateRequest updateRequestConflict = new UpdateRequest("index", "id");
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
updateRequestConflict.version(indexResponse.getVersion());
long lastUpdateSeqNo;
long lastUpdatePrimaryTerm;
{
UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
lastUpdateSeqNo = updateResponse.getSeqNo();
lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm();
assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L));
assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L));
}

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " +
"current version [2] is different than the one provided [1]]", exception.getMessage());
{
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values()));
if (randomBoolean()) {
updateRequest.setIfSeqNo(lastUpdateSeqNo + 1);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
} else {
updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1));
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1);
}
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status());
assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception"));
}
{
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values()));
updateRequest.setIfSeqNo(lastUpdateSeqNo);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo());
assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm());
}
}
{
IndexRequest indexRequest = new IndexRequest("index").id("with_script");
Expand Down
8 changes: 8 additions & 0 deletions docs/reference/docs/update.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,11 @@ version numbers being out of sync with the external system. Use the
<<docs-index_,`index` API>> instead.

=====================================================

`if_seq_no` and `if_primary_term`::

Update operations can be made optional and only be performed if the last
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean "conditional" and not "optional"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question. I took this over from the text in IndexRequest. I can change all if you prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ to change it all

modification to the document was assigned the sequence number and primary
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
mismatch is detected, the operation will result in a `VersionConflictException`
and a status code of 409. See <<optimistic-concurrency-control>> for more details.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@
"type": "time",
"description": "Explicit operation timeout"
},
"if_seq_no" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
"Update with if_seq_no":

- skip:
version: " - 6.99.99"
reason: if_seq_no was added in 7.0

- do:
catch: missing
update:
index: test_1
id: 1
if_seq_no: 1
if_primary_term: 1
body:
doc: { foo: baz }

- do:
index:
index: test_1
id: 1
body:
foo: baz

- do:
catch: conflict
update:
index: test_1
id: 1
if_seq_no: 234
if_primary_term: 1
body:
doc: { foo: baz }

- do:
update:
index: test_1
id: 1
if_seq_no: 0
if_primary_term: 1
body:
doc: { foo: bar }

- do:
get:
index: test_1
id: 1

- match: { _source: { foo: bar } }

- do:
bulk:
body:
- update:
_index: test_1
_id: 1
if_seq_no: 100
if_primary_term: 200
- doc:
foo: baz

- match: { errors: true }
- match: { items.0.update.status: 409 }

Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public UpdateHelper(ScriptService scriptService) {
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(),
request.versionType());
final GetResult getResult = indexShard.getService().getForUpdate(
request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import java.util.Map;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
implements DocWriteRequest<UpdateRequest>, WriteRequest<UpdateRequest>, ToXContentObject {
Expand All @@ -66,6 +68,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private static final ParseField DOC_AS_UPSERT_FIELD = new ParseField("doc_as_upsert");
private static final ParseField DETECT_NOOP_FIELD = new ParseField("detect_noop");
private static final ParseField SOURCE_FIELD = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");

static {
PARSER = new ObjectParser<>(UpdateRequest.class.getSimpleName());
Expand All @@ -89,6 +93,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
PARSER.declareField(UpdateRequest::fetchSource,
(parser, context) -> FetchSourceContext.fromXContent(parser), SOURCE_FIELD,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
PARSER.declareLong(UpdateRequest::setIfSeqNo, IF_SEQ_NO);
PARSER.declareLong(UpdateRequest::setIfPrimaryTerm, IF_PRIMARY_TERM);
}

// Set to null initially so we can know to override in bulk requests that have a default type.
Expand All @@ -105,6 +111,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private int retryOnConflict = 0;
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

Expand Down Expand Up @@ -170,6 +179,27 @@ public ActionRequestValidationException validate() {
}
}

if (ifSeqNo != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 3 conditions are the same as for index request. Can we share this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. I now changed to take another approach which I also took in a future PR for something else. Let me know if you prefer it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}

if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) {
validationException = addValidationError("compare and write operations can not be retried", validationException);
}

if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) {
validationException = addValidationError("compare and write operations can not be used with upsert", validationException);
}

if (script == null && doc == null) {
validationException = addValidationError("script or doc is missing", validationException);
}
Expand Down Expand Up @@ -531,6 +561,55 @@ public VersionType versionType() {
return this.versionType;
}

/**
* only perform this update request if the document was last modification was assigned the given
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something wrong with this sentence (same for some of these other docs here and in the requestbuilder class)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thanks

* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public UpdateRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only performs this update request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public UpdateRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}

/**
* If set, only perform this update request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

/**
* If set, only perform this update request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}

@Override
public OpType opType() {
return OpType.UPDATE;
Expand Down Expand Up @@ -811,6 +890,10 @@ public void readFrom(StreamInput in) throws IOException {
docAsUpsert = in.readBoolean();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
}
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
}
Expand Down Expand Up @@ -862,6 +945,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(docAsUpsert);
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
}
out.writeBoolean(detectNoop);
out.writeBoolean(scriptedUpsert);
}
Expand All @@ -880,6 +967,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.copyCurrentStructure(parser);
}
}

if (ifSeqNo != UNASSIGNED_SEQ_NO) {
builder.field(IF_SEQ_NO.getPreferredName(), ifSeqNo);
builder.field(IF_PRIMARY_TERM.getPreferredName(), ifPrimaryTerm);
}

if (script != null) {
builder.field("script", script);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ public UpdateRequestBuilder setVersionType(VersionType versionType) {
return this;
}

/**
* only perform this update request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public UpdateRequestBuilder setIfSeqNo(long seqNo) {
request.setIfSeqNo(seqNo);
return this;
}

/**
* only perform this update request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public UpdateRequestBuilder setIfPrimaryTerm(long term) {
request.setIfPrimaryTerm(term);
return this;
}

/**
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
Expand Down
Loading