From bbba8aee2b646963d250d3659861113938190832 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Thu, 18 Nov 2021 17:43:21 -0500 Subject: [PATCH 1/2] Chain translog locations. --- .../action/bulk/TransportShardBulkAction.java | 34 ++-- .../elasticsearch/index/engine/Engine.java | 25 +-- .../index/engine/InternalEngine.java | 23 ++- .../index/engine/ReadOnlyEngine.java | 16 +- .../elasticsearch/index/shard/IndexShard.java | 41 +++-- .../index/shard/ShardTransactionRegistry.java | 3 +- .../index/translog/Translog.java | 164 ++++++++++++------ .../bulk/TransportShardBulkActionTests.java | 22 +-- 8 files changed, 199 insertions(+), 129 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index c8e225c5255a5..3f1be994761f8 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardTransactionRegistry; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.IndicesService; @@ -64,6 +65,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.LongSupplier; @@ -79,6 +81,7 @@ public class TransportShardBulkAction extends TransportWriteAction> waitForMappingUpdate, ActionListener itemDoneListener, - long transactionId + Translog.Location[] transactionId ) throws Exception { final DocWriteRequest.OpType opType = context.getCurrent().opType(); @@ -344,7 +350,7 @@ static boolean executeBulkItemRequest( request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm(), - transactionId + transactionId[0] ); } else { final IndexRequest request = context.getRequestToExecute(); @@ -363,9 +369,14 @@ static boolean executeBulkItemRequest( request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry(), - transactionId + transactionId[0] ); } + + if (result.getTranslogLocation() != null) { + transactionId[0] = result.getTranslogLocation(); + } + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { @@ -564,7 +575,8 @@ protected int replicaOperationCount(BulkShardRequest request) { return request.items().length; } - public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, long transactionId) throws Exception { + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica, Translog.Location transactionId) + throws Exception { Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { final BulkItemRequest item = request.items()[i]; @@ -604,7 +616,7 @@ private static Engine.Result performOpOnReplica( DocWriteResponse primaryResponse, DocWriteRequest docWriteRequest, IndexShard replica, - long transactionId + Translog.Location transactionId ) throws Exception { final Engine.Result result; switch (docWriteRequest.opType()) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7b59811db1a4f..902a09aa1d79a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -321,13 +321,13 @@ public Condition newCondition() { } } - public abstract long startTransaction(String id) throws IOException; + public abstract Translog.Location startTransaction(String id) throws IOException; - public abstract boolean commitTransaction(String id, long transactionId) throws IOException; + public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException; - public abstract boolean rollbackTransaction(String id, long transactionId) throws IOException; + public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException; - public abstract boolean closeTransaction(String id, long transactionId) throws IOException; + public abstract Translog.Location closeTransaction(Translog.Location prevId) throws IOException; /** * Perform document index operation on the engine @@ -1372,7 +1372,7 @@ public static class Index extends Operation { private final boolean isRetry; private final long ifSeqNo; private final long ifPrimaryTerm; - private final long transactionId; + private final Translog.Location transactionId; public Index( Term uid, @@ -1401,7 +1401,7 @@ public Index( isRetry, ifSeqNo, ifPrimaryTerm, - -1L + new Translog.Location(0, 0, 0) ); } @@ -1418,7 +1418,7 @@ public Index( boolean isRetry, long ifSeqNo, long ifPrimaryTerm, - long transactionId + Translog.Location transactionId ) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; @@ -1511,7 +1511,7 @@ public long getIfPrimaryTerm() { return ifPrimaryTerm; } - public long getTransactionId() { + public Translog.Location getTransactionId() { return transactionId; } } @@ -1521,7 +1521,7 @@ public static class Delete extends Operation { private final String id; private final long ifSeqNo; private final long ifPrimaryTerm; - private final long transactionId; + private final Translog.Location transactionId; public Delete( String id, @@ -1535,7 +1535,8 @@ public Delete( long ifSeqNo, long ifPrimaryTerm ) { - this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, -1L); + this(id, uid, seqNo, primaryTerm, version, versionType, origin, + startTime, ifSeqNo, ifPrimaryTerm, new Translog.Location(0, 0, 0)); } public Delete( @@ -1549,7 +1550,7 @@ public Delete( long startTime, long ifSeqNo, long ifPrimaryTerm, - long transactionId + Translog.Location transactionId ) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; @@ -1616,7 +1617,7 @@ public long getIfPrimaryTerm() { return ifPrimaryTerm; } - public long getTransactionId() { + public Translog.Location getTransactionId() { return transactionId; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 36bcf090a9662..a401289183ef7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2078,27 +2078,26 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E } @Override - public long startTransaction(String id) throws IOException { - Translog.Location location = translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())))); - return location.translogLocation; + public Translog.Location startTransaction(String id) throws IOException { + return translog.add(new Translog.TxStart(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())))); } @Override - public boolean commitTransaction(String id, long transactionId) throws IOException { - translog.add(new Translog.TxCommit(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); - return true; + public Translog.Location commitTransaction(Translog.Location prevId) throws IOException { + return translog.add( + new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); } @Override - public boolean rollbackTransaction(String id, long transactionId) throws IOException { - translog.add(new Translog.TxRollback(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); - return true; + public Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException { + return translog.add( + new Translog.TxRollback(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); } @Override - public boolean closeTransaction(String id, long transactionId) throws IOException { - translog.add(new Translog.TxClose(id, doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), transactionId)); - return true; + public Translog.Location closeTransaction(Translog.Location prevId) throws IOException { + return translog.add( + new Translog.TxClose(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); } private void pruneDeletedTombstones() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index de543f48b0e4d..55179a4575917 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -528,23 +528,23 @@ public void skipTranslogRecovery() {} public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {} @Override - public long startTransaction(String id) throws IOException { - return 0; + public Translog.Location startTransaction(String id) throws IOException { + return null; } @Override - public boolean commitTransaction(String id, long transactionId) throws IOException { - return false; + public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException { + return null; } @Override - public boolean rollbackTransaction(String id, long transactionId) throws IOException { - return false; + public Translog.Location rollbackTransaction(Translog.Location transactionId) throws IOException { + return null; } @Override - public boolean closeTransaction(String id, long transactionId) throws IOException { - return false; + public Translog.Location closeTransaction(Translog.Location transactionId) throws IOException { + return null; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cabac56b73525..c3830d0981101 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -275,7 +275,7 @@ Runnable getGlobalCheckpointSyncer() { IndexShardState.STARTED ); - public static final long NO_TRANSACTION_ID = -1L; + public static final Translog.Location NO_TRANSACTION_ID = new Translog.Location(0, 0, 0); private final CheckedFunction readerWrapper; @@ -882,20 +882,23 @@ private IndexShardState changeState(IndexShardState newState, String reason) { return previousState; } - public long startTransaction(String id) throws IOException { + public Translog.Location startTransaction(String id) throws IOException { return getEngine().startTransaction(id); } - public boolean commitTransaction(String id, long transactionId) throws IOException { - return getEngine().commitTransaction(id, transactionId); + public boolean commitTransaction(Translog.Location[] transactionId) throws IOException { + transactionId[0] = getEngine().commitTransaction(transactionId[0]); + return true; } - public boolean rollbackTransaction(String id, long transactionId) throws IOException { - return getEngine().rollbackTransaction(id, transactionId); + public boolean rollbackTransaction(Translog.Location[] transactionId) throws IOException { + transactionId[0] = getEngine().rollbackTransaction(transactionId[0]); + return true; } - public boolean closeTransaction(String id, long transactionId) throws IOException { - return getEngine().closeTransaction(id, transactionId); + public boolean closeTransaction(Translog.Location[] transactionId) throws IOException { + transactionId[0] = getEngine().closeTransaction(transactionId[0]); + return true; } public Engine.IndexResult applyIndexOperationOnPrimary( @@ -906,7 +909,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary( long ifPrimaryTerm, long autoGeneratedTimestamp, boolean isRetry, - long transactionId + Translog.Location transactionId ) throws IOException { assert versionType.validateVersionForWrites(version); return applyIndexOperation( @@ -932,7 +935,7 @@ public Engine.IndexResult applyIndexOperationOnReplica( long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse, - long transactionId + Translog.Location transactionId ) throws IOException { return applyIndexOperation( getEngine(), @@ -962,7 +965,7 @@ private Engine.IndexResult applyIndexOperation( boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse, - long transactionId + Translog.Location transactionId ) throws IOException { assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; @@ -1011,7 +1014,7 @@ public static Engine.Index prepareIndex( boolean isRetry, long ifSeqNo, long ifPrimaryTerm, - long transactionId + Translog.Location transactionId ) { long startTime = System.nanoTime(); assert source.dynamicTemplates().isEmpty() || origin == Engine.Operation.Origin.PRIMARY @@ -1139,7 +1142,7 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary( VersionType versionType, long ifSeqNo, long ifPrimaryTerm, - long transactionId + Translog.Location transactionId ) throws IOException { assert versionType.validateVersionForWrites(version); return applyDeleteOperation( @@ -1156,7 +1159,8 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary( ); } - public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id, long transactionId) + public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, + long version, String id, Translog.Location transactionId) throws IOException { return applyDeleteOperation( getEngine(), @@ -1182,7 +1186,7 @@ private Engine.DeleteResult applyDeleteOperation( long ifSeqNo, long ifPrimaryTerm, Engine.Operation.Origin origin, - long transactionId + Translog.Location transactionId ) throws IOException { assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]"; @@ -1210,11 +1214,12 @@ public static Engine.Delete prepareDelete( Engine.Operation.Origin origin, long ifSeqNo, long ifPrimaryTerm, - long transactionId + Translog.Location transactionId ) { long startTime = System.nanoTime(); final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); - return new Engine.Delete(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm); + return new Engine.Delete(id, uid, seqNo, primaryTerm, version, + versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, transactionId); } private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { @@ -4135,7 +4140,7 @@ public void registerTransaction(TxID id, Set keys) { public Map prepareCommit(TxID txID) { // todo: lookup in transaction table transactionRegistry.prepare(txID); - return Collections.EMPTY_MAP; + return (Map)Collections.EMPTY_MAP; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java b/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java index 3fcc0f0fd495a..a7878d8781922 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardTransactionRegistry.java @@ -50,7 +50,8 @@ public synchronized Map prepare(TxID txID) { prepared.add(txID); Set conflictingKeys = conflictingKeysByTxID.get(txID); if (conflictingKeys != null) { - return conflictingKeys.stream().flatMap(id -> byKey.get(id).stream()).filter(conflict -> conflict.equals(txID) == false).collect(Collectors.toMap(Function.identity(), + return conflictingKeys.stream().flatMap(id -> byKey.get(id).stream()) + .filter(conflict -> conflict.equals(txID) == false).collect(Collectors.toMap(Function.identity(), this::winConflict)); } else { return Map.of(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 02e90fc1a649c..937e06b506dcc 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -927,6 +927,8 @@ public TranslogDeletionPolicy getDeletionPolicy() { public static class Location implements Comparable { + pu + public final long generation; public final long translogLocation; public final int size; @@ -978,6 +980,16 @@ public int hashCode() { result = 31 * result + size; return result; } + + public String id() { + return new StringBuilder() + .append(generation) + .append('.') + .append(translogLocation) + .append('.') + .append(size) + .toString(); + } } /** @@ -1196,29 +1208,25 @@ public Source(BytesReference source, String routing) { } abstract static class TransactionBase implements Operation { - private final String id; private final long seqNo; private TransactionBase(final StreamInput in) throws IOException { in.readVInt(); // SERIALIZATION_FORMAT - this.id = in.readString(); this.seqNo = in.readLong(); } protected void write(final StreamOutput out) throws IOException { out.writeVInt(1); - out.writeString(id); out.writeLong(seqNo); } - TransactionBase(String id, Long seqNo) { - this.id = id; + TransactionBase(Long seqNo) { this.seqNo = seqNo; } @Override public long estimateSize() { - return 2 * id.length() + Long.BYTES; + return Long.BYTES; } @Override @@ -1246,28 +1254,36 @@ public boolean equals(Object o) { } TransactionBase tb = (TransactionBase) o; - - return id.equals(tb.id); + return seqNo == tb.seqNo; } @Override public int hashCode() { - return id.hashCode(); + return (int)seqNo; } @Override public String toString() { - return "{" + "id='" + id + '}'; + return "{" + "seqNo='" + seqNo + '}'; } } public static class TxStart extends TransactionBase { + private final String id; + private TxStart(final StreamInput in) throws IOException { super(in); + this.id = in.readString(); } public TxStart(String id, long seqNo) { - super(id, seqNo); + super(seqNo); + this.id = id; + } + + protected void write(final StreamOutput out) throws IOException { + super.write(out); + out.writeString(id); } @Override @@ -1277,26 +1293,34 @@ public Type opType() { @Override public String toString() { - return "TxStart" + super.toString(); + return "TxStart" + super.toString() + "[id=" + this.id + "]"; } } abstract static class WithTransaction extends TransactionBase { - private final long transactionId; + public final long generation; + public final long translogLocation; + public final int size; private WithTransaction(final StreamInput in) throws IOException { super(in); - this.transactionId = in.readLong(); + this.translogLocation = in.readLong(); + this.generation = in.readLong(); + this.size = in.readInt(); } - WithTransaction(String id, long seqNo, long transactionId) { - super(id, seqNo); - this.transactionId = transactionId; + WithTransaction(long seqNo, long translogLocation, long generation, int size) { + super(seqNo); + this.translogLocation = translogLocation; + this.generation = generation; + this.size = size; } protected void write(final StreamOutput out) throws IOException { super.write(out); - out.writeLong(transactionId); + out.writeLong(translogLocation); + out.writeLong(generation); + out.writeInt(size); } @Override @@ -1308,19 +1332,24 @@ public boolean equals(Object o) { return false; } - return super.equals(o) && ((WithTransaction) o).transactionId == this.transactionId; + return super.equals(o) && + ((WithTransaction) o).translogLocation == this.translogLocation && + ((WithTransaction) o).generation == this.generation && + ((WithTransaction) o).size == this.size; } @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + Long.hashCode(transactionId); + result = 31 * result + Long.hashCode(translogLocation); + result = 31 * result + Long.hashCode(generation); + result = 31 * result + Long.hashCode(size); return result; } @Override public String toString() { - return super.toString() + "[tx_id = " + transactionId + "]"; + return super.toString() + "[tx_id = " + translogLocation + "]"; } } @@ -1329,8 +1358,8 @@ private TxPrepare(final StreamInput in) throws IOException { super(in); } - public TxPrepare(String id, long seqNo, long transactionId) { - super(id, seqNo, transactionId); + public TxPrepare(long seqNo, Location prevLoc) { + super(seqNo, prevLoc.translogLocation, prevLoc.generation, prevLoc.size); } @Override @@ -1349,8 +1378,8 @@ private TxCommit(final StreamInput in) throws IOException { super(in); } - public TxCommit(String id, long seqNo, long transactionId) { - super(id, seqNo, transactionId); + public TxCommit(long seqNo, Location prevLoc) { + super(seqNo, prevLoc.translogLocation, prevLoc.generation, prevLoc.size); } @Override @@ -1369,8 +1398,8 @@ private TxRollback(final StreamInput in) throws IOException { super(in); } - public TxRollback(String id, long seqNo, long transactionId) { - super(id, seqNo, transactionId); + public TxRollback(long seqNo, Location prevLoc) { + super(seqNo, prevLoc.translogLocation, prevLoc.generation, prevLoc.size); } @Override @@ -1389,8 +1418,8 @@ private TxClose(final StreamInput in) throws IOException { super(in); } - public TxClose(String id, long seqNo, long transactionId) { - super(id, seqNo, transactionId); + public TxClose(long seqNo, Location prevLoc) { + super(seqNo, prevLoc.translogLocation, prevLoc.generation, prevLoc.size); } @Override @@ -1418,7 +1447,9 @@ public static class Index implements Operation { private final long version; private final BytesReference source; private final String routing; - private final long transactionId; + private final long translogLocation; + private final long txGeneration; + private final int txSize; private Index(final StreamInput in) throws IOException { final int format = in.readVInt(); // SERIALIZATION_FORMAT @@ -1440,7 +1471,9 @@ private Index(final StreamInput in) throws IOException { this.autoGeneratedIdTimestamp = in.readLong(); seqNo = in.readLong(); primaryTerm = in.readLong(); - transactionId = in.readLong(); + translogLocation = in.readLong(); + txGeneration = in.readLong(); + txSize = in.readInt(); } public Index(Engine.Index index, Engine.IndexResult indexResult) { @@ -1451,15 +1484,18 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { this.primaryTerm = index.primaryTerm(); this.version = indexResult.getVersion(); this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp(); - this.transactionId = index.getTransactionId(); + Location loc = index.getTransactionId(); + this.translogLocation = loc.translogLocation; + this.txGeneration = loc.generation; + this.txSize = loc.size; } public Index(String id, long seqNo, long primaryTerm, byte[] source) { - this(id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1, -1); + this(id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1, new Location(0, 0, 0)); } public Index(String id, long seqNo, long primaryTerm, long version, byte[] source, String routing, long autoGeneratedIdTimestamp) { - this(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp, -1L); + this(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp, new Location(0, 0, 0)); } public Index( @@ -1470,7 +1506,7 @@ public Index( byte[] source, String routing, long autoGeneratedIdTimestamp, - long transactionId + Translog.Location transactionId ) { this.id = id; this.source = new BytesArray(source); @@ -1479,7 +1515,9 @@ public Index( this.version = version; this.routing = routing; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; - this.transactionId = transactionId; + this.translogLocation = transactionId.translogLocation; + this.txGeneration = transactionId.generation; + this.txSize = transactionId.size; } @Override @@ -1489,7 +1527,7 @@ public Type opType() { @Override public long estimateSize() { - return (2 * id.length()) + source.length() + (routing != null ? 2 * routing.length() : 0) + (5 * Long.BYTES); // timestamp, + return (2 * id.length()) + source.length() + (routing != null ? 2 * routing.length() : 0) + (7 * Long.BYTES); // timestamp, // seq_no, // primary_term, // version @@ -1546,7 +1584,9 @@ private void write(final StreamOutput out) throws IOException { out.writeLong(autoGeneratedIdTimestamp); out.writeLong(seqNo); out.writeLong(primaryTerm); - out.writeLong(transactionId); + out.writeLong(translogLocation); + out.writeLong(txGeneration); + out.writeInt(txSize); } @Override @@ -1565,7 +1605,8 @@ public boolean equals(Object o) { || primaryTerm != index.primaryTerm || id.equals(index.id) == false || autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp - || transactionId != index.transactionId + || translogLocation != index.translogLocation + || txGeneration != index.txGeneration || source.equals(index.source) == false) { return false; } @@ -1578,7 +1619,8 @@ public int hashCode() { result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + Long.hashCode(transactionId); + result = 31 * result + Long.hashCode(translogLocation); + result = 31 * result + Long.hashCode(txGeneration); result = 31 * result + source.hashCode(); result = 31 * result + (routing != null ? routing.hashCode() : 0); result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp); @@ -1600,7 +1642,7 @@ public String toString() { + ", autoGeneratedIdTimestamp=" + autoGeneratedIdTimestamp + ", transactionId=" - + transactionId + + translogLocation + '}'; } @@ -1608,8 +1650,8 @@ public long getAutoGeneratedIdTimestamp() { return autoGeneratedIdTimestamp; } - public long getTransactionId() { - return transactionId; + public Location getTransactionId() { + return new Location(txGeneration, translogLocation, txSize); } } @@ -1625,7 +1667,9 @@ public static class Delete implements Operation { private final long seqNo; private final long primaryTerm; private final long version; - private final long transactionId; + private final long translogLocation; + private final long txGeneration; + private final int txSize; private Delete(final StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT @@ -1646,7 +1690,9 @@ private Delete(final StreamInput in) throws IOException { } seqNo = in.readLong(); primaryTerm = in.readLong(); - transactionId = in.readLong(); + translogLocation = in.readLong(); + txGeneration = in.readLong(); + txSize = in.readInt(); } public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { @@ -1655,19 +1701,21 @@ public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { /** utility for testing */ public Delete(String id, long seqNo, long primaryTerm) { - this(id, seqNo, primaryTerm, Versions.MATCH_ANY, -1L); + this(id, seqNo, primaryTerm, Versions.MATCH_ANY, new Location(0, 0, 0)); } public Delete(String id, long seqNo, long primaryTerm, long version) { - this(id, seqNo, primaryTerm, version, -1L); + this(id, seqNo, primaryTerm, version, new Location(0, 0, 0)); } - public Delete(String id, long seqNo, long primaryTerm, long version, long transactionId) { + public Delete(String id, long seqNo, long primaryTerm, long version, Location transactionId) { this.id = Objects.requireNonNull(id); this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; - this.transactionId = transactionId; + this.translogLocation = transactionId.translogLocation; + this.txGeneration = transactionId.generation; + this.txSize = transactionId.size; } @Override @@ -1677,7 +1725,7 @@ public Type opType() { @Override public long estimateSize() { - return (2 * id.length()) + (4 * Long.BYTES); // seq_no, primary_term, v1ersion, transactionId; + return (2 * id.length()) + (6 * Long.BYTES); // seq_no, primary_term, v1ersion, transactionId; } public String id() { @@ -1698,8 +1746,8 @@ public long version() { return this.version; } - public long getTransactionId() { - return this.transactionId; + public Location getTransactionId() { + return new Location(txGeneration, translogLocation, txSize); } @Override @@ -1724,7 +1772,9 @@ private void write(final StreamOutput out) throws IOException { } out.writeLong(seqNo); out.writeLong(primaryTerm); - out.writeLong(transactionId); + out.writeLong(translogLocation); + out.writeLong(txGeneration); + out.writeInt(txSize); } @Override @@ -1742,7 +1792,8 @@ public boolean equals(Object o) { && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && version == delete.version - && transactionId == delete.transactionId; + && translogLocation == delete.translogLocation + && txGeneration == delete.txGeneration; } @Override @@ -1751,7 +1802,8 @@ public int hashCode() { result += 31 * Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + Long.hashCode(transactionId); + result = 31 * result + Long.hashCode(translogLocation); + result = 31 * result + Long.hashCode(txGeneration); return result; } @@ -1767,7 +1819,7 @@ public String toString() { + ", version=" + version + ", transactionId=" - + transactionId + + translogLocation + '}'; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 31d0b8a833eda..4344f8e36ec67 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -254,7 +254,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.shardId()).thenReturn(shardId); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(mappingUpdate); when(shard.mapperService()).thenReturn(mock(MapperService.class)); @@ -283,10 +283,10 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { anyLong(), anyLong(), anyBoolean(), - anyLong() + any() ); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest( @@ -308,7 +308,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { anyLong(), anyLong(), anyBoolean(), - anyLong() + any() ); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); @@ -534,7 +534,7 @@ public void testUpdateRequestWithFailure() throws Exception { Exception err = new ElasticsearchException("I'm dead <(x.x)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); @@ -590,7 +590,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); @@ -645,7 +645,7 @@ public void testUpdateRequestWithSuccess() throws Exception { Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult indexResult = new FakeIndexResult(1, 1, 13, created, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(indexResult); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); @@ -702,7 +702,7 @@ public void testUpdateWithDelete() throws Exception { final long resultSeqNo = 13; Engine.DeleteResult deleteResult = new FakeDeleteResult(1, 1, resultSeqNo, found, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong())).thenReturn(deleteResult); + when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), any())).thenReturn(deleteResult); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); @@ -862,7 +862,7 @@ public void testRetries() throws Exception { Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenAnswer(ir -> { if (randomBoolean()) { return conflictedResult; @@ -957,7 +957,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.shardId()).thenReturn(shardId); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), anyLong())) + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean(), any())) .thenReturn(success1, mappingUpdate, success2); when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod(); when(shard.mapperService()).thenReturn(mock(MapperService.class)); @@ -1004,7 +1004,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { anyLong(), anyLong(), anyBoolean(), - anyLong() + any() ); BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse(); From 816ab604c9fa5b7c7918b4aee58bbb0d071e0d62 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Thu, 18 Nov 2021 19:49:14 -0500 Subject: [PATCH 2/2] Chain translog locations. --- .../action/bulk/TransportShardBulkAction.java | 3 +-- .../org/elasticsearch/action/bulk/TxID.java | 4 +++ .../index/engine/InternalEngine.java | 25 ++++++++++++++++++- .../elasticsearch/index/shard/IndexShard.java | 3 ++- .../index/translog/Translog.java | 24 ++++++++++++------ 5 files changed, 47 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3f1be994761f8..e5d12eecb0241 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -189,11 +189,10 @@ public static void performOnPrimary( @Override protected void doRun() throws Exception { - String uid = UUIDs.base64UUID(); TxID txID1 = TxID.create(); Translog.Location[] transactionId = new Translog.Location[1]; try { - transactionId[0] = primary.startTransaction(uid); + transactionId[0] = primary.startTransaction(txID1.id()); transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id())); while (context.hasMoreOperationsToExecute()) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TxID.java b/server/src/main/java/org/elasticsearch/action/bulk/TxID.java index b265ab4a8988c..295bf19acd6c0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TxID.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TxID.java @@ -59,4 +59,8 @@ public int hashCode() { public String toString() { return "[tx=" + id + "]"; } + + public String id() { + return id; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a401289183ef7..c027cbc3f8680 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -982,7 +982,8 @@ public IndexResult index(Index index) throws IOException { index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), - index.getIfPrimaryTerm() + index.getIfPrimaryTerm(), + index.getTransactionId() ); final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; @@ -2084,8 +2085,30 @@ public Translog.Location startTransaction(String id) throws IOException { @Override public Translog.Location commitTransaction(Translog.Location prevId) throws IOException { + Translog.Location loc = prevId; + + while (loc != null) { + Translog.Operation op = translog.readOperation(loc); + if (op == null) { + logger.error("Couldn't read translog location " + loc); + break; + } + + logger.info("Committing op " + op); + + if (op instanceof Translog.TransactionMember) { + loc = ((Translog.TransactionMember)op).getTransactionId(); + } else if (op instanceof Translog.TxStart) { + break; + } else { + logger.error("Found op that doesn't have transaction loc?"); + break; + } + } + return translog.add( new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId)); + } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c3830d0981101..20c2d6d2f795a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -162,6 +162,7 @@ import java.util.Collections; import java.util.Deque; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -4140,7 +4141,7 @@ public void registerTransaction(TxID id, Set keys) { public Map prepareCommit(TxID txID) { // todo: lookup in transaction table transactionRegistry.prepare(txID); - return (Map)Collections.EMPTY_MAP; + return new HashMap(); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 937e06b506dcc..8cf26eada0deb 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -927,7 +927,7 @@ public TranslogDeletionPolicy getDeletionPolicy() { public static class Location implements Comparable { - pu + public static Location NO_LOCATION = new Location(-1L, -1L, 0); public final long generation; public final long translogLocation; @@ -1207,6 +1207,10 @@ public Source(BytesReference source, String routing) { } + public interface TransactionMember { + Location getTransactionId(); + } + abstract static class TransactionBase implements Operation { private final long seqNo; @@ -1297,7 +1301,7 @@ public String toString() { } } - abstract static class WithTransaction extends TransactionBase { + abstract static class WithTransaction extends TransactionBase implements TransactionMember { public final long generation; public final long translogLocation; public final int size; @@ -1351,6 +1355,10 @@ public int hashCode() { public String toString() { return super.toString() + "[tx_id = " + translogLocation + "]"; } + + public Location getTransactionId() { + return new Location(generation, translogLocation, size); + } } public static class TxPrepare extends WithTransaction { @@ -1433,7 +1441,7 @@ public String toString() { } } - public static class Index implements Operation { + public static class Index implements Operation, TransactionMember { public static final int FORMAT_NO_PARENT = 9; // since 7.0 public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; @@ -1491,11 +1499,11 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { } public Index(String id, long seqNo, long primaryTerm, byte[] source) { - this(id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1, new Location(0, 0, 0)); + this(id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1, Location.NO_LOCATION); } public Index(String id, long seqNo, long primaryTerm, long version, byte[] source, String routing, long autoGeneratedIdTimestamp) { - this(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp, new Location(0, 0, 0)); + this(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp, Location.NO_LOCATION); } public Index( @@ -1655,7 +1663,7 @@ public Location getTransactionId() { } } - public static class Delete implements Operation { + public static class Delete implements Operation, TransactionMember { private static final int FORMAT_6_0 = 4; // 6.0 - * public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 @@ -1701,11 +1709,11 @@ public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { /** utility for testing */ public Delete(String id, long seqNo, long primaryTerm) { - this(id, seqNo, primaryTerm, Versions.MATCH_ANY, new Location(0, 0, 0)); + this(id, seqNo, primaryTerm, Versions.MATCH_ANY, Location.NO_LOCATION); } public Delete(String id, long seqNo, long primaryTerm, long version) { - this(id, seqNo, primaryTerm, version, new Location(0, 0, 0)); + this(id, seqNo, primaryTerm, version, Location.NO_LOCATION); } public Delete(String id, long seqNo, long primaryTerm, long version, Location transactionId) {