diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 6940dfe1bd97e..26f1cd9d2b0d9 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -334,7 +334,6 @@
-
diff --git a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
index d86c4491b63e9..6d941873ff30a 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
@@ -19,6 +19,7 @@
package org.elasticsearch.index.translog;
+import com.carrotsearch.hppc.LongLongMap;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import java.io.IOException;
@@ -77,15 +78,30 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO
return size;
}
+ /**
+ * Creates a new snapshot.
+ *
+ * @return a snapshot
+ */
public TranslogSnapshot newSnapshot() {
return new TranslogSnapshot(this, sizeInBytes());
}
+ /**
+ * Creates a new random-access snapshot using the specified index to access the underlying snapshot by sequence number.
+ *
+ * @param index the random-access index for this snapshot
+ * @return a random-access snapshot
+ */
+ public RandomAccessTranslogSnapshot newRandomAccessSnapshot(final LongLongMap index) {
+ return new RandomAccessTranslogSnapshot(this, sizeInBytes(), index);
+ }
+
/**
* reads an operation at the given position and returns it. The buffer length is equal to the number
* of bytes reads.
*/
- protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
+ final BufferedChecksumStreamInput checksumStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
@@ -106,7 +122,7 @@ protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws I
/**
* reads bytes at position into the given buffer, filling it.
*/
- protected abstract void readBytes(ByteBuffer buffer, long position) throws IOException;
+ protected abstract void readBytes(ByteBuffer buffer, long position) throws IOException;
@Override
public String toString() {
diff --git a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
index 910d71a51a0a7..c7cd6f468239e 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
@@ -66,8 +66,9 @@ public int overriddenOperations() {
public Translog.Operation next() throws IOException {
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
- Translog.Operation op;
- while ((op = current.next()) != null) {
+ Translog.OperationWithPosition it;
+ while ((it = current.next()) != null) {
+ final Translog.Operation op = it.operation();
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
diff --git a/server/src/main/java/org/elasticsearch/index/translog/RandomAccessMultiSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/RandomAccessMultiSnapshot.java
new file mode 100644
index 0000000000000..811db03d9b6b4
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/translog/RandomAccessMultiSnapshot.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A random-access multi-snapshot providing random access to a collection of indexed snapshots.
+ */
+final class RandomAccessMultiSnapshot implements Translog.RandomAccessSnapshot {
+
+ private final RandomAccessTranslogSnapshot[] snapshots;
+ private final Closeable onClose;
+
+ /**
+ * Creates a new random-access multi-snapshot from the specified random-access snapshots.
+ *
+ * @param snapshots the random-access snapshots to wrap in this multi-snapshot
+ * @param onClose resource to close when this snapshot is closed
+ */
+ RandomAccessMultiSnapshot(final RandomAccessTranslogSnapshot[] snapshots, final Closeable onClose) {
+ this.snapshots = snapshots;
+ this.onClose = onClose;
+ }
+
+ @Override
+ public Translog.Operation operation(final long seqNo) throws IOException {
+ for (int i = snapshots.length - 1; i >= 0; i--) {
+ final Translog.Operation operation = snapshots[i].operation(seqNo);
+ if (operation != null) {
+ return operation;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ onClose.close();
+ }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/translog/RandomAccessTranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/RandomAccessTranslogSnapshot.java
new file mode 100644
index 0000000000000..703d6f7218d22
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/translog/RandomAccessTranslogSnapshot.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import com.carrotsearch.hppc.LongLongMap;
+
+import java.io.IOException;
+
+/**
+ * A random-access iterator over a fixed snapshot of a single translog generation.
+ */
+final class RandomAccessTranslogSnapshot extends TranslogSnapshotReader {
+
+ private final LongLongMap index;
+
+ /**
+ * Create a snapshot of the translog file channel. This gives a random-access iterator over the operations in the snapshot, indexed by
+ * the specified map from sequence number to position.
+ *
+ * @param reader the underlying reader
+ * @param length the size in bytes of the underlying snapshot
+ * @param index the random-access index from sequence number to position for this snapshot
+ */
+ RandomAccessTranslogSnapshot(final BaseTranslogReader reader, final long length, final LongLongMap index) {
+ super(reader, length);
+ this.index = index;
+ }
+
+ Translog.Operation operation(final long seqNo) throws IOException {
+ if (index.containsKey(seqNo)) {
+ return readOperation(index.get(seqNo));
+ } else {
+ return null;
+ }
+ }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
index ccc478e258928..ed07e49e6534a 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -19,11 +19,14 @@
package org.elasticsearch.index.translog;
+import com.carrotsearch.hppc.LongLongHashMap;
+import com.carrotsearch.hppc.LongLongMap;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.UUIDs;
@@ -61,12 +64,18 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -123,6 +132,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final String translogUUID;
private final TranslogDeletionPolicy deletionPolicy;
+ private final ConcurrentHashMap> randomAccessIndexes = new ConcurrentHashMap<>();
+
+ LongLongMap getRandomAccessIndexForGeneration(final long generation) throws ExecutionException, InterruptedException {
+ final Future future = randomAccessIndexes.get(generation);
+ return future == null ? null : future.get();
+ }
+
+ private LongLongMap currentRandomAccessIndex;
+
+ LongLongMap getRandomAccessIndexForCurrentGeneration() {
+ try (Releasable ignored = readLock.acquire()) {
+ return currentRandomAccessIndex;
+ }
+ }
+
/**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
* {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given. If
@@ -466,6 +490,11 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
* @param initialGlobalCheckpoint the global checkpoint to be written in the first checkpoint.
*/
TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint) throws IOException {
+ final LongLongMap index = new LongLongHashMap();
+ final CompletableFuture future = new CompletableFuture<>();
+ future.complete(index);
+ currentRandomAccessIndex = index;
+ randomAccessIndexes.put(fileGeneration, future);
final TranslogWriter newFile;
try {
newFile = TranslogWriter.create(
@@ -504,7 +533,7 @@ public Location add(final Operation operation) throws IOException {
final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
- return current.add(bytes, operation.seqNo());
+ return current.add(bytes, operation.seqNo(), position -> currentRandomAccessIndex.put(operation.seqNo(), position));
}
} catch (final AlreadyClosedException | IOException ex) {
try {
@@ -595,7 +624,7 @@ public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
- return newMultiSnapshot(snapshots);
+ return newMultiSnapshot(snapshots, MultiSnapshot::new);
}
}
@@ -616,22 +645,101 @@ public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOExcept
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
.map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
- return newMultiSnapshot(snapshots);
+ return newMultiSnapshot(snapshots, MultiSnapshot::new);
}
}
- private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException {
+ /**
+ * Returns a random-access snapshot to operations in the translog with sequence number between the specified range (inclusive).
+ *
+ * @param minSeqNo the minimum sequence number in the snapshot
+ * @param maxSeqNo the maximum sequence number in the snapshot
+ * @return a random-access snapshot
+ * @throws IOException if an I/O exception occurs building the underlying random-access index
+ */
+ public RandomAccessSnapshot getRandomAccessSnapshot(final long minSeqNo, final long maxSeqNo) throws IOException {
+ if (minSeqNo < 0) {
+ throw new IllegalArgumentException("minSeqNo [" + minSeqNo + "] must be positive");
+ }
+ if (maxSeqNo < minSeqNo) {
+ throw new IllegalArgumentException("maxSeqNo [" + maxSeqNo + "] must be at least minSeqNo [" + minSeqNo + "]");
+ }
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ final List readersBetweenMinAndMaxSeqNo =
+ readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo).collect(Collectors.toList());
+ final List snapshots = new ArrayList<>();
+ for (final BaseTranslogReader reader : readersBetweenMinAndMaxSeqNo) {
+ snapshots.add(reader.newRandomAccessSnapshot(getRandomAccessIndex(reader.getGeneration())));
+ }
+ return newMultiSnapshot(snapshots.toArray(new RandomAccessTranslogSnapshot[0]), RandomAccessMultiSnapshot::new);
+ }
+ }
+
+ /**
+ * Gets or builds a random-access index to a snapshot for the specified generation.
+ *
+ * @param generation the generation
+ * @return a random-access index from sequence number to position
+ * @throws IOException if an I/O exception occurs building the underlying random-access index
+ */
+ private LongLongMap getRandomAccessIndex(final long generation) throws IOException {
+ assert readLock.isHeldByCurrentThread();
+ // only one thread can win the race to put a future to the index in the map; all other threads will block until the index is built
+ final CompletableFuture value = new CompletableFuture<>();
+ final CompletableFuture future = randomAccessIndexes.putIfAbsent(generation, value);
+ if (future == null) {
+ final LongLongMap index = new LongLongHashMap();
+ final Optional optional = readers.stream().filter(r -> r.getGeneration() == generation).findFirst();
+ if (optional.isPresent()) {
+ final BaseTranslogReader reader = optional.get();
+ final TranslogSnapshot snapshot = reader.newSnapshot();
+ Translog.OperationWithPosition it;
+ while ((it = snapshot.next()) != null) {
+ index.put(it.operation().seqNo(), it.position());
+ }
+ value.complete(index);
+ return index;
+ } else {
+ throw new IllegalStateException("could not find translog reader matching generation [" + generation + "]");
+ }
+ } else {
+ try {
+ return future.get();
+ } catch (final ExecutionException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ /**
+ * Construct a new multi-snapshot over the specified snapshots.
+ *
+ * @param snapshots the snapshots to form a multi-snapshot over
+ * @param constructor the multi-snapshot constructor
+ * @param the type of the multi-snapshot
+ * @param the type of the underlying snapshots
+ * @return a multi-snapshot over the specified snapshots
+ * @throws IOException if an I/O exception occurs closing any acquired resources
+ */
+ private T newMultiSnapshot(
+ final U[] snapshots,
+ final BiFunction constructor) throws IOException {
final Closeable onClose;
if (snapshots.length == 0) {
onClose = () -> {};
} else {
- assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get()
- == snapshots[0].generation : "first reader generation of " + snapshots + " is not the smallest";
+ if (Assertions.ENABLED) {
+ final long[] generations = Arrays.stream(snapshots).mapToLong(BaseTranslogReader::getGeneration).toArray();
+ assert generations.length > 0;
+ assert Arrays.stream(generations).boxed().min(Long::compareTo).get()
+ == snapshots[0].generation : "first reader generation of " + Arrays.toString(generations) + " is not the smallest";
+ }
onClose = acquireTranslogGenFromDeletionPolicy(snapshots[0].generation);
}
boolean success = false;
try {
- Snapshot result = new MultiSnapshot(snapshots, onClose);
+ T result = constructor.apply(snapshots, onClose);
success = true;
return result;
} finally {
@@ -840,6 +948,10 @@ public int hashCode() {
}
}
+ public interface RandomAccessSnapshot extends Closeable {
+ Translog.Operation operation(long seqNo) throws IOException;
+ }
+
/**
* A snapshot of the transaction log, allows to iterate over all the transaction log operations.
*/
@@ -958,6 +1070,30 @@ static void writeOperation(final StreamOutput output, final Operation operation)
}
+ /**
+ * Wrapper for a translog operation with its position in the translog generation to which it belongs.
+ */
+ static final class OperationWithPosition {
+
+ private final Translog.Operation operation;
+
+ public Operation operation() {
+ return operation;
+ }
+
+ private final long position;
+
+ public long position() {
+ return position;
+ }
+
+ OperationWithPosition(final Operation operation, final long position) {
+ this.operation = operation;
+ this.position = position;
+ }
+
+ }
+
public static class Source {
public final BytesReference source;
@@ -1608,6 +1744,7 @@ public void trimUnreferencedReaders() throws IOException {
// but crashed before we could delete the file.
current.sync();
deleteReaderFiles(reader);
+ randomAccessIndexes.remove(reader.getGeneration());
}
assert readers.isEmpty() == false || current.generation == minReferencedGen :
"all readers were cleaned but the minReferenceGen [" + minReferencedGen + "] is not the current writer's gen [" +
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
index 656772fa8169d..756829e893ac5 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
@@ -18,91 +18,36 @@
*/
package org.elasticsearch.index.translog;
-import org.elasticsearch.common.io.Channels;
-
-import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
-final class TranslogSnapshot extends BaseTranslogReader {
-
- private final int totalOperations;
- private final Checkpoint checkpoint;
- protected final long length;
- private final ByteBuffer reusableBuffer;
- private long position;
- private int readOperations;
- private BufferedChecksumStreamInput reuse;
+/**
+ * A forward-only iterator over a fixed snapshot of a single translog generation.
+ */
+final class TranslogSnapshot extends TranslogSnapshotReader {
/**
- * Create a snapshot of translog file channel.
+ * Create a snapshot of the translog file channel. This gives a forward-only iterator over the operations in the snapshot.
+ *
+ * @param reader the underlying reader
+ * @param length the size in bytes of the underlying snapshot
*/
TranslogSnapshot(final BaseTranslogReader reader, final long length) {
- super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset);
- this.length = length;
- this.totalOperations = reader.totalOperations();
- this.checkpoint = reader.getCheckpoint();
- this.reusableBuffer = ByteBuffer.allocate(1024);
- readOperations = 0;
- position = firstOperationOffset;
- reuse = null;
- }
-
- @Override
- public int totalOperations() {
- return totalOperations;
+ super(reader, length);
}
- @Override
- Checkpoint getCheckpoint() {
- return checkpoint;
- }
-
- public Translog.Operation next() throws IOException {
- if (readOperations < totalOperations) {
+ /**
+ * The current operation. The iterator is advanced to the next operation. If the iterator reaches the end of the snapshot, further
+ * invocations of this method will return null.
+ *
+ * @return the current operation, or null if at the end of this this snapshot
+ * @throws IOException if an I/O exception occurs reading the snapshot
+ */
+ Translog.OperationWithPosition next() throws IOException {
+ if (getReadOperations() < totalOperations()) {
return readOperation();
} else {
return null;
}
}
- protected Translog.Operation readOperation() throws IOException {
- final int opSize = readSize(reusableBuffer, position);
- reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
- Translog.Operation op = read(reuse);
- position += opSize;
- readOperations++;
- return op;
- }
-
- public long sizeInBytes() {
- return length;
- }
-
- /**
- * reads an operation at the given position into the given buffer.
- */
- protected void readBytes(ByteBuffer buffer, long position) throws IOException {
- if (position >= length) {
- throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" + getGeneration() + "], path: [" + path + "]");
- }
- if (position < getFirstOperationOffset()) {
- throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]");
- }
- Channels.readFromFileChannelWithEofException(channel, position, buffer);
- }
-
- @Override
- public String toString() {
- return "TranslogSnapshot{" +
- "readOperations=" + readOperations +
- ", position=" + position +
- ", estimateTotalOperations=" + totalOperations +
- ", length=" + length +
- ", generation=" + generation +
- ", reusableBuffer=" + reusableBuffer +
- '}';
- }
-
}
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshotReader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshotReader.java
new file mode 100644
index 0000000000000..6d81590315593
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshotReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import org.elasticsearch.common.io.Channels;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Locale;
+
+public class TranslogSnapshotReader extends BaseTranslogReader {
+
+ private final int totalOperations;
+
+ @Override
+ public int totalOperations() {
+ return totalOperations;
+ }
+
+ private final Checkpoint checkpoint;
+ protected final long length;
+
+ private final ByteBuffer reusableBuffer;
+ private long position;
+
+ public long getPosition() {
+ return position;
+ }
+
+ private int readOperations;
+
+ public int getReadOperations() {
+ return readOperations;
+ }
+
+ private BufferedChecksumStreamInput reuse;
+
+ TranslogSnapshotReader(final BaseTranslogReader reader, final long length) {
+ super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset);
+ this.length = length;
+ this.totalOperations = reader.totalOperations();
+ this.checkpoint = reader.getCheckpoint();
+ this.reusableBuffer = ByteBuffer.allocate(1024);
+ readOperations = 0;
+ position = firstOperationOffset;
+ reuse = null;
+ }
+
+ protected Translog.OperationWithPosition readOperation() throws IOException {
+ return readOperationWithPosition(position);
+ }
+
+ protected Translog.Operation readOperation(final long readPosition) throws IOException {
+ return readOperationWithPosition(readPosition).operation();
+ }
+
+ private Translog.OperationWithPosition readOperationWithPosition(final long readPosition) throws IOException {
+ final int size = readSize(reusableBuffer, readPosition);
+ reuse = checksumStream(reusableBuffer, readPosition, size, reuse);
+ final Translog.Operation op = read(reuse);
+ position += size;
+ readOperations++;
+ return new Translog.OperationWithPosition(op, position);
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return length;
+ }
+
+ @Override
+ Checkpoint getCheckpoint() {
+ return checkpoint;
+ }
+
+ /**
+ * reads an operation at the given position into the given buffer.
+ */
+ @Override
+ protected void readBytes(ByteBuffer buffer, long position) throws IOException {
+ if (position >= length) {
+ final String message = String.format(
+ Locale.ROOT,
+ "read requested at [%d] past EOF [%d] of generation [%d] at [%s]",
+ position,
+ length,
+ getGeneration(),
+ path);
+ throw new EOFException(message);
+ }
+ if (position < getFirstOperationOffset()) {
+ final String message = String.format(
+ Locale.ROOT,
+ "read requested at [%d] before position [%d] of first operation of generation [%d] at [%s]",
+ position,
+ getFirstOperationOffset(),
+ getGeneration(),
+ path);
+ throw new IOException(message);
+ }
+ Channels.readFromFileChannelWithEofException(channel, position, buffer);
+ }
+
+ @Override
+ public String toString() {
+ return "TranslogSnapshot{" +
+ "readOperations=" + readOperations +
+ ", position=" + position +
+ ", estimateTotalOperations=" + totalOperations +
+ ", length=" + length +
+ ", generation=" + generation +
+ ", reusableBuffer=" + reusableBuffer +
+ '}';
+ }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 1eed393208c9c..841d7d88c797b 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -19,6 +19,8 @@
package org.elasticsearch.index.translog;
+import com.carrotsearch.hppc.LongLongHashMap;
+import com.carrotsearch.hppc.LongLongMap;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.OutputStreamDataOutput;
@@ -44,7 +46,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
+import java.util.function.Supplier;
public class TranslogWriter extends BaseTranslogReader implements Closeable {
@@ -177,18 +181,16 @@ private synchronized void closeWithTragicEvent(Exception exception) throws IOExc
}
/**
- * add the given bytes to the translog and return the location they were written at
- */
-
- /**
- * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
+ * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to. The specified
+ * callback is invoked under lock with the position of the added operation.
*
- * @param data the bytes to write
- * @param seqNo the sequence number associated with the operation
+ * @param data the bytes to write
+ * @param seqNo the sequence number associated with the operation
+ * @param consumer a callback with the position of the added operation
* @return the location the bytes were written to
* @throws IOException if writing to the translog resulted in an I/O exception
*/
- public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
+ public synchronized Translog.Location add(final BytesReference data, final long seqNo, final LongConsumer consumer) throws IOException {
ensureOpen();
final long offset = totalOffset;
try {
@@ -217,6 +219,7 @@ public synchronized Translog.Location add(final BytesReference data, final long
assert assertNoSeqNumberConflict(seqNo, data);
+ consumer.accept(offset);
return new Translog.Location(generation, offset, data.length());
}
@@ -310,17 +313,33 @@ public TranslogReader closeIntoReader() throws IOException {
@Override
public TranslogSnapshot newSnapshot() {
- // make sure to acquire the sync lock first, to prevent dead locks with threads calling
- // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+ return snapshot(super::newSnapshot);
+ }
+
+ @Override
+ public RandomAccessTranslogSnapshot newRandomAccessSnapshot(final LongLongMap index) {
+ // we have to make a copy, otherwise this map could be concurrently modified while we are reading from it
+ return snapshot(() -> super.newRandomAccessSnapshot(new LongLongHashMap(index)));
+ }
+
+ private T snapshot(final Supplier supplier) {
+ /*
+ * We have to acquire the sync lock first to prevent deadlocks with threads calling syncUpTo where the syncLock is acquired first
+ * followed by acquiring the monitor on this.
+ */
synchronized (syncLock) {
synchronized (this) {
ensureOpen();
try {
sync();
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
}
- return super.newSnapshot();
+ /*
+ * This must run under lock; see newRandomAccessSnapshot where we make a copy of the translog index and this must be done
+ * when we are certain that there are no additional modifications being made to the map as it is not thread-safe.
+ */
+ return supplier.get();
}
}
}
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
index 0779ba0f5a7e7..82870758ed333 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
@@ -181,7 +181,7 @@ private Tuple, TranslogWriter> createReadersAndWriter(final
for (int ops = randomIntBetween(0, 20); ops > 0; ops--) {
out.reset(bytes);
out.writeInt(ops);
- writer.add(new BytesArray(bytes), ops);
+ writer.add(new BytesArray(bytes), ops, position -> {});
}
}
return new Tuple<>(readers, writer);
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 25ecfe3b1ba63..ad1c7c2dfff94 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -75,6 +75,7 @@
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
@@ -99,9 +100,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -1107,15 +1110,13 @@ public void testBasicCheckpoint() throws IOException {
for (int op = 0; op < translogOperations; op++) {
if (op <= lastSynced) {
- final Translog.Operation read = snapshot.next();
+ final Translog.Operation read = snapshot.next().operation();
assertEquals(Integer.toString(op), read.getSource().source.utf8ToString());
} else {
- Translog.Operation next = snapshot.next();
- assertNull(next);
+ assertNull(snapshot.next());
}
}
- Translog.Operation next = snapshot.next();
- assertNull(next);
+ assertNull(snapshot.next());
}
assertEquals(translogOperations + 1, translog.totalOperations());
assertThat(checkpoint.globalCheckpoint, equalTo(lastSyncedGlobalCheckpoint));
@@ -1140,7 +1141,7 @@ public void testTranslogWriter() throws IOException {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seenSeqNos.add(seqNo);
}
- writer.add(new BytesArray(bytes), seqNo);
+ writer.add(new BytesArray(bytes), seqNo, position -> {});
}
writer.sync();
@@ -1159,7 +1160,7 @@ public void testTranslogWriter() throws IOException {
out.reset(bytes);
out.writeInt(2048);
- writer.add(new BytesArray(bytes), randomNonNegativeLong());
+ writer.add(new BytesArray(bytes), randomNonNegativeLong(), position -> {});
if (reader instanceof TranslogReader) {
ByteBuffer buffer = ByteBuffer.allocate(4);
@@ -1190,7 +1191,7 @@ public void testCloseIntoReader() throws IOException {
for (int i = 0; i < numOps; i++) {
out.reset(bytes);
out.writeInt(i);
- writer.add(new BytesArray(bytes), randomNonNegativeLong());
+ writer.add(new BytesArray(bytes), randomNonNegativeLong(), position -> {});
}
writer.sync();
final Checkpoint writerCheckpoint = writer.getCheckpoint();
@@ -2510,9 +2511,9 @@ public void testMinSeqNoBasedAPI() throws IOException {
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
TranslogSnapshot snapshot = reader.newSnapshot();
- Translog.Operation operation;
- while ((operation = snapshot.next()) != null) {
- generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
+ Translog.OperationWithPosition it;
+ while ((it = snapshot.next()) != null) {
+ generationSeenSeqNos.add(Tuple.tuple(it.operation().seqNo(), it.operation().primaryTerm()));
opCount++;
}
assertThat(opCount, equalTo(reader.totalOperations()));
@@ -2645,8 +2646,9 @@ public void testSnapshotReadOperationInReverse() throws Exception {
public void testSnapshotDedupOperations() throws Exception {
final Map latestOperations = new HashMap<>();
final int generations = between(2, 20);
+ final int operations = 500;
for (int gen = 0; gen < generations; gen++) {
- List batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList());
+ List batch = LongStream.rangeClosed(0, between(0, operations)).boxed().collect(Collectors.toList());
Randomness.shuffle(batch);
for (Long seqNo : batch) {
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1});
@@ -2658,6 +2660,92 @@ public void testSnapshotDedupOperations() throws Exception {
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values()));
}
+
+ try (Translog.RandomAccessSnapshot snapshot = translog.getRandomAccessSnapshot(0, operations - 1)) {
+ for (long i = 0; i < operations; i++) {
+ assertThat(snapshot.operation(i), equalTo(latestOperations.get(i)));
+ }
+ }
+ }
+
+ public void testRandomAccessSnapshot() throws ExecutionException, InterruptedException, IOException {
+ long seqNo = 0;
+ final int generations = randomIntBetween(1, 32);
+ final Map> translogIndex = new HashMap<>();
+ for (int i = 0; i < generations; i++) {
+ final int documents = scaledRandomIntBetween(1, 512);
+ for (int j = 0; j < documents; j++) {
+ final Translog.Index index = new Translog.Index("doc", randomAlphaOfLength(16), seqNo, new byte[]{});
+ final Translog.Location location = translog.add(index);
+ translogIndex.put(seqNo, Tuple.tuple(location.generation, location.translogLocation));
+ seqNo++;
+ }
+ if (i < generations - 1) {
+ translog.rollGeneration();
+ }
+ }
+
+ try (Translog.RandomAccessSnapshot snapshot = translog.getRandomAccessSnapshot(0, seqNo - 1)) {
+ for (int i = 0; i < seqNo; i++) {
+ assertNotNull("missing operation [" + i + "] out of [" + (seqNo - 1) + "]", snapshot.operation(i));
+ final Tuple value = translogIndex.get((long) i);
+ assertThat(translog.getRandomAccessIndexForGeneration(value.v1()).get(i), equalTo(value.v2()));
+ if (value.v1() == generations) {
+ assertThat(translog.getRandomAccessIndexForCurrentGeneration().get(i), equalTo(value.v2()));
+ }
+ }
+ }
+ }
+
+ public void testRandomAccessSnapshotWithConcurrency() throws BrokenBarrierException, InterruptedException, IOException {
+ final AtomicLong seqNo = new AtomicLong();
+ final ConcurrentHashMap translogIndex = new ConcurrentHashMap<>();
+
+ final int numberOfThreads = scaledRandomIntBetween(1, 16);
+ final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
+ final List threads = new ArrayList<>();
+ for (int i = 0; i < numberOfThreads; i++) {
+ final Thread thread = new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (final BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ final long currentSeqNo = seqNo.getAndIncrement();
+ final Translog.Index index = new Translog.Index("doc", randomAlphaOfLength(16), currentSeqNo, new byte[] {});
+ try {
+ final Location location = translog.add(index);
+ translogIndex.put(currentSeqNo, location.translogLocation);
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ try {
+ barrier.await();
+ } catch (final BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ threads.add(thread);
+ thread.start();
+ }
+
+ barrier.await();
+
+ barrier.await();
+
+ try (Translog.RandomAccessSnapshot snapshot = translog.getRandomAccessSnapshot(0, seqNo.get() - 1)) {
+ for (long i = 0; i < seqNo.get(); i++) {
+ assertNotNull("missing operation [" + i + "] out of [" + (seqNo.get() - 1) + "]", snapshot.operation(i));
+ assertThat(translog.getRandomAccessIndexForCurrentGeneration().get(i), equalTo(translogIndex.get(i)));
+ }
+ }
+
+ for (final Thread thread : threads) {
+ thread.join();
+ }
+
}
static class SortedSnapshot implements Translog.Snapshot {