diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7a095b5281a33..6058c1a3e3892 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; @@ -181,20 +182,32 @@ public static void performOnPrimary( @Override protected void doRun() throws Exception { - while (context.hasMoreOperationsToExecute()) { - if (executeBulkItemRequest( - context, - updateHelper, - nowInMillisSupplier, - mappingUpdater, - waitForMappingUpdate, - ActionListener.wrap(v -> executor.execute(this), this::onRejection) - ) == false) { - // We are waiting for a mapping update on another thread, that will invoke this action again once its done - // so we just break out here. - return; + String uid = UUIDs.base64UUID(); + long transactionId = -1L; + try { + transactionId = primary.startTransaction(uid); + while (context.hasMoreOperationsToExecute()) { + if (executeBulkItemRequest( + context, + updateHelper, + nowInMillisSupplier, + mappingUpdater, + waitForMappingUpdate, + ActionListener.wrap(v -> executor.execute(this), this::onRejection) + ) == false) { + // We are waiting for a mapping update on another thread, that will invoke this action again once its done + // so we just break out here. + return; + } + assert context.isInitial(); // either completed and moved to next or reset } - assert context.isInitial(); // either completed and moved to next or reset + + primary.commitTransaction(uid, transactionId); + } catch (Exception x) { + logger.warn("Encountered an error while executing bulk transaction", x); + primary.rollbackTransaction(uid, transactionId); + } finally { + primary.closeTransaction(uid, transactionId); } primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); // We're done, there's no more operations to execute so we resolve the wrapped listener @@ -206,7 +219,6 @@ public void onRejection(Exception e) { // We must finish the outstanding request. Finishing the outstanding request can include // refreshing and fsyncing. Therefore, we must force execution on the WRITE thread. executor.execute(new ActionRunnable<>(listener) { - @Override protected void doRun() { // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7ebd45336fe77..c6c482c3e0b5d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -321,6 +321,14 @@ public Condition newCondition() { } } + public abstract long startTransaction(String id) throws IOException; + + public abstract boolean commitTransaction(String id, long transactionId) throws IOException; + + public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException; + + public abstract boolean closeTransaction(String id, long transactionId) throws IOException; + /** * Perform document index operation on the engine * @param index operation to perform @@ -1271,7 +1279,8 @@ public abstract static class Operation { public enum TYPE { INDEX, DELETE, - NO_OP; + NO_OP, + TX_OP; private final String lowercase; @@ -1597,6 +1606,43 @@ public int estimatedSizeInBytes() { } + public static class TxOp extends Operation { + public TxOp(final long startTime) { + super(null, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, Versions.NOT_FOUND, null, null, startTime); + } + + @Override + public Term uid() { + throw new UnsupportedOperationException(); + } + + @Override + public long version() { + throw new UnsupportedOperationException(); + } + + @Override + public VersionType versionType() { + throw new UnsupportedOperationException(); + } + + @Override + String id() { + throw new UnsupportedOperationException(); + } + + @Override + public TYPE operationType() { + return TYPE.TX_OP; + } + + @Override + public int estimatedSizeInBytes() { + return 2 * Long.BYTES; + } + + } + public static class Get { private final boolean realtime; private final Term uid; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c47c6aac0b983..06c819a4ca4e2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2053,6 +2053,30 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E } } + @Override + public long startTransaction(String id) throws IOException { + Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())))); + return location.translogLocation; + } + + @Override + public boolean commitTransaction(String id, long transactionId) throws IOException { + translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); + return true; + } + + @Override + public boolean rollbackTransaction(String id, long transactionId) throws IOException { + translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); + return true; + } + + @Override + public boolean closeTransaction(String id, long transactionId) throws IOException { + translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); + return true; + } + private void pruneDeletedTombstones() { /* * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b1814317ebbc9..de543f48b0e4d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -527,6 +527,26 @@ public void skipTranslogRecovery() {} @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {} + @Override + public long startTransaction(String id) throws IOException { + return 0; + } + + @Override + public boolean commitTransaction(String id, long transactionId) throws IOException { + return false; + } + + @Override + public boolean rollbackTransaction(String id, long transactionId) throws IOException { + return false; + } + + @Override + public boolean closeTransaction(String id, long transactionId) throws IOException { + return false; + } + @Override public void maybePruneDeletes() {} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 298d4ea34e765..226b9a4770da3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -878,6 +878,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) { return previousState; } + public long startTransaction(String id) throws IOException { + return getEngine().startTransaction(id); + } + + public boolean commitTransaction(String id, long transactionId) throws IOException { + return getEngine().commitTransaction(id, transactionId); + } + + public boolean rollbackTransaction(String id, long transactionId) throws IOException { + return getEngine().rollbackTransaction(id, transactionId); + } + + public boolean closeTransaction(String id, long transactionId) throws IOException { + return getEngine().closeTransaction(id, transactionId); + } + public Engine.IndexResult applyIndexOperationOnPrimary( long version, VersionType versionType, diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 2a3af7872a166..c51ef18c3ae10 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1062,7 +1062,12 @@ enum Type { CREATE((byte) 1), INDEX((byte) 2), DELETE((byte) 3), - NO_OP((byte) 4); + NO_OP((byte) 4), + TX_START((byte) 5), + TX_PREPARE((byte) 6), + TX_COMMIT((byte) 7), + TX_ROLLBACK((byte) 8), + TX_CLOSE((byte) 9); private final byte id; @@ -1084,6 +1089,16 @@ public static Type fromId(byte id) { return DELETE; case 4: return NO_OP; + case 5: + return TX_START; + case 6: + return TX_PREPARE; + case 7: + return TX_COMMIT; + case 8: + return TX_ROLLBACK; + case 9: + return TX_CLOSE; default: throw new IllegalArgumentException("no type mapped for [" + id + "]"); } @@ -1115,6 +1130,16 @@ static Operation readOperation(final StreamInput input) throws IOException { return new Delete(input); case NO_OP: return new NoOp(input); + case TX_START: + return new TxStart(input); + case TX_PREPARE: + return new TxPrepare(input); + case TX_COMMIT: + return new TxCommit(input); + case TX_ROLLBACK: + return new TxRollback(input); + case TX_CLOSE: + return new TxClose(input); default: throw new AssertionError("no case for [" + type + "]"); } @@ -1137,11 +1162,25 @@ static void writeOperation(final StreamOutput output, final Operation operation) case NO_OP: ((NoOp) operation).write(output); break; + case TX_START: + ((TxStart) operation).write(output); + break; + case TX_PREPARE: + ((TxPrepare) operation).write(output); + break; + case TX_COMMIT: + ((TxCommit) operation).write(output); + break; + case TX_ROLLBACK: + ((TxRollback) operation).write(output); + break; + case TX_CLOSE: + ((TxClose) operation).write(output); + break; default: throw new AssertionError("no case for [" + operation.opType() + "]"); } } - } public static class Source { @@ -1156,6 +1195,215 @@ public Source(BytesReference source, String routing) { } + abstract static class TransactionBase implements Operation { + private final String id; + private final long seqNo; + + private TransactionBase(final StreamInput in) throws IOException { + in.readVInt(); // SERIALIZATION_FORMAT + this.id = in.readString(); + this.seqNo = in.readLong(); + } + + protected void write(final StreamOutput out) throws IOException { + out.writeVInt(1); + out.writeString(id); + out.writeLong(seqNo); + } + + TransactionBase(String id, Long seqNo) { + this.id = id; + this.seqNo = seqNo; + } + + @Override + public long estimateSize() { + return 2 * id.length() + Long.BYTES; + } + + @Override + public Source getSource() { + throw new UnsupportedOperationException("source does not exist for a no-op"); + } + + @Override + public long seqNo() { + return seqNo; + } + + @Override + public long primaryTerm() { + return 0; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TransactionBase tb = (TransactionBase) o; + + return id.equals(tb.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return "{" + "id='" + id + '}'; + } + } + + public static class TxStart extends TransactionBase { + private TxStart(final StreamInput in) throws IOException { + super(in); + } + + public TxStart(String id, long seqNo) { + super(id, seqNo); + } + + @Override + public Type opType() { + return Type.TX_START; + } + + @Override + public String toString() { + return "TxStart" + super.toString(); + } + } + + abstract static class WithTransaction extends TransactionBase { + private final long transactionId; + + private WithTransaction(final StreamInput in) throws IOException { + super(in); + this.transactionId = in.readLong(); + } + + WithTransaction(String id, long seqNo, long transactionId) { + super(id, seqNo); + this.transactionId = transactionId; + } + + protected void write(final StreamOutput out) throws IOException { + super.write(out); + out.writeLong(transactionId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return super.equals(o) && ((WithTransaction) o).transactionId == this.transactionId; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + Long.hashCode(transactionId); + return result; + } + + @Override + public String toString() { + return super.toString() + "[tx_id = " + transactionId + "]"; + } + } + + public static class TxPrepare extends WithTransaction { + private TxPrepare(final StreamInput in) throws IOException { + super(in); + } + + public TxPrepare(String id, long seqNo, long transactionId) { + super(id, seqNo, transactionId); + } + + @Override + public Type opType() { + return Type.TX_PREPARE; + } + + @Override + public String toString() { + return "TxPrepare" + super.toString(); + } + } + + public static class TxCommit extends WithTransaction { + private TxCommit(final StreamInput in) throws IOException { + super(in); + } + + public TxCommit(String id, long seqNo, long transactionId) { + super(id, seqNo, transactionId); + } + + @Override + public Type opType() { + return Type.TX_COMMIT; + } + + @Override + public String toString() { + return "TxCommit" + super.toString(); + } + } + + public static class TxRollback extends WithTransaction { + private TxRollback(final StreamInput in) throws IOException { + super(in); + } + + public TxRollback(String id, long seqNo, long transactionId) { + super(id, seqNo, transactionId); + } + + @Override + public Type opType() { + return Type.TX_ROLLBACK; + } + + @Override + public String toString() { + return "TxRollback" + super.toString(); + } + } + + public static class TxClose extends WithTransaction { + private TxClose(final StreamInput in) throws IOException { + super(in); + } + + public TxClose(String id, long seqNo, long transactionId) { + super(id, seqNo, transactionId); + } + + @Override + public Type opType() { + return Type.TX_CLOSE; + } + + @Override + public String toString() { + return "TxClose" + super.toString(); + } + } + public static class Index implements Operation { public static final int FORMAT_NO_PARENT = 9; // since 7.0