Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BWC layer to seq no infra and enable BWC tests #22185

Merged
merged 24 commits into from
Dec 19, 2016
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a9d297d
wire level compatibility
bleskes Dec 9, 2016
adabd89
bwc test
bleskes Dec 10, 2016
4cacc8b
strengthen test
bleskes Dec 12, 2016
c2cb646
only account for shards on new nodes for global checkpoints
bleskes Dec 12, 2016
02b223c
fix timestamp for now as it makes assertion fail
bleskes Dec 12, 2016
2db7210
linting
bleskes Dec 12, 2016
634d92f
improve assertion message
bleskes Dec 12, 2016
4ea5dd9
line length
bleskes Dec 12, 2016
26596ff
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 13, 2016
001f7de
add skip version to cat.shards help test
bleskes Dec 13, 2016
cc9a486
force all replication requests to have toString
bleskes Dec 14, 2016
f2ca825
missing else :(
bleskes Dec 14, 2016
a945030
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 14, 2016
f4dada9
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 14, 2016
059638b
feedback and nocommit removal
bleskes Dec 15, 2016
2a15b15
feedback
bleskes Dec 16, 2016
647d5ee
update gitignore to explicitly point to backwards release folder (ins…
bleskes Dec 16, 2016
76338bf
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 16, 2016
896482f
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 17, 2016
f73d165
fix compilation
bleskes Dec 17, 2016
2cc2610
don't replicate on failures (like version conflicts)
bleskes Dec 17, 2016
bc60297
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 18, 2016
2ecbac0
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 19, 2016
7577b12
put back diamond operator for ecplise
bleskes Dec 19, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ html_docs

# random old stuff that we should look at the necessity of...
/tmp/
backwards/
eclipse-build
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? this is where our bwc index creation python tool stores their versions?

Copy link
Contributor Author

@bleskes bleskes Dec 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... maybe we need a different solution then - the problem is that with this line the this file was ignored by git.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call the folder bwc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw I pushed another commit updating gitignore to be more explicit about the backwords folder (rather than using a global glob pattern). I also updated the comments around it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++


Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ class ClusterFormationTasks {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
Map esConfig = [
'cluster.name' : node.clusterName,
'node.name' : "node-" + node.nodeNum,
'pidfile' : node.pidFile,
'path.repo' : "${node.sharedDir}/repo",
'path.shared_data' : "${node.sharedDir}/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
Expand Down Expand Up @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException {
type = in.readString();
id = in.readString();
version = in.readZLong();
seqNo = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}
Expand All @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
out.writeZLong(seqNo);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
}
out.writeBoolean(forcedRefresh);
result.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "flush {" + super.toString() + "}";
return "flush {" + shardId + "}";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -102,7 +103,9 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
}

@Override
Expand All @@ -113,7 +116,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWriteable(seqNoStats);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalWriteable(seqNoStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(routing);
out.writeOptionalString(parent);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalString(null);
// timestamp, at this point #proccess was called which for previous versions meant this was set
// nocommit: can we fix this in 5.x? how?
out.writeOptionalString("0");
out.writeOptionalWriteable(null);
}
out.writeBytesReference(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public BasicReplicationRequest() {
public BasicReplicationRequest(ShardId shardId) {
super(shardId);
}

@Override
public String toString() {
return "BasicReplicationRequest{" + shardId + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -36,6 +38,9 @@
public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>> extends ReplicationRequest<R> implements WriteRequest<R> {
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

long seqNo;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra whitespaces? can this be private or protected?


/**
* Constructor for deserialization.
*/
Expand All @@ -62,11 +67,32 @@ public RefreshPolicy getRefreshPolicy() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refreshPolicy = RefreshPolicy.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readVLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
refreshPolicy.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeVLong(seqNo);
}
}

/**
* Returns the sequence number for this operation. The sequence number is assigned while the operation
* is performed on the primary shard.
*/
public long seqNo() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get/set prefix if possible :)

return seqNo;
}

/** sets the sequence number for this operation. should only be called on the primary shard */
public void seqNo(long seqNo) {
this.seqNo = seqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
}

private void decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0;
assert pendingActions.get() > 0 : "pending action count goes bellow 0 for request [" + request + "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/bellow/below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. thx.

if (pendingActions.decrementAndGet() == 0) {
finish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long seqNo;
long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
Expand Down Expand Up @@ -171,19 +170,6 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/**
* Returns the sequence number for this operation. The sequence number is assigned while the operation
* is performed on the primary shard.
*/
public long seqNo() {
return seqNo;
}

/** sets the sequence number for this operation. should only be called on the primary shard */
public void seqNo(long seqNo) {
this.seqNo = seqNo;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
return primaryTerm;
Expand Down Expand Up @@ -215,7 +201,6 @@ public void readFrom(StreamInput in) throws IOException {
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
seqNo = in.readVLong();
primaryTerm = in.readVLong();
}

Expand All @@ -232,7 +217,6 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(seqNo);
out.writeVLong(primaryTerm);
}

Expand All @@ -252,13 +236,7 @@ public Request setShardId(ShardId shardId) {
}

@Override
public String toString() {
if (shardId != null) {
return shardId.toString();
} else {
return index;
}
}
public abstract String toString(); // force a proper to string to ease debugging

@Override
public String getDescription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -983,16 +984,26 @@ public ReplicaResponse(String allocationId, long localCheckpoint) {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
allocationId = in.readString();
} else {
// we use to read empty responses
Empty.INSTANCE.readFrom(in);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just remove this? it's doing nothing so a comment is fine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeString(allocationId);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
}
}

@Override
Expand All @@ -1016,10 +1027,9 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
transportService.sendRequest(node, transportReplicaAction,
new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions,
// Eclipse can't handle when this is <> so we specify the type here.
new ActionListenerResponseHandler<ReplicaResponse>(listener, ReplicaResponse::new));
final ConcreteShardRequest<ReplicaRequest> concreteShardRequest =
new ConcreteShardRequest<>(request, replica.allocationId().getId());
sendReplicaRequest(concreteShardRequest, node, listener);
}

@Override
Expand Down Expand Up @@ -1060,6 +1070,14 @@ public void onFailure(Exception shardFailedError) {
}
}

/** sends the give replica request to the supplied nodes */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/give/given

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check

protected void sendReplicaRequest(ConcreteShardRequest<ReplicaRequest> concreteShardRequest, DiscoveryNode node,
ActionListener<ReplicationOperation.ReplicaResponse> listener) {
transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions,
// Eclipse can't handle when this is <> so we specify the type here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment still valid? I see that you removed the type..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. @jpountz maybe you can help verify eclipse is happy with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed it is not happy about it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx @jpountz, I'll revert the change

new ActionListenerResponseHandler<>(listener, ReplicaResponse::new));
}

/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static final class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException {
// timestamp
out.writeBoolean(false); // enabled
out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format());
out.writeOptionalString(null);
out.writeOptionalString("now"); // old default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/old/5.x/

out.writeOptionalBoolean(null);
}
out.writeBoolean(hasParentField());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,28 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;

public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
Expand All @@ -65,6 +64,17 @@ protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

@Override
protected void sendReplicaRequest(ConcreteShardRequest<ReplicaRequest> concreteShardRequest, DiscoveryNode node,
ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
super.sendReplicaRequest(concreteShardRequest, node, listener);
} else {
listener.onResponse(
new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}

@Override
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request, IndexShard indexShard) throws Exception {
long checkpoint = indexShard.getGlobalCheckpoint();
Expand Down Expand Up @@ -105,6 +115,11 @@ private PrimaryRequest() {
public PrimaryRequest(ShardId shardId) {
super(shardId);
}

@Override
public String toString() {
return "GlobalCkpSyncPrimary{" + shardId + "}";
}
}

public static final class ReplicaRequest extends ReplicationRequest<GlobalCheckpointSyncAction.ReplicaRequest> {
Expand Down Expand Up @@ -134,6 +149,14 @@ public void writeTo(StreamOutput out) throws IOException {
public long getCheckpoint() {
return checkpoint;
}

@Override
public String toString() {
return "GlobalCkpSyncReplica{" +
"checkpoint=" + checkpoint +
", shardId=" + shardId +
'}';
}
}

}
Loading