Skip to content

Commit

Permalink
GH-4891 LMDB store: Round trip to triple store to correctly gc any ki…
Browse files Browse the repository at this point in the history
…nd of value.
  • Loading branch information
kenwenzel committed Feb 8, 2024
1 parent 540a948 commit 592b04a
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void handleClose() throws SailException {
contextIdIt.close();
}

protected SailException causeIOException(IOException e) {
private SailException causeIOException(IOException e) {
return new SailException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
Expand Down Expand Up @@ -76,7 +77,8 @@ class LmdbSailStore implements SailStore {

private final boolean enableMultiThreading = true;

private final PersistentSet<Long> unusedIds;
private PersistentSetFactory<Long> setFactory;
private PersistentSet<Long> unusedIds, nextUnusedIds;

/**
* A fast non-blocking circular buffer backed by an array.
Expand Down Expand Up @@ -179,19 +181,15 @@ abstract static class StatefulOperation implements Operation {
* Creates a new {@link LmdbSailStore}.
*/
public LmdbSailStore(File dataDir, LmdbStoreConfig config) throws IOException, SailException {
this.unusedIds = new PersistentSet<>(dataDir) {
@Override
protected byte[] write(Long element) {
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
bb.putLong(element);
return bb.array();
}

@Override
protected Long read(ByteBuffer buffer) {
return buffer.order(ByteOrder.BIG_ENDIAN).getLong();
}
this.setFactory = new PersistentSetFactory<>(dataDir);
Function<Long, byte[]> encode = element -> {
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
bb.putLong(element);
return bb.array();
};
Function<ByteBuffer, Long> decode = buffer -> buffer.order(ByteOrder.BIG_ENDIAN).getLong();
this.unusedIds = setFactory.createSet("unusedIds", encode, decode);
this.nextUnusedIds = setFactory.createSet("nextUnusedIds", encode, decode);
boolean initialized = false;
try {
namespaceStore = new NamespaceStore(dataDir);
Expand Down Expand Up @@ -266,12 +264,11 @@ public void close() throws SailException {
} finally {
tripleStore.close();
}

}

} finally {
if (unusedIds != null) {
unusedIds.close();
if (setFactory != null) {
setFactory.close();
setFactory = null;
}
}
}
Expand Down Expand Up @@ -460,8 +457,17 @@ protected void filterUsedIdsInTripleStore() throws IOException {

protected void handleRemovedIdsInValueStore() throws IOException {
if (!unusedIds.isEmpty()) {
valueStore.gcIds(unusedIds);
unusedIds.clear();
do {
valueStore.gcIds(unusedIds, nextUnusedIds);
unusedIds.clear();
if (!nextUnusedIds.isEmpty()) {
// swap sets
PersistentSet<Long> ids = unusedIds;
unusedIds = nextUnusedIds;
nextUnusedIds = ids;
filterUsedIdsInTripleStore();
}
} while (!unusedIds.isEmpty());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected void handleClose() throws SailException {
recordIt.close();
}

protected SailException causeIOException(IOException e) {
private SailException causeIOException(IOException e) {
return new SailException(e);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 Eclipse RDF4J contributors.
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
Expand All @@ -11,16 +11,9 @@
package org.eclipse.rdf4j.sail.lmdb;

import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOOVERWRITE;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
Expand All @@ -30,114 +23,52 @@
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_renew;
import static org.lwjgl.util.lmdb.LMDB.mdb_del;
import static org.lwjgl.util.lmdb.LMDB.mdb_drop;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_create;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_open;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs;
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.StampedLock;

import org.apache.commons.io.FileUtils;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Mode;
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
import org.lwjgl.PointerBuffer;
import org.lwjgl.system.MemoryStack;
import org.lwjgl.util.lmdb.MDBStat;
import org.lwjgl.util.lmdb.MDBVal;

/**
* A LMDB-based persistent set.
*/
class PersistentSet<T extends Serializable> extends AbstractSet<T> {

private final Path dbDir;
private final long env;
private PersistentSetFactory<T> factory;
private final int dbi;
private TxnManager txnManager;
private long writeTxn;
private PointerBuffer writeTxnPp = PointerBuffer.allocateDirect(1);
private long mapSize = 1048576; // 1 MiB
private long pageSize;

private int size;

public PersistentSet(File cacheDir) throws IOException {
try (MemoryStack stack = stackPush()) {
PointerBuffer pp = stack.mallocPointer(1);
E(mdb_env_create(pp));
env = pp.get(0);

txnManager = new TxnManager(env, Mode.ABORT);

E(mdb_env_set_maxdbs(env, 2));
E(mdb_env_set_mapsize(env, mapSize));

int flags = MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC;

dbDir = Files.createTempDirectory(cacheDir.toPath(), "set");
E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
dbi = openDatabase(env, "elements", MDB_CREATE, null);

MDBStat stat = MDBStat.malloc(stack);
readTransaction(env, (stack2, txn) -> {
E(mdb_stat(txn, dbi, stat));
pageSize = stat.ms_psize();
return null;
});
}
}

public synchronized void close() throws IOException {
if (writeTxn != 0) {
mdb_txn_abort(writeTxn);
writeTxn = 0;
}

// We don't need to free the pointer because it was allocated
// by java.nio.ByteBuffer, which will handle freeing for us.
// writeTxnPp.free();

mdb_env_close(env);
FileUtils.deleteDirectory(dbDir.toFile());
}

protected synchronized void commit() throws IOException {
if (writeTxn != 0) {
E(mdb_txn_commit(writeTxn));
writeTxn = 0;
}
public PersistentSet(PersistentSetFactory<T> factory, int dbi) {
this.factory = factory;
this.dbi = dbi;
}

public synchronized void clear() {
if (writeTxn != 0) {
mdb_txn_abort(writeTxn);
writeTxn = 0;
if (factory.writeTxn != 0) {
mdb_txn_abort(factory.writeTxn);
factory.writeTxn = 0;
}
try {
// start a write transaction
E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
writeTxn = writeTxnPp.get(0);
mdb_drop(writeTxn, dbi, false);
commit();
E(mdb_txn_begin(factory.env, NULL, 0, factory.writeTxnPp));
factory.writeTxn = factory.writeTxnPp.get(0);
mdb_drop(factory.writeTxn, dbi, false);
factory.commit();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -147,7 +78,7 @@ public synchronized void clear() {
@Override
public Iterator<T> iterator() {
try {
commit();
factory.commit();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -175,34 +106,14 @@ public boolean remove(Object element) {
}
}

protected synchronized boolean update(Object element, boolean add) throws IOException {
private synchronized boolean update(Object element, boolean add) throws IOException {
try (MemoryStack stack = MemoryStack.stackPush()) {
if (writeTxn == 0) {
if (factory.writeTxn == 0) {
// start a write transaction
E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
writeTxn = writeTxnPp.get(0);
}
if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) {
StampedLock lock = txnManager.lock();
long stamp = lock.writeLock();
try {
txnManager.deactivate();

// resize map
E(mdb_txn_commit(writeTxn));
mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
E(mdb_env_set_mapsize(env, mapSize));

E(mdb_txn_begin(env, NULL, 0, writeTxnPp));
writeTxn = writeTxnPp.get(0);
} finally {
try {
txnManager.activate();
} finally {
lock.unlockWrite(stamp);
}
}
E(mdb_txn_begin(factory.env, NULL, 0, factory.writeTxnPp));
factory.writeTxn = factory.writeTxnPp.get(0);
}
factory.ensureResize();

MDBVal keyVal = MDBVal.malloc(stack);
// use calloc to get an empty data value
Expand All @@ -215,13 +126,13 @@ protected synchronized boolean update(Object element, boolean add) throws IOExce
keyVal.mv_data(keyBuf);

if (add) {
if (mdb_put(writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
if (mdb_put(factory.writeTxn, dbi, keyVal, dataVal, MDB_NOOVERWRITE) == MDB_SUCCESS) {
size++;
return true;
}
} else {
// delete element
if (mdb_del(writeTxn, dbi, keyVal, dataVal) == MDB_SUCCESS) {
if (mdb_del(factory.writeTxn, dbi, keyVal, dataVal) == MDB_SUCCESS) {
size--;
return true;
}
Expand All @@ -246,7 +157,7 @@ protected T read(ByteBuffer buffer) throws IOException {
}
}

protected class ElementIterator implements Iterator<T> {
private class ElementIterator implements Iterator<T> {

private final MDBVal keyData = MDBVal.malloc();
private final MDBVal valueData = MDBVal.malloc();
Expand All @@ -259,9 +170,9 @@ protected class ElementIterator implements Iterator<T> {
private T next;
private T current;

protected ElementIterator(int dbi) {
private ElementIterator(int dbi) {
try {
this.txnRef = txnManager.createReadTxn();
this.txnRef = factory.txnManager.createReadTxn();
this.txnLock = txnRef.lock();

long stamp = txnLock.readLock();
Expand Down Expand Up @@ -306,7 +217,7 @@ public T next() {
return current;
}

public T computeNext() throws IOException {
private T computeNext() throws IOException {
long stamp = txnLock.readLock();
try {
if (txnRefVersion != txnRef.version()) {
Expand Down
Loading

0 comments on commit 592b04a

Please sign in to comment.