@@ -63,6 +63,10 @@ public static ByteRange computeRange(long rangeSize, long position, long size, l
);
}

public static ByteRange computeRange(long rangeSize, long position, long size) {
return ByteRange.of((position / rangeSize) * rangeSize, (((position + size - 1) / rangeSize) + 1) * rangeSize);
Member:
Does it make sense to reuse computeRange(rangeSize, position, size, Long.MAX_VALUE) instead?

Member Author:
I suppose it would be OK, but I prefer to have a dedicated method.

}
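The alignment arithmetic in the new overload can be sketched in isolation. This is a minimal, hypothetical sketch (the `RangeAlign` class and the `alignedStart`/`alignedEnd` helper names are illustrative, not part of the PR): both bounds are rounded outward to multiples of `rangeSize`.

```java
// Hypothetical standalone sketch of the alignment math in the new
// computeRange(rangeSize, position, size) overload; not the Elasticsearch class.
public class RangeAlign {

    // Round the start of [position, position + size) down to a rangeSize multiple.
    static long alignedStart(long rangeSize, long position) {
        return (position / rangeSize) * rangeSize;
    }

    // Round the end of [position, position + size) up to the next rangeSize multiple.
    static long alignedEnd(long rangeSize, long position, long size) {
        return (((position + size - 1) / rangeSize) + 1) * rangeSize;
    }

    public static void main(String[] args) {
        // A 10-byte read at offset 5000, with 4096-byte ranges:
        System.out.println(alignedStart(4096, 5000));   // 4096
        System.out.println(alignedEnd(4096, 5000, 10)); // 8192
    }
}
```

With `rangeSize = 4096`, a 10-byte read at offset 5000 expands to the single 4k range [4096, 8192), while a read that crosses a boundary covers every range it touches.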

public static void ensureSlice(String sliceName, long sliceOffset, long sliceLength, IndexInput input) {
if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > input.length()) {
throw new IllegalArgumentException(
@@ -1006,7 +1006,8 @@ public int populateAndRead(
final RangeAvailableHandler reader,
final RangeMissingHandler writer
) throws Exception {
assert assertOffsetsWithinFileLength(rangeToWrite.start(), rangeToWrite.length(), length);
Contributor:

Would be good to add assert rangeToWrite.start() >= 0 instead.

Member:

Might also be great to comment on why we allow rangeToWrite beyond the blob length since it is not intuitive.

Member Author:

Both comments make sense, I pushed fe84866

// some cache files can grow after being created, so rangeToWrite can be larger than the initial {@code length}
assert rangeToWrite.start() >= 0 : rangeToWrite;
assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), length);
// We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
// our measurement here.
@@ -14,6 +14,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
@@ -184,6 +185,37 @@ public static void copyToCacheFileAligned(
}
}

/**
* Copy all bytes from {@code input} to {@code fc}, only doing writes aligned along {@link #PAGE_SIZE}.
*
* @param fc output cache file reference
* @param input stream to read from
* @param fileChannelPos position in {@code fc} to write to
* @param progressUpdater callback to invoke with the number of copied bytes as they are copied
* @param buffer bytebuffer to use for writing
* @return the number of bytes copied
* @throws IOException on failure
*/
public static int copyToCacheFileAligned(IO fc, InputStream input, int fileChannelPos, IntConsumer progressUpdater, ByteBuffer buffer)
throws IOException {
int bytesCopied = 0;
while (true) {
final int bytesRead = Streams.read(input, buffer, buffer.remaining());
if (bytesRead <= 0) {
break;
}
if (buffer.hasRemaining()) {
// ensure that last write is aligned on 4k boundaries (= page size)
final int remainder = buffer.position() % PAGE_SIZE;
final int adjustment = remainder == 0 ? 0 : PAGE_SIZE - remainder;
buffer.position(buffer.position() + adjustment);
}
bytesCopied += positionalWrite(fc, fileChannelPos + bytesCopied, buffer);
progressUpdater.accept(bytesCopied);
Comment on lines +207 to +214
Member:

Mostly for my education: with the existing version of this method, we adjust bytesCopied with the page alignment before invoking progressUpdater. I think this is because ProgressListenableActionFuture has a few assertions that expect bytesCopied not to exceed its end value (example).

We don't need the same adjustment here for the callback because ProgressListenableActionFuture should always be created with a page-aligned end value, and this is guaranteed by the new BlobCacheUtils#computeRange method?

Member Author:

> I think this is because ProgressListenableActionFuture has a few assertions that expect bytesCopied not to exceed its end value (example).

Yes, we cannot update progress to a value that would exceed the pending range in the SparseFileTracker.

> We don't need the same adjustment here for the callback because ProgressListenableActionFuture should always be created with a page-aligned end value and this is guaranteed by the new BlobCacheUtils#computeRange method?

Yes 👍

}
return bytesCopied;
}
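The padding step in the loop above can be exercised on its own. A hypothetical standalone sketch (`PageAlign`, `padToPage`, and `paddedPosition` are illustrative names; only the remainder/adjustment arithmetic mirrors the diff): when the final read leaves the buffer partially filled, the position is bumped forward so the subsequent write remains a whole multiple of the 4k page size, with the tail written as zero padding.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the page-alignment adjustment in copyToCacheFileAligned.
public class PageAlign {
    static final int PAGE_SIZE = 4096;

    // If the buffer is not exactly full, advance its position to the next
    // PAGE_SIZE boundary so the write stays page-aligned.
    static int padToPage(ByteBuffer buffer) {
        if (buffer.hasRemaining()) {
            final int remainder = buffer.position() % PAGE_SIZE;
            final int adjustment = remainder == 0 ? 0 : PAGE_SIZE - remainder;
            buffer.position(buffer.position() + adjustment);
        }
        return buffer.position();
    }

    // Convenience for demonstration: build a buffer of the given capacity,
    // pretend `position` bytes were read into it, and return the padded position.
    static int paddedPosition(int position, int capacity) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        buffer.position(position);
        return padToPage(buffer);
    }

    public static void main(String[] args) {
        // A final read of 5000 bytes into a 16 KiB buffer pads up to 8192.
        System.out.println(paddedPosition(5000, 4 * PAGE_SIZE)); // 8192
        // An exactly-full buffer has no remaining space, so nothing is padded.
        System.out.println(paddedPosition(PAGE_SIZE, PAGE_SIZE)); // 4096
    }
}
```

Padding forward rather than writing a short tail keeps every positional write a whole number of pages, matching the 4k-aligned writes the javadoc above describes.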

private static int positionalWrite(IO fc, int start, ByteBuffer byteBuffer) throws IOException {
byteBuffer.flip();
int written = fc.write(byteBuffer, start);