Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 12 additions & 32 deletions common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@
* Abstraction for a local key/value store for storing app data.
*
* <p>
* Use {@link KVStoreBuilder} to create an instance. There are two main features provided by the
* implementations of this interface:
* There are two main features provided by the implementations of this interface:
* </p>
*
* <ul>
* <li>serialization: this feature is not optional; data will be serialized to and deserialized
* from the underlying data store using a {@link KVStoreSerializer}, which can be customized by
* the application. The serializer is based on Jackson, so it supports all the Jackson annotations
* for controlling the serialization of app-defined types.</li>
* <h3>Serialization</h3>
*
* <li>key management: by using {@link #read(Class, Object)} and {@link #write(Class, Object)},
* applications can leave key management to the implementation. For applications that want to
* manage their own keys, the {@link #get(byte[], Class)} and {@link #set(byte[], Object)} methods
* are available.</li>
* </ul>
* <p>
* Data will be serialized to and deserialized from the underlying data store using a
* {@link KVStoreSerializer}, which can be customized by the application. The serializer is
* based on Jackson, so it supports all the Jackson annotations for controlling the serialization
* of app-defined types.
* </p>
*
* <p>
* Data is also automatically compressed to save disk space.
* </p>
*
* <h3>Automatic Key Management</h3>
*
Expand Down Expand Up @@ -78,26 +78,6 @@ public interface KVStore extends Closeable {
*/
void setMetadata(Object value) throws Exception;

/**
* Returns the value of a specific key, deserialized to the given type.
*/
<T> T get(byte[] key, Class<T> klass) throws Exception;

/**
* Write a single key directly to the store, atomically.
*/
void put(byte[] key, Object value) throws Exception;

/**
* Removes a key from the store.
*/
void delete(byte[] key) throws Exception;

/**
* Returns an iterator that will only list values with keys starting with the given prefix.
*/
<T> KVStoreIterator<T> iterator(byte[] prefix, Class<T> klass) throws Exception;

/**
* Read a specific instance of an object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
* </p>
*
* <p>
* The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close
* The iterators returned by this view are of type {@link KVStoreIterator}; they auto-close
* when used in a for loop that exhausts their contents, but when used manually, they need
* to be closed explicitly unless all elements are read.
* to be closed explicitly unless all elements are read. For this reason, it's recommended
* that {@link #last(Object)} and {@link #max(long)} be used to make it easier to release
* resources associated with the iterator by better controlling how many elements will be
* retrieved.
* </p>
*/
public abstract class KVStoreView<T> implements Iterable<T> {
Expand All @@ -43,7 +46,9 @@ public abstract class KVStoreView<T> implements Iterable<T> {
boolean ascending = true;
String index = KVIndex.NATURAL_INDEX_NAME;
Object first = null;
Object last = null;
long skip = 0L;
long max = Long.MAX_VALUE;

public KVStoreView(Class<T> type) {
this.type = type;
Expand Down Expand Up @@ -74,7 +79,25 @@ public KVStoreView<T> first(Object value) {
}

/**
* Skips a number of elements in the resulting iterator.
* Stops iteration at the given value of the chosen index.
*/
public KVStoreView<T> last(Object value) {
this.last = value;
return this;
}

/**
* Stops iteration after a number of elements has been retrieved.
*/
public KVStoreView<T> max(long max) {
Preconditions.checkArgument(max > 0L, "max must be positive.");
this.max = max;
return this;
}

/**
* Skips a number of elements at the start of iteration. Skipped elements are not accounted
* when using {@link #max(long)}.
*/
public KVStoreView<T> skip(long n) {
this.skip = n;
Expand Down
20 changes: 5 additions & 15 deletions common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
this.types = new ConcurrentHashMap<>();

Options options = new Options();
options.createIfMissing(!path.exists());
options.createIfMissing(true);
this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));

byte[] versionData = db().get(STORE_VERSION_KEY);
Expand Down Expand Up @@ -109,31 +109,19 @@ public void setMetadata(Object value) throws Exception {
}
}

@Override
public <T> T get(byte[] key, Class<T> klass) throws Exception {
<T> T get(byte[] key, Class<T> klass) throws Exception {
byte[] data = db().get(key);
if (data == null) {
throw new NoSuchElementException(new String(key, UTF_8));
}
return serializer.deserialize(data, klass);
}

@Override
public void put(byte[] key, Object value) throws Exception {
private void put(byte[] key, Object value) throws Exception {
Preconditions.checkArgument(value != null, "Null values are not allowed.");
db().put(key, serializer.serialize(value));
}

@Override
public void delete(byte[] key) throws Exception {
db().delete(key);
}

@Override
public <T> KVStoreIterator<T> iterator(byte[] prefix, Class<T> klass) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
Expand Down Expand Up @@ -189,6 +177,8 @@ public void delete(Class<?> type, Object naturalKey, boolean sync) throws Except
batch.write(sync);
}
}
} catch (NoSuchElementException nse) {
// Ignore.
} finally {
batch.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
private final LevelDBTypeInfo.Index index;
private final byte[] indexKeyPrefix;
private final byte[] end;
private final long max;

private boolean checkedNext;
private T next;
private boolean closed;
private long count;

/**
* Creates a simple iterator over db keys.
Expand All @@ -55,6 +57,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
this.it = db.db().iterator();
this.indexKeyPrefix = keyPrefix;
this.end = null;
this.max = -1L;
it.seek(keyPrefix);
}

Expand All @@ -69,6 +72,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
this.ti = db.getTypeInfo(type);
this.index = ti.index(params.index);
this.indexKeyPrefix = index.keyPrefix();
this.max = params.max;

byte[] firstKey;
if (params.first != null) {
Expand All @@ -84,14 +88,27 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
}
it.seek(firstKey);

byte[] end = null;
if (ascending) {
this.end = index.end();
end = params.last != null ? index.end(params.last) : index.end();
} else {
this.end = null;
if (params.last != null) {
end = index.start(params.last);
}
if (it.hasNext()) {
it.next();
// When descending, the caller may have set up the start of iteration at a non-existant
// entry that is guaranteed to be after the desired entry. For example, if you have a
// compound key (a, b) where b is a, integer, you may seek to the end of the elements that
// have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
// exist in the database. So need to check here whether the next value actually belongs to
// the set being returned by the iterator before advancing.
byte[] nextKey = it.peekNext().getKey();
if (compare(nextKey, indexKeyPrefix) <= 0) {
it.next();
}
}
}
this.end = end;

if (params.skip > 0) {
skip(params.skip);
Expand Down Expand Up @@ -164,7 +181,23 @@ public synchronized void close() throws IOException {
}
}

/**
* Because it's tricky to expose closeable iterators through many internal APIs, especially
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@Override
protected void finalize() throws Throwable {
if (db.db() != null) {
close();
}
}

private T loadNext() {
if (count >= max) {
return null;
}

try {
while (true) {
boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
Expand All @@ -191,11 +224,16 @@ private T loadNext() {
return null;
}

// If there's a known end key and it's found, stop.
if (end != null && Arrays.equals(nextKey, end)) {
return null;
// If there's a known end key and iteration has gone past it, stop.
if (end != null) {
int comp = compare(nextKey, end) * (ascending ? 1 : -1);
if (comp > 0) {
return null;
}
}

count++;

// Next element is part of the iteration, return it.
if (index == null || index.isCopy()) {
return db.serializer.deserialize(nextEntry.getValue(), type);
Expand Down Expand Up @@ -246,4 +284,17 @@ private byte[] stitch(byte[]... comps) {
return dest;
}

private int compare(byte[] a, byte[] b) {
int diff = 0;
int minLen = Math.min(a.length, b.length);
for (int i = 0; i < minLen; i++) {
diff += (a[i] - b[i]);
if (diff != 0) {
return diff;
}
}

return a.length - b.length;
}

}
Loading