Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.lang.reflect.Array;

import com.google.common.base.Objects;

/**
* A class that wraps a generic array so that it can be used as a key in a map, sorted or not.
*
* The implementation of {@link #compareTo(GenericArrayKey)} makes two assumptions:
* - All elements are instances of Comparable
* - When comparing two arrays, they both contain elements of the same type in corresponding
* indices.
*
* Otherwise, ClassCastExceptions may occur. The equality method can compare any two arrays.
*
* This class is not efficient and is mostly meant to compare really small arrays, like those
* generally used as indices and keys in a KVStore.
*/
public class GenericArrayKey implements Comparable<GenericArrayKey> {

private final Object key;

public GenericArrayKey(Object key) {
this.key = key;
}

@Override
public boolean equals(Object other) {
if (!(other instanceof GenericArrayKey)) {
return false;
}

Object comp = ((GenericArrayKey) other).key;
int l1 = Array.getLength(key);
int l2 = Array.getLength(comp);

if (l1 != l2) {
return false;
}

for (int i = 0; i < l1; i++) {
if (!Objects.equal(Array.get(key, i), Array.get(comp, i))) {
return false;
}
}

return true;
}

@Override
public int hashCode() {
int code = 0;
int length = Array.getLength(key);
for (int i = 0; i < length; i++) {
code += Array.get(key, i).hashCode();
}
return 31 * code;
}

@Override
@SuppressWarnings("unchecked")
public int compareTo(GenericArrayKey other) {
int len = Math.min(Array.getLength(key), Array.getLength(other.key));
for (int i = 0; i < len; i++) {
int diff = ((Comparable<Object>) Array.get(key, i)).compareTo(
(Comparable<Object>) Array.get(other.key, i));
if (diff != 0) {
return diff;
}
}

return Array.getLength(key) - Array.getLength(other.key);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

/**
* Implementation of KVStore that keeps data deserialized in memory. This store does not index
* data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
* according to the index. This saves memory but makes iteration more expensive.
*/
public class InMemoryStore implements KVStore {

private Object metadata;
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();

@Override
public <T> T getMetadata(Class<T> klass) {
return klass.cast(metadata);
}

@Override
public void setMetadata(Object value) {
this.metadata = value;
}

@Override
public long count(Class<?> type) {
InstanceList list = data.get(type);
return list != null ? list.size() : 0;
}

@Override
public <T> T read(Class<T> klass, Object naturalKey) {
InstanceList list = data.get(klass);
Object value = list != null ? list.get(naturalKey) : null;
if (value == null) {
throw new NoSuchElementException();
}
return klass.cast(value);
}

@Override
public void write(Object value) throws Exception {
InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
try {
return new InstanceList(key);
} catch (Exception e) {
throw Throwables.propagate(e);
}
});
list.put(value);
}

@Override
public void delete(Class<?> type, Object naturalKey) {
InstanceList list = data.get(type);
if (list != null) {
list.delete(naturalKey);
}
}

@Override
public <T> KVStoreView<T> view(Class<T> type){
InstanceList list = data.get(type);
return list != null ? list.view(type)
: new InMemoryView<>(type, Collections.<T>emptyList(), null);
}

@Override
public void close() {
metadata = null;
data.clear();
}

@SuppressWarnings("unchecked")
private static Comparable<Object> asKey(Object in) {
if (in.getClass().isArray()) {
in = new GenericArrayKey(in);
}
return (Comparable<Object>) in;
}

private static class InstanceList {

private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentSkipListMap<Comparable<Object>, Object> data;

private InstanceList(Class<?> type) throws Exception {
this.ti = new KVTypeInfo(type);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentSkipListMap<>((k1, k2) -> k1.compareTo(k2));
}

public Object get(Object key) {
return data.get(asKey(key));
}

public void put(Object value) throws Exception {
Preconditions.checkArgument(ti.type().equals(value.getClass()),
"Unexpected type: %s", value.getClass());
data.put(asKey(naturalKey.get(value)), value);
}

public void delete(Object key) {
data.remove(asKey(key));
}

public int size() {
return data.size();
}

@SuppressWarnings("unchecked")
public <T> InMemoryView<T> view(Class<T> type) {
Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
Collection<T> all = (Collection<T>) data.values();
return new InMemoryView(type, all, ti);
}

}

private static class InMemoryView<T> extends KVStoreView<T> {

private final Collection<T> elements;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
super(type);
this.elements = elements;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
}

@Override
public Iterator<T> iterator() {
if (elements.isEmpty()) {
return new InMemoryIterator<>(elements.iterator());
}

try {
Collection<T> iterable = this.elements;
KVTypeInfo.Accessor getter = ti.getAccessor(index);
int modifier = ascending ? 1 : -1;

// Unless iterating over the natural index in ascending order, make a copy of the element
// list and sort it appropriately.
if (!KVIndex.NATURAL_INDEX_NAME.equals(index)) {
List<T> copy = new ArrayList<>(iterable);
Collections.sort(copy, (e1, e2) -> modifier * compare(e1, e2, getter));
iterable = copy;
} else if (!ascending) {
List<T> copy = new ArrayList<>(iterable);
Collections.reverse(copy);
iterable = copy;
}

Stream<T> stream = iterable.stream();

if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}

if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}

if (max < iterable.size()) {
stream = stream.limit((int) max);
}

if (skip > 0) {
stream = stream.skip(skip);
}

return new InMemoryIterator<>(stream.iterator());
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
try {
int diff = compare(e1, getter, getter.get(e2));
if (diff == 0) {
diff = compare(e1, natural, natural.get(e2));
}
return diff;
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
try {
return asKey(getter.get(e1)).compareTo(asKey(v2));
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

}

private static class InMemoryIterator<T> implements KVStoreIterator<T> {

private final Iterator<T> iter;

InMemoryIterator(Iterator<T> iter) {
this.iter = iter;
}

@Override
public boolean hasNext() {
return iter.hasNext();
}

@Override
public T next() {
return iter.next();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public List<T> next(int max) {
List<T> list = new ArrayList<>(max);
while (hasNext() && list.size() < max) {
list.add(next());
}
return list;
}

@Override
public boolean skip(long n) {
long skipped = 0;
while (skipped < n) {
if (hasNext()) {
next();
skipped++;
} else {
return false;
}
}

return true;
}

@Override
public void close() {
// no op.
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private void checkIndex(KVIndex idx) {
"Duplicate index %s for type %s.", idx.value(), type.getName());
}

public Class<?> getType() {
public Class<?> type() {
return type;
}

Expand Down
Loading