Madvise jnr #109

Merged: 7 commits, Dec 18, 2019
Changes from 6 commits
10 changes: 8 additions & 2 deletions build.gradle
@@ -29,9 +29,7 @@ if (new File('.git').exists() && (exec {
version "unspecified"
}


group 'com.upserve'

description = """Uppend: fast, append-only key-multivalue store"""

// TODO unused-dependency is broken - claims all dependencies are unused!
@@ -41,6 +39,7 @@ apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'jacoco'
apply plugin: 'com.bmuschko.nexus'
apply plugin: 'c'

jacoco {
toolVersion = "0.8.2" // Fixed to resolve issue with JDK 11 in Gradle 4.X.Y
@@ -59,6 +58,7 @@ dependencies {
compile 'info.picocli:picocli:4.0.1'
compile 'io.dropwizard.metrics:metrics-core:3.2.3'
compile 'it.unimi.dsi:fastutil:7.0.13'
compile 'com.github.jnr:jnr-ffi:2.1.1'
// compile 'me.lemire.integercompression:JavaFastPFOR:0.1.11'
compile 'org.slf4j:slf4j-api:1.7.22'

@@ -68,7 +68,9 @@ dependencies {
}

tasks.withType(JavaCompile) {

options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Werror"
//options.verbose = true
}

sourceSets {
@@ -89,6 +91,8 @@ processResources {
}
}


// TODO include the cross compiled nativeIO libs as a resource. Unpack and load from jar!
task fatJar(type: Jar) {
dependencies {
compile 'org.apache.logging.log4j:log4j-core:2.8'
@@ -103,6 +107,8 @@ task fatJar(type: Jar) {
}

tasks.withType(Test) {
maxHeapSize = "2048m"

// From https://stackoverflow.com/a/36130467/2136991
testLogging {
// set options for log level LIFECYCLE
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-all.zip
1 change: 0 additions & 1 deletion settings.gradle
@@ -1,2 +1 @@
rootProject.name = 'uppend'

18 changes: 16 additions & 2 deletions src/main/java/com/upserve/uppend/AppendOnlyStoreBuilder.java
@@ -5,14 +5,16 @@
public class AppendOnlyStoreBuilder extends FileStoreBuilder<AppendOnlyStoreBuilder> {
// Blocked Longs Config Options
public static final int DEFAULT_BLOBS_PER_BLOCK = 127;

private int blobsPerBlock = DEFAULT_BLOBS_PER_BLOCK;

// Blob Cache Options
public static final int DEFAULT_BLOB_PAGE_SIZE = 4 * 1024 * 1024;

private int blobPageSize = DEFAULT_BLOB_PAGE_SIZE;


public static final boolean DEFAULT_CACHE_BUFFERS = true; // Defaults to madvise Normal: LRU-like page cache behavior
private boolean cacheBuffers = DEFAULT_CACHE_BUFFERS;

private BlobStoreMetrics.Adders blobStoreMetricsAdders = new BlobStoreMetrics.Adders();
private BlockedLongMetrics.Adders blockedLongMetricsAdders = new BlockedLongMetrics.Adders();

@@ -28,6 +30,11 @@ public AppendOnlyStoreBuilder withBlobPageSize(int blobPageSize) {
return this;
}

public AppendOnlyStoreBuilder withCacheBuffers(boolean cacheBuffers) {
this.cacheBuffers = cacheBuffers;
return this;
}

public AppendOnlyStore build() {
return build(false);
}
@@ -54,11 +61,18 @@ public int getBlobPageSize() {

public BlockedLongMetrics.Adders getBlockedLongMetricsAdders() { return blockedLongMetricsAdders; }

public boolean getCacheBuffers() {
return cacheBuffers;
}

@Override
public String toString() {
return "AppendOnlyStoreBuilder{" +
"blobsPerBlock=" + blobsPerBlock +
", blobPageSize=" + blobPageSize +
", cacheBuffers=" + cacheBuffers +
", blobStoreMetricsAdders=" + blobStoreMetricsAdders +
", blockedLongMetricsAdders=" + blockedLongMetricsAdders +
'}' + super.toString();
}
}
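
A hypothetical usage sketch of the new option (the builder constructor is assumed for illustration; only withCacheBuffers is new in this PR):

AppendOnlyStoreBuilder builder = new AppendOnlyStoreBuilder() // construction details assumed
        .withBlobPageSize(4 * 1024 * 1024)  // DEFAULT_BLOB_PAGE_SIZE from this diff
        .withCacheBuffers(false);           // opt large blob content out of the default page cache behavior
AppendOnlyStore store = builder.build();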
4 changes: 3 additions & 1 deletion src/main/java/com/upserve/uppend/AppendStorePartition.java
@@ -84,12 +84,14 @@ static AppendStorePartition openPartition(Path parentDir, String partition, bool
builder.getBlockedLongMetricsAdders()
);

// Allow control of caching buffers only for large blob content
VirtualPageFile blobs = new VirtualPageFile(
blobsFile(partitionDir),
builder.getLookupHashCount(),
builder.getBlobPageSize(),
builder.getTargetBufferSize(),
readOnly
readOnly,
builder.getCacheBuffers()
);
VirtualPageFile metadata = new VirtualPageFile(
metadataPath(partitionDir),
28 changes: 9 additions & 19 deletions src/main/java/com/upserve/uppend/BlockedLongs.java
@@ -1,6 +1,7 @@
package com.upserve.uppend;

import com.google.common.util.concurrent.Striped;
import com.upserve.uppend.blobs.NativeIO;
import com.upserve.uppend.metrics.*;
import org.slf4j.Logger;

@@ -100,6 +101,7 @@ public class BlockedLongs implements AutoCloseable, Flushable {

try {
posBuf = blocks.map(readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE, posBufPosition, 8);
NativeIO.madvise(posBuf, NativeIO.Advice.WillNeed); // Will include the first few blocks
} catch (IOException e) {
throw new UncheckedIOException("Unable to map pos buffer at in " + file, e);
}
@@ -134,6 +136,8 @@ else if (pos < HEADER_BYTES) {
}
initialAppendCount = appendCountBuf.getLong(0);



posMem = new AtomicLong(pos);
}

@@ -420,32 +424,17 @@ public void clear() {
public void close() throws IOException {
log.debug("closing {}", file);

if (readOnly) {
blocks.close();
return;
}
Arrays.fill(pages, null);

IntStream.range(0, LOCK_SIZE).forEach(index -> stripedLocks.getAt(index).lock());
try {
flush();
blocks.close();
} finally {
IntStream.range(0, LOCK_SIZE).forEach(index -> stripedLocks.getAt(index).unlock());
}
flush();
blocks.close();
}

@Override
public void flush() {
if (readOnly) return;
log.debug("flushing {}", file);
posBuf.force();
appendCountBuf.putLong(0, initialAppendCount + appendCounter.sum());
appendCountBuf.force();

Arrays.stream(pages)
.parallel()
.filter(Objects::nonNull)
.forEach(MappedByteBuffer::force);

Comment (Contributor):
Why are you no longer flushing pages inside this method called flush?

Reply (Contributor Author):
Because flush guarantees durability (written to disk), but I now understand that durability is not required for multiple processes on the same machine to share state through the page cache.


log.debug("flushed {}", file);
}
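
To illustrate the reply above, a minimal hypothetical sketch (file name and sizes assumed, not from this PR): two mappings of the same file share the kernel page cache, so a write is visible to another mapping immediately; force() only adds durability against a crash.

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;

class PageCacheSharing {
    public static void main(String[] args) throws Exception {
        try (FileChannel ch = FileChannel.open(Paths.get("shared.dat"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer writer = ch.map(FileChannel.MapMode.READ_WRITE, 0, 8);
            writer.putLong(0, 42L); // not yet durable, but already in the shared page cache
            MappedByteBuffer reader = ch.map(FileChannel.MapMode.READ_ONLY, 0, 8);
            System.out.println(reader.getLong(0)); // prints 42 without any force()
        }
    }
}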
@@ -487,7 +476,7 @@ private MappedByteBuffer page(long pos) {
private void preloadPage(int pageIndex) {
if (pageIndex < MAX_PAGES && pages[pageIndex] == null) {
// preload page
int prev = currentPage.getAndUpdate(current -> current < pageIndex ? pageIndex : current);
int prev = currentPage.getAndUpdate(current -> Math.max(pageIndex, current));
if (prev < pageIndex) {
ensurePage(pageIndex);
}
@@ -504,6 +493,7 @@ private MappedByteBuffer ensurePage(int pageIndex) {
try {
FileChannel.MapMode mapMode = readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE;
page = blocks.map(mapMode, pageStart, PAGE_SIZE);
// Could experiment with advise_random to reduce memory use or advise_willneed to hold more in page cache?
} catch (IOException e) {
throw new UncheckedIOException("unable to map page at page index " + pageIndex + " (" + pageStart + " + " + PAGE_SIZE + ") in " + file, e);
}
4 changes: 0 additions & 4 deletions src/main/java/com/upserve/uppend/blobs/FilePage.java
@@ -4,13 +4,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static java.lang.Integer.min;

/**
* File backed implementation of Page
*/
public class FilePage implements Page {

private final FileChannel channel;
private final int pageSize;
private final long pageStart;
@@ -25,7 +22,6 @@ public class FilePage implements Page {
this.channel = channel;
this.pageStart = pageStart;
this.pageSize = pageSize;

}

@Override
61 changes: 61 additions & 0 deletions src/main/java/com/upserve/uppend/blobs/NativeIO.java
@@ -0,0 +1,61 @@
package com.upserve.uppend.blobs;

import jnr.ffi.*;
import jnr.ffi.types.size_t;
import org.slf4j.Logger;
import com.kenai.jffi.MemoryIO;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.*;

public class NativeIO {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final NativeC nativeC = LibraryLoader.create(NativeC.class).load("c");
static final int pageSize = nativeC.getpagesize(); // 4096 on most Linux

public enum Advice {
// These seem to be fairly stable https://github.com/torvalds/linux
// TODO add to https://github.com/jnr/jnr-constants
Normal(0), Random(1), Sequential(2), WillNeed(3), DontNeed(4);
private final int value;
Advice(int val) {
this.value = val;
}
}

public interface NativeC {
int madvise(@size_t long address, @size_t long size, int advice);
int getpagesize();
}

static long alignedAddress(long address) {
return address & (- pageSize);

Comment:
The return value of this function will always be <= address. This means that for an unaligned buffer, madvise applies to some memory outside of the buffer. It shouldn't crash, because memory is allocated in pageSize increments, but it will affect the management of memory we don't intend it to affect. We might want to consider a more conservative version that returns the next aligned address:

return (address + pageSize - 1) & (~(pageSize - 1));

}
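
A worked example of the two strategies (illustrative numbers, assuming pageSize = 4096):

long address = 10_000;                            // unaligned buffer start
long down = address & (-4096L);                   // 8192: rounds down, advises the partial page before the buffer
long up = (address + 4096 - 1) & (~(4096L - 1));  // 12288: rounds up, skips the buffer's first partial page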

static long alignedSize(long address, int capacity) {
long end = address + capacity;

Comment (Contributor Author):
Lucene had this as: long end = alignedAddress + capacity
This means that the pages you advise for may not cover the entire capacity of the buffer.
https://github.com/apache/lucene-solr/blob/master/lucene/misc/src/java/org/apache/lucene/store/NativePosixUtil.cpp#L305-L308
At this point I am not inclined to license this file under the Lucene Apache license, but am open to suggestions.

Reply:
I wouldn't worry about the license. IANAL, but it doesn't look like there's a single line copied verbatim.
I'm skeptical about their approach in general. It makes a lot of assumptions about alignment that don't seem necessary and could break in the future if the underlying kernel implementation changes.

end = (end + pageSize - 1) & (-pageSize);

Comment:
I don't understand why the size needs to be page aligned. The Linux man page for madvise requires that the address is page aligned but doesn't put any restrictions on the size. Just to be conservative, we should probably drop this line so that madvise applies to the contents of the buffer exactly.

Reply (Contributor Author, @dstuebe, Nov 19, 2019):

Buffer:     |------------|
Pages: |------|------|------|

I am actually inclined to be aggressive and assert that all three pages should be advised. We could also make this a boolean flag: aggressive mode!

I think a good solution might be to enforce that the file headers, which are WILLNEED, and the file content, which may be RANDOM, are aligned at 4096. I think this is probably already true, but it would be easy to enforce.

Reply:
-1 on the feature flag. This isn't something we want to tune. I would rather add a comment that the policy is to apply madvise to memory outside of unaligned buffers and move on.

Reply:
Just thought of this:

Buffers:    |------A-----|-----B------|
Pages: |------|------|------|------|------|

What if A is WILLNEED and B is RANDOM or vice-versa?

return end - alignedAddress(address);
}
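
Continuing the illustrative numbers above: rounding the end up to a page boundary makes the advised range cover the buffer's final partial page.

long capacity = 100;
long end = 10_000 + capacity;            // 10100
end = (end + 4096 - 1) & (-4096L);       // 12288: next page boundary at or above the buffer end
long size = end - (10_000L & (-4096L));  // 12288 - 8192 = 4096: one page spanning the whole buffer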

public static void madvise(MappedByteBuffer buffer, Advice advice) throws IOException {

final long address = MemoryIO.getInstance().getDirectBufferAddress(buffer);
final int capacity = buffer.capacity();

long alignedAddress = alignedAddress(address);
long alignedSize = alignedSize(alignedAddress, capacity);

log.debug(
"Page size {}; Address: raw - {}, aligned - {}; Size: raw - {}, aligned - {}",
pageSize, address, alignedAddress, capacity, alignedSize
);
int val = nativeC.madvise(alignedAddress, alignedSize, advice.value);

if (val != 0) {
throw new IOException(String.format("System call madvise failed with code: %d", val));
}
}
}
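
A minimal usage sketch of the new helper (file name assumed): map a region read-only and hint that it will be needed soon, as BlockedLongs now does for its position buffer.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;

class MadviseExample {
    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Paths.get("blocks"), StandardOpenOption.READ)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            NativeIO.madvise(buffer, NativeIO.Advice.WillNeed); // prefetch and retain in the page cache
        }
    }
}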