Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,21 @@ public final long getFirstOperationOffset() {
return firstOperationOffset;
}

public Translog.Operation read(Translog.Location location) throws IOException {
assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]";
ByteBuffer buffer = ByteBuffer.allocate(location.size);
try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) {
return read(checksumStreamInput);
}
}

/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
protected final int readSize(ByteBuffer reusableBuffer, long position) {
protected final int readSize(ByteBuffer reusableBuffer, long position) throws IOException {
// read op size from disk
assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]";
try {
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}

return size;
} catch (IOException e) {
throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e);
reusableBuffer.clear();
reusableBuffer.limit(4);
readBytes(reusableBuffer, position);
reusableBuffer.flip();
// Add an extra 4 to account for the operation size integer itself
final int size = reusableBuffer.getInt() + 4;
final long maxSize = sizeInBytes() - position;
if (size < 0 || size > maxSize) {
throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size);
}
return size;
}

public Translog.Snapshot newSnapshot() {
Expand Down
32 changes: 0 additions & 32 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,31 +384,6 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
return newFile;
}


/**
* Read the Operation object from the given location. This method will try to read the given location from
* the current or from the currently committing translog file. If the location is in a file that has already
* been closed or even removed the method will return <code>null</code> instead.
*/
Translog.Operation read(Location location) { // TODO this is only here for testing - we can remove it?
try (ReleasableLock lock = readLock.acquire()) {
final BaseTranslogReader reader;
final long currentGeneration = current.getGeneration();
if (currentGeneration == location.generation) {
reader = current;
} else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) {
reader = readers.get(readers.size() - 1);
} else if (currentGeneration < location.generation) {
throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]");
} else {
return null;
}
return reader.read(location);
} catch (IOException e) {
throw new ElasticsearchException("failed to read source from translog location " + location, e);
}
}

/**
* Adds a delete / index operations to the transaction log.
*
Expand All @@ -432,7 +407,6 @@ public Location add(Operation operation) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
assert assertBytesAtLocation(location, bytes);
return location;
}
} catch (AlreadyClosedException | IOException ex) {
Expand Down Expand Up @@ -469,12 +443,6 @@ public Location getLastWriteLocation() {
}
}

boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
// tests can override this
ByteBuffer buffer = ByteBuffer.allocate(location.size);
current.readBytes(buffer, location.translogLocation);
return new BytesArray(buffer.array()).equals(expectedBytes);
}

/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;

public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {

private final int totalOperations;
protected final long length;
Expand All @@ -51,7 +51,7 @@ public TranslogSnapshot(long generation, FileChannel channel, Path path, long fi
}

@Override
public final int totalOperations() {
public int totalOperations() {
return totalOperations;
}

Expand All @@ -64,7 +64,7 @@ public Translog.Operation next() throws IOException {
}
}

protected final Translog.Operation readOperation() throws IOException {
protected Translog.Operation readOperation() throws IOException {
final int opSize = readSize(reusableBuffer, position);
reuse = checksummedStream(reusableBuffer, position, opSize, reuse);
Translog.Operation op = read(reuse);
Expand Down
116 changes: 37 additions & 79 deletions core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -206,53 +207,6 @@ private String randomNonTranslogPatternString(int min, int max) {
return string;
}

public void testRead() throws IOException {
Location loc0 = translog.getLastWriteLocation();
assertNotNull(loc0);

Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
assertThat(loc1, greaterThan(loc0));
assertThat(translog.getLastWriteLocation(), greaterThan(loc1));
Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
assertThat(loc2, greaterThan(loc1));
assertThat(translog.getLastWriteLocation(), greaterThan(loc2));
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));

Translog.Location lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc1).getSource().source, equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source, equalTo(new BytesArray(new byte[]{2})));

Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
assertThat(loc3, greaterThan(loc2));
assertThat(translog.getLastWriteLocation(), greaterThan(loc3));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));

lastLocBeforeSync = translog.getLastWriteLocation();
translog.sync();
assertEquals(lastLocBeforeSync, translog.getLastWriteLocation());
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.prepareCommit();
/*
* The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can
* safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation
* and less than the location of the next write operation.
*/
assertThat(translog.getLastWriteLocation(), greaterThan(lastLocBeforeSync));
assertThat(translog.read(loc3).getSource().source, equalTo(new BytesArray(new byte[]{3})));
translog.commit();
assertNull(translog.read(loc1));
assertNull(translog.read(loc2));
assertNull(translog.read(loc3));
try {
translog.read(new Translog.Location(translog.currentFileGeneration() + 1, 17, 35));
fail("generation is greater than the current");
} catch (IllegalStateException ex) {
// expected
}
}

public void testSimpleOperations() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
Expand Down Expand Up @@ -441,7 +395,7 @@ public void assertFileDeleted(Translog translog, long id) {
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
}

static class LocationOperation {
static class LocationOperation implements Comparable<LocationOperation> {
final Translog.Operation operation;
final Translog.Location location;

Expand All @@ -450,6 +404,10 @@ public LocationOperation(Translog.Operation operation, Translog.Location locatio
this.location = location;
}

@Override
public int compareTo(LocationOperation o) {
return location.compareTo(o.location);
}
}

public void testConcurrentWritesWithVaryingSize() throws Throwable {
Expand Down Expand Up @@ -478,8 +436,12 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
threads[i].join(60 * 1000);
}

for (LocationOperation locationOperation : writtenOperations) {
Translog.Operation op = translog.read(locationOperation.location);
List<LocationOperation> collect = writtenOperations.stream().collect(Collectors.toList());
Collections.sort(collect);
Translog.Snapshot snapshot = translog.newSnapshot();
for (LocationOperation locationOperation : collect) {
Translog.Operation op = snapshot.next();
assertNotNull(op);
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
Expand All @@ -505,6 +467,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
}

}
assertNull(snapshot.next());

}

Expand All @@ -521,13 +484,16 @@ public void testTranslogChecksums() throws Exception {
corruptTranslogs(translogDir);

AtomicInteger corruptionsCaught = new AtomicInteger(0);
Translog.Snapshot snapshot = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
translog.read(location);
Translog.Operation next = snapshot.next();
assertNotNull(next);
} catch (TranslogCorruptedException e) {
corruptionsCaught.incrementAndGet();
}
}
expectThrows(TranslogCorruptedException.class, () -> snapshot.next());
assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
}

Expand All @@ -544,15 +510,12 @@ public void testTruncatedTranslogs() throws Exception {
truncateTranslogs(translogDir);

AtomicInteger truncations = new AtomicInteger(0);
Translog.Snapshot snap = translog.newSnapshot();
for (Translog.Location location : locations) {
try {
translog.read(location);
} catch (ElasticsearchException e) {
if (e.getCause() instanceof EOFException) {
truncations.incrementAndGet();
} else {
throw e;
}
assertNotNull(snap.next());
} catch (EOFException e) {
truncations.incrementAndGet();
}
}
assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1));
Expand Down Expand Up @@ -860,8 +823,14 @@ public void testLocationComparison() throws IOException {
}

assertEquals(max.generation, translog.currentFileGeneration());
final Translog.Operation read = translog.read(max);
assertEquals(read.getSource().source.utf8ToString(), Integer.toString(count));
Translog.Snapshot snap = translog.newSnapshot();
Translog.Operation next;
Translog.Operation maxOp = null;
while ((next = snap.next()) != null) {
maxOp = next;
}
assertNotNull(maxOp);
assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count));
}

public static Translog.Location max(Translog.Location a, Translog.Location b) {
Expand All @@ -884,30 +853,24 @@ public void testBasicCheckpoint() throws IOException {
}
}
assertEquals(translogOperations, translog.totalOperations());
final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));

final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
assertEquals(lastSynced + 1, reader.totalOperations());
Translog.Snapshot snapshot = reader.newSnapshot();

for (int op = 0; op < translogOperations; op++) {
Translog.Location location = locations.get(op);
if (op <= lastSynced) {
final Translog.Operation read = reader.read(location);
final Translog.Operation read = snapshot.next();
assertEquals(Integer.toString(op), read.getSource().source.utf8ToString());
} else {
try {
reader.read(location);
fail("read past checkpoint");
} catch (EOFException ex) {

}
Translog.Operation next = snapshot.next();
assertNull(next);
}
}
try {
reader.read(lastLocation);
fail("read past checkpoint");
} catch (EOFException ex) {
}
Translog.Operation next = snapshot.next();
assertNull(next);
}
assertEquals(translogOperations + 1, translog.totalOperations());
translog.close();
Expand Down Expand Up @@ -1618,11 +1581,6 @@ ChannelFactory getChannelFactory() {
}
};
}

@Override
protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
return true; // we don't wanna fail in the assert
}
};
}

Expand Down