Skip to content

Commit

Permalink
Move watcher to use seq# and primary term for concurrency control (el…
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Feb 1, 2019
1 parent 85d7d50 commit 54bbb72
Show file tree
Hide file tree
Showing 42 changed files with 629 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
Expand Down Expand Up @@ -882,6 +883,24 @@ Params withWaitForActiveShards(ActiveShardCount currentActiveShardCount, ActiveS
return this;
}

Params withIfSeqNo(long ifSeqNo) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
return putParam("if_seq_no", Long.toString(ifSeqNo));
}
return this;
}

Params withIfPrimaryTerm(long ifPrimaryTerm) {
if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
return putParam("if_primary_term", Long.toString(ifPrimaryTerm));
}
return this;
}

Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
}

Params withIndicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions != null) {
withIgnoreUnavailable(indicesOptions.ignoreUnavailable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
.build();

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion());
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
params.putParam("active", "false");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class GetWatchResponse {
private final String id;
private final long version;
private final long seqNo;
private final long primaryTerm;
private final WatchStatus status;

private final BytesReference source;
Expand All @@ -43,15 +48,18 @@ public class GetWatchResponse {
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status,
BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public String getId() {
Expand All @@ -62,6 +70,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}
Expand Down Expand Up @@ -111,6 +127,8 @@ public int hashCode() {
private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no");
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

Expand All @@ -119,9 +137,10 @@ public int hashCode() {
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
XContentBuilder builder = (XContentBuilder) a[6];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5],
source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
Expand All @@ -131,6 +150,8 @@ public int hashCode() {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.util.Objects;
import java.util.regex.Pattern;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
Expand All @@ -42,6 +46,9 @@ public final class PutWatchRequest implements Validatable {
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
Expand Down Expand Up @@ -98,6 +105,56 @@ public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the watch's last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the watch last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
* If the watch last last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
*
* If the watch's last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}


public static boolean isValidId(String id) {
return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -32,20 +33,26 @@ public class PutWatchResponse {

static {
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
}

private String id;
private long version;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private boolean created;

public PutWatchResponse() {
}

public PutWatchResponse(String id, long version, boolean created) {
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
this.id = id;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.created = created;
}

Expand All @@ -57,6 +64,14 @@ private void setVersion(long version) {
this.version = version;
}

private void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}

private void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}

private void setCreated(boolean created) {
this.created = created;
}
Expand All @@ -69,6 +84,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isCreated() {
return created;
}
Expand All @@ -80,12 +103,14 @@ public boolean equals(Object o) {

PutWatchResponse that = (PutWatchResponse) o;

return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
&& Objects.equals(seqNo, that.seqNo)
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
}

@Override
public int hashCode() {
return Objects.hash(id, version, created);
return Objects.hash(id, version, seqNo, primaryTerm, created);
}

public static PutWatchResponse fromXContent(XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui
return builder.startObject()
.field("_id", response.getId())
.field("_version", response.getVersion())
.field("_seq_no", response.getSeqNo())
.field("_primary_term", response.getPrimaryTerm())
.field("created", response.isCreated())
.endObject();
}

private static PutWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 200);
long version = randomLongBetween(1, 10);
boolean created = randomBoolean();
return new PutWatchResponse(id, version, created);
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
}
}
6 changes: 6 additions & 0 deletions x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ The action state of a newly-created watch is `awaits_successful_execution`:
--------------------------------------------------
{
"found": true,
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"_id": "my_watch",
"status": {
Expand Down Expand Up @@ -137,6 +139,8 @@ and the action is now in `ackable` state:
{
"found": true,
"_id": "my_watch",
"_seq_no": 1,
"_primary_term": 1,
"_version": 2,
"status": {
"version": 2,
Expand Down Expand Up @@ -186,6 +190,8 @@ GET _xpack/watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 2,
"_primary_term": 1,
"_version": 3,
"status": {
"version": 3,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ GET _xpack/watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": {
"state" : {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ GET _xpack/watcher/watch/my_watch
"found": true,
"_id": "my_watch",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"status": {
"state" : {
"active" : true,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/get-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Response:
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": { <1>
"version": 1,
Expand Down
Loading

0 comments on commit 54bbb72

Please sign in to comment.