Skip to content

Commit

Permalink
Port Primary Terms to master elastic#17044
Browse files Browse the repository at this point in the history
Primary terms is a way to make sure that operations replicated from stale primary are rejected by shards following a newly elected primary.

Original PRs adding this to the seq# feature branch elastic#14062 , elastic#14651 . Unlike those PR, here we take a different approach (based on newer code in master) where the primary terms are stored in the meta data only (and not in `ShardRouting` objects).

Relates to elastic#17038

Closes elastic#17044
  • Loading branch information
bleskes committed Mar 25, 2016
1 parent 85b06f4 commit fe43eef
Show file tree
Hide file tree
Showing 24 changed files with 1,060 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

Expand Down Expand Up @@ -148,6 +150,16 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
return primaryTerm;
}

/** marks the primary term in which the operation was performed */
public void primaryTerm(long term) {
primaryTerm = term;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -169,6 +181,7 @@ public void readFrom(StreamInput in) throws IOException {
timeout = TimeValue.readTimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();
}

@Override
Expand All @@ -184,6 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(primaryTerm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -359,32 +358,7 @@ public void onTimeout(TimeValue timeout) {
}
});
} else {
try {
failReplicaIfNeeded(t);
} catch (Throwable unexpected) {
logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
} finally {
responseWithFailure(t);
}
}
}

private void failReplicaIfNeeded(Throwable t) {
Index index = request.shardId().getIndex();
int shardId = request.shardId().id();
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
if (ignoreReplicaException(t) == false) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
return;
}
IndexShard indexShard = indexService.getShardOrNull(shardId);
if (indexShard == null) {
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
return;
}
indexShard.failShard(actionName + " failed on replica", t);
}
}

Expand All @@ -401,7 +375,7 @@ protected void responseWithFailure(Throwable t) {
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
Expand Down Expand Up @@ -707,7 +681,6 @@ protected void doRun() throws Exception {
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
executeLocally();

} else {
executeRemotely();
}
Expand All @@ -716,6 +689,7 @@ protected void doRun() throws Exception {
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
Expand Down Expand Up @@ -825,17 +799,17 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, true);
return IndexShardReferenceImpl.createOnPrimary(indexShard);
}

/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, false);
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
}

/**
Expand Down Expand Up @@ -1098,9 +1072,13 @@ private void doFinish() {
totalShards,
success.get(),
failuresArray

)
);
if (logger.isTraceEnabled()) {
logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
finalResponse.getShardInfo());
}

try {
channel.sendResponse(finalResponse);
} catch (IOException responseException) {
Expand All @@ -1125,22 +1103,33 @@ interface IndexShardReference extends Releasable {
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();

/** returns the primary term of the current operation */
long opPrimaryTerm();
}

static final class IndexShardReferenceImpl implements IndexShardReference {

private final IndexShard indexShard;
private final Releasable operationLock;

IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
this.indexShard = indexShard;
if (primaryAction) {
if (primaryTerm < 0) {
operationLock = indexShard.acquirePrimaryOperationLock();
} else {
operationLock = indexShard.acquireReplicaOperationLock();
operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
}
}

static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
return new IndexShardReferenceImpl(indexShard, -1);
}

static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
return new IndexShardReferenceImpl(indexShard, primaryTerm);
}

@Override
public void close() {
operationLock.close();
Expand All @@ -1160,6 +1149,11 @@ public void failShard(String reason, @Nullable Throwable e) {
public ShardRouting routingEntry() {
return indexShard.routingEntry();
}

@Override
public long opPrimaryTerm() {
return indexShard.getPrimaryTerm();
}
}

protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
Expand Down
51 changes: 34 additions & 17 deletions core/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

/**
* Represents the current state of the cluster.
*
* <p>
* The cluster state object is immutable with an
* exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
Expand All @@ -74,7 +74,7 @@
* the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
* method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
* publishing mechanism can be overridden by other discovery.
*
* <p>
* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state
* differences instead of the entire state on each change. The publishing mechanism should only send differences
* to a node if this node was present in the previous version of the cluster state. If a node is not present was
Expand Down Expand Up @@ -135,7 +135,7 @@ public static <T extends Custom> T lookupPrototype(String type) {

public static <T extends Custom> T lookupPrototypeSafe(String type) {
@SuppressWarnings("unchecked")
T proto = (T)customPrototypes.get(type);
T proto = (T) customPrototypes.get(type);
if (proto == null) {
throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
}
Expand Down Expand Up @@ -281,6 +281,16 @@ public String prettyPrint() {
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
sb.append("meta data version: ").append(metaData.version()).append("\n");
for (IndexMetaData indexMetaData : metaData) {
final String TAB = " ";
sb.append(TAB).append(indexMetaData.getIndex());
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
}
}
sb.append(blocks().prettyPrint());
sb.append(nodes().prettyPrint());
sb.append(routingTable().prettyPrint());
Expand Down Expand Up @@ -477,6 +487,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();

builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS);
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard));
}
builder.endObject();

builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
builder.startArray(String.valueOf(cursor.key));
Expand All @@ -487,6 +503,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();

// index metadata
builder.endObject();
}
builder.endObject();
Expand Down Expand Up @@ -683,16 +700,16 @@ public static byte[] toBytes(ClusterState state) throws IOException {
}

/**
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
*/
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
return readFrom(StreamInput.wrap(data), localNode);
}

/**
* @param in input stream
* @param localNode used to set the local node in the cluster state. can be null.
* @param in input stream
* @param localNode used to set the local node in the cluster state. can be null.
*/
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode);
Expand Down Expand Up @@ -791,17 +808,17 @@ public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
metaData = proto.metaData.readDiffFrom(in);
blocks = proto.blocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}

@Override
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
@Override
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
}

@Override
Expand Down
Loading

0 comments on commit fe43eef

Please sign in to comment.