diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
index 49e8249e9d370..bf61febb741eb 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
@@ -58,34 +58,21 @@ public final long getFirstOperationOffset() {
return firstOperationOffset;
}
- public Translog.Operation read(Translog.Location location) throws IOException {
- assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
- ByteBuffer buffer = ByteBuffer.allocate(location.size);
- try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
- return read(checksumStreamInput);
- }
- }
-
/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
- protected final int readSize(ByteBuffer reusableBuffer, long position) {
+ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IOException {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
- try {
- reusableBuffer.clear();
- reusableBuffer.limit(4);
- readBytes(reusableBuffer, position);
- reusableBuffer.flip();
- // Add an extra 4 to account for the operation size integer itself
- final int size = reusableBuffer.getInt() + 4;
- final long maxSize = sizeInBytes() - position;
- if (size < 0 || size > maxSize) {
- throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
- }
-
- return size;
- } catch (IOException e) {
- throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e);
+ reusableBuffer.clear();
+ reusableBuffer.limit(4);
+ readBytes(reusableBuffer, position);
+ reusableBuffer.flip();
+ // Add an extra 4 to account for the operation size integer itself
+ final int size = reusableBuffer.getInt() + 4;
+ final long maxSize = sizeInBytes() - position;
+ if (size < 0 || size > maxSize) {
+ throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
+ return size;
}
public Translog.Snapshot newSnapshot() {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 198d9b4cd4508..056716a29bd9a 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -384,31 +384,6 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
return newFile;
}
-
- /**
- * Read the Operation object from the given location. This method will try to read the given location from
- * the current or from the currently committing translog file. If the location is in a file that has already
- * been closed or even removed the method will return null instead.
- */
- Translog.Operation read(Location location) { // TODO this is only here for testing - we can remove it?
- try (ReleasableLock lock = readLock.acquire()) {
- final BaseTranslogReader reader;
- final long currentGeneration = current.getGeneration();
- if (currentGeneration == location.generation) {
- reader = current;
- } else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) {
- reader = readers.get(readers.size() - 1);
- } else if (currentGeneration < location.generation) {
- throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]");
- } else {
- return null;
- }
- return reader.read(location);
- } catch (IOException e) {
- throw new ElasticsearchException("failed to read source from translog location " + location, e);
- }
- }
-
/**
* Adds a delete / index operations to the transaction log.
*
@@ -432,7 +407,6 @@ public Location add(Operation operation) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
- assert assertBytesAtLocation(location, bytes);
return location;
}
} catch (AlreadyClosedException | IOException ex) {
@@ -469,12 +443,6 @@ public Location getLastWriteLocation() {
}
}
- boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
- // tests can override this
- ByteBuffer buffer = ByteBuffer.allocate(location.size);
- current.readBytes(buffer, location.translogLocation);
- return new BytesArray(buffer.array()).equals(expectedBytes);
- }
/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
index f33ec1bd60702..a08259ef32dc7 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
@@ -26,7 +26,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
-public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
+final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
private final int totalOperations;
protected final long length;
@@ -51,7 +51,7 @@ public TranslogSnapshot(long generation, FileChannel channel, Path path, long fi
}
@Override
- public final int totalOperations() {
+ public int totalOperations() {
return totalOperations;
}
@@ -64,7 +64,7 @@ public Translog.Operation next() throws IOException {
}
}
- protected final Translog.Operation readOperation() throws IOException {
+ protected Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 9961637c32362..cb76763363e91 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -85,6 +85,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -206,53 +207,6 @@ private String randomNonTranslogPatternString(int min, int max) {
return string;
}
- public void testRead() throws IOException {
- Location loc0 = translog.getLastWriteLocation();
- assertNotNull(loc0);
-
- Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
- assertThat(loc1, greaterThan(loc0));
- assertThat(translog.getLastWriteLocation(), greaterThan(loc1));
- Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
- assertThat(loc2, greaterThan(loc1));
- assertThat(translog.getLastWriteLocation(), greaterThan(loc2));
- assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
- assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
-
- Translog.Location lastLocBeforeSync = translog.getLastWriteLocation();
- translog.sync();
- assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
- assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
- assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));
-
- Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
- assertThat(loc3, greaterThan(loc2));
- assertThat(translog.getLastWriteLocation(), greaterThan(loc3));
- assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
-
- lastLocBeforeSync = translog.getLastWriteLocation();
- translog.sync();
- assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
- assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
- translog.prepareCommit();
- /*
- * The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can
- * safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation
- * and less than the location of the next write operation.
- */
- assertThat(translog.getLastWriteLocation(), greaterThan(lastLocBeforeSync));
- assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
- translog.commit();
- assertNull(translog.read(loc1));
- assertNull(translog.read(loc2));
- assertNull(translog.read(loc3));
- try {
- translog.read(new Translog.Location(translog.currentFileGeneration() + 1, 17, 35));
- fail("generation is greater than the current");
- } catch (IllegalStateException ex) {
- // expected
- }
- }
public void testSimpleOperations() throws IOException {
ArrayList ops = new ArrayList<>();
@@ -441,7 +395,7 @@ public void assertFileDeleted(Translog translog, long id) {
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
}
- static class LocationOperation {
+ static class LocationOperation implements Comparable {
final Translog.Operation operation;
final Translog.Location location;
@@ -450,6 +404,10 @@ public LocationOperation(Translog.Operation operation, Translog.Location locatio
this.location = location;
}
+ @Override
+ public int compareTo(LocationOperation o) {
+ return location.compareTo(o.location);
+ }
}
public void testConcurrentWritesWithVaryingSize() throws Throwable {
@@ -478,8 +436,12 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
threads[i].join(60 * 1000);
}
- for (LocationOperation locationOperation : writtenOperations) {
- Translog.Operation op = translog.read(locationOperation.location);
+ List collect = writtenOperations.stream().collect(Collectors.toList());
+ Collections.sort(collect);
+ Translog.Snapshot snapshot = translog.newSnapshot();
+ for (LocationOperation locationOperation : collect) {
+ Translog.Operation op = snapshot.next();
+ assertNotNull(op);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
@@ -505,6 +467,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
}
}
+ assertNull(snapshot.next());
}
@@ -521,13 +484,16 @@ public void testTranslogChecksums() throws Exception {
corruptTranslogs(translogDir);
AtomicInteger corruptionsCaught = new AtomicInteger(0);
+ Translog.Snapshot snapshot = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
- translog.read(location);
+ Translog.Operation next = snapshot.next();
+ assertNotNull(next);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}
}
+ expectThrows(TranslogCorruptedException.class, () -> snapshot.next());
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
}
@@ -544,15 +510,12 @@ public void testTruncatedTranslogs() throws Exception {
truncateTranslogs(translogDir);
AtomicInteger truncations = new AtomicInteger(0);
+ Translog.Snapshot snap = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
- translog.read(location);
- } catch (ElasticsearchException e) {
- if (e.getCause() instanceof EOFException) {
- truncations.incrementAndGet();
- } else {
- throw e;
- }
+ assertNotNull(snap.next());
+ } catch (EOFException e) {
+ truncations.incrementAndGet();
}
}
assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1));
@@ -860,8 +823,14 @@ public void testLocationComparison() throws IOException {
}
assertEquals(max.generation, translog.currentFileGeneration());
- final Translog.Operation read = translog.read(max);
- assertEquals(read.getSource().source.utf8ToString(), Integer.toString(count));
+ Translog.Snapshot snap = translog.newSnapshot();
+ Translog.Operation next;
+ Translog.Operation maxOp = null;
+ while ((next = snap.next()) != null) {
+ maxOp = next;
+ }
+ assertNotNull(maxOp);
+ assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count));
}
public static Translog.Location max(Translog.Location a, Translog.Location b) {
@@ -884,30 +853,24 @@ public void testBasicCheckpoint() throws IOException {
}
}
assertEquals(translogOperations, translog.totalOperations());
- final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
+ translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
assertEquals(lastSynced + 1, reader.totalOperations());
+ Translog.Snapshot snapshot = reader.newSnapshot();
+
for (int op = 0; op < translogOperations; op++) {
- Translog.Location location = locations.get(op);
if (op <= lastSynced) {
- final Translog.Operation read = reader.read(location);
+ final Translog.Operation read = snapshot.next();
assertEquals(Integer.toString(op), read.getSource().source.utf8ToString());
} else {
- try {
- reader.read(location);
- fail("read past checkpoint");
- } catch (EOFException ex) {
-
- }
+ Translog.Operation next = snapshot.next();
+ assertNull(next);
}
}
- try {
- reader.read(lastLocation);
- fail("read past checkpoint");
- } catch (EOFException ex) {
- }
+ Translog.Operation next = snapshot.next();
+ assertNull(next);
}
assertEquals(translogOperations + 1, translog.totalOperations());
translog.close();
@@ -1618,11 +1581,6 @@ ChannelFactory getChannelFactory() {
}
};
}
-
- @Override
- protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
- return true; // we don't wanna fail in the assert
- }
};
}