Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(java): fix collection view serialization #1833

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public abstract class AbstractCollectionSerializer<T> extends Serializer<T> {
private MethodHandle constructor;
private int numElements;
private final boolean supportCodegenHook;
protected final boolean supportCodegenHook;
// TODO remove elemSerializer, support generics in CompatibleSerializer.
private Serializer<?> elemSerializer;
protected final ClassInfoHolder elementClassInfoHolder;
Expand Down Expand Up @@ -514,23 +514,9 @@ public Collection newCollection(MemoryBuffer buffer) {
}
}

/** Create a new empty collection for copy. */
public Collection newCollection(Collection collection) {
numElements = collection.size();
return newCollection();
}

/**
* Collection must have default constructor to be invoked by fury, otherwise created object can't
* be used to adding elements. For example:
*
* <pre>{@code new ArrayList<Integer> {add(1);}}</pre>
*
* <p>without default constructor, created list will have elementData as null, adding elements
* will raise NPE.
*
* @return empty collection instance
*/
public Collection newCollection() {
if (constructor == null) {
constructor = ReflectionUtils.getCtrHandle(type, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,23 +800,9 @@ public Map newMap(MemoryBuffer buffer) {
}
}

/** Create a new empty map for copy. */
public Map newMap(Map map) {
numElements = map.size();
return newMap();
}

/**
* Map must have default constructor to be invoked by fury, otherwise created object can't be used
* to adding elements. For example:
*
* <pre>{@code new ArrayList<Integer> {add(1);}}</pre>
*
* <p>without default constructor, created list will have elementData as null, adding elements
* will raise NPE.
*
* @return empty map instance
*/
public Map newMap() {
if (constructor == null) {
constructor = ReflectionUtils.getCtrHandle(type, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import org.apache.fury.Fury;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.util.Preconditions;

/** Base serializer for all java collections. */
@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -55,6 +56,7 @@ public T copy(T originCollection) {
if (isImmutable()) {
return originCollection;
}
Preconditions.checkArgument(supportCodegenHook);
Collection newCollection = newCollection(originCollection);
if (needToCopyRef) {
fury.reference(originCollection, newCollection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.fury.serializer.collection;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,12 +52,15 @@
import org.apache.fury.memory.Platform;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.resolver.ClassInfo;
import org.apache.fury.resolver.ClassInfoHolder;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.resolver.RefResolver;
import org.apache.fury.serializer.ReplaceResolveSerializer;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.Serializers;
import org.apache.fury.type.Type;
import org.apache.fury.util.Preconditions;
import org.apache.fury.util.unsafe._JDKAccess;

/**
* Serializers for classes implements {@link Collection}. All collection serializers should extend
Expand Down Expand Up @@ -446,21 +450,33 @@ public ConcurrentSkipListSetSerializer(Fury fury, Class<ConcurrentSkipListSet> c
public ConcurrentSkipListSet newCollection(MemoryBuffer buffer) {
int numElements = buffer.readVarUint32Small7();
setNumElements(numElements);
RefResolver refResolver = fury.getRefResolver();
int refId = refResolver.lastPreservedRefId();
// It's possible that comparator/elements has circular ref to set.
Comparator comparator = (Comparator) fury.readRef(buffer);
ConcurrentSkipListSet skipListSet = new ConcurrentSkipListSet(comparator);
fury.getRefResolver().reference(skipListSet);
refResolver.setReadObject(refId, skipListSet);
return skipListSet;
}
}

public static final class SetFromMapSerializer extends CollectionSerializer<Set<?>> {

private static final long MAP_FIELD_OFFSET;
private static final List EMPTY_COLLECTION_STUB = new ArrayList<>();

private static final MethodHandle m;

private static final MethodHandle s;

static {
try {
Field mapField = Class.forName("java.util.Collections$SetFromMap").getDeclaredField("m");
Class<?> type = Class.forName("java.util.Collections$SetFromMap");
Field mapField = type.getDeclaredField("m");
MAP_FIELD_OFFSET = Platform.objectFieldOffset(mapField);
MethodHandles.Lookup lookup = _JDKAccess._trustedLookup(type);
m = lookup.findSetter(type, "m", Map.class);
s = lookup.findSetter(type, "s", Set.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -474,12 +490,27 @@ public SetFromMapSerializer(Fury fury, Class<Set<?>> type) {
public Collection newCollection(MemoryBuffer buffer) {
final ClassInfo mapClassInfo = fury.getClassResolver().readClassInfo(buffer);
final AbstractMapSerializer mapSerializer =
(AbstractMapSerializer) fury.getClassResolver().getSerializer(mapClassInfo.getCls());
Map map = mapSerializer.newMap(buffer);
final int numElements = mapSerializer.getAndClearNumElements();
setNumElements(numElements);
final Set set = Collections.newSetFromMap(map);
fury.getRefResolver().reference(set);
(AbstractMapSerializer) mapClassInfo.getSerializer();
RefResolver refResolver = fury.getRefResolver();
// It's possible that elements or nested fields has circular ref to set.
int refId = refResolver.lastPreservedRefId();
Set set;
if (buffer.readBoolean()) {
refResolver.preserveRefId();
set = Collections.newSetFromMap(mapSerializer.newMap(buffer));
setNumElements(mapSerializer.getAndClearNumElements());
} else {
Map map = (Map) mapSerializer.read(buffer);
try {
set = Platform.newInstance(type);
m.invoke(set, map);
s.invoke(set, map.keySet());
} catch (Throwable e) {
throw new RuntimeException(e);
}
setNumElements(0);
}
refResolver.setReadObject(refId, set);
return set;
}

Expand All @@ -489,40 +520,63 @@ public Collection newCollection(Collection originCollection) {
(Map<?, Boolean>) Platform.getObject(originCollection, MAP_FIELD_OFFSET);
AbstractMapSerializer mapSerializer =
(AbstractMapSerializer) fury.getClassResolver().getSerializer(map.getClass());
Map newMap = mapSerializer.newMap();
Map newMap = mapSerializer.newMap(map);
return Collections.newSetFromMap(newMap);
}

@Override
public Collection onCollectionWrite(MemoryBuffer buffer, Set<?> value) {
final Map<?, Boolean> map = (Map<?, Boolean>) Platform.getObject(value, MAP_FIELD_OFFSET);
final ClassInfo classInfo = fury.getClassResolver().getClassInfo(map.getClass());
AbstractMapSerializer mapSerializer = (AbstractMapSerializer) classInfo.getSerializer();
fury.getClassResolver().writeClass(buffer, classInfo);
// newMap will read num size first.
buffer.writeVarUint32Small7(value.size());
return value;
if (mapSerializer.supportCodegenHook) {
buffer.writeBoolean(true);
mapSerializer.onMapWrite(buffer, map);
return value;
} else {
buffer.writeBoolean(false);
mapSerializer.write(buffer, map);
return EMPTY_COLLECTION_STUB;
}
}
}

public static final class ConcurrentHashMapKeySetView
public static final class ConcurrentHashMapKeySetViewSerializer
extends CollectionSerializer<ConcurrentHashMap.KeySetView> {
private final ClassInfoHolder mapClassInfoHolder;
private final ClassInfoHolder valueClassInfoHolder;

public ConcurrentHashMapKeySetView(Fury fury, Class<ConcurrentHashMap.KeySetView> type) {
super(fury, type);
public ConcurrentHashMapKeySetViewSerializer(
Fury fury, Class<ConcurrentHashMap.KeySetView> type) {
super(fury, type, false);
mapClassInfoHolder = fury.getClassResolver().nilClassInfoHolder();
valueClassInfoHolder = fury.getClassResolver().nilClassInfoHolder();
}

@Override
public ConcurrentHashMap.KeySetView newCollection(MemoryBuffer buffer) {
int numElements = buffer.readVarUint32Small7();
setNumElements(numElements);
ConcurrentHashMap.KeySetView keySetView = ConcurrentHashMap.newKeySet(numElements);
fury.getRefResolver().reference(keySetView);
return keySetView;
public void write(MemoryBuffer buffer, ConcurrentHashMap.KeySetView value) {
fury.writeRef(buffer, value.getMap(), mapClassInfoHolder);
fury.writeRef(buffer, value.getMappedValue(), valueClassInfoHolder);
}

@Override
public Collection newCollection(Collection collection) {
return ConcurrentHashMap.newKeySet(collection.size());
public ConcurrentHashMap.KeySetView read(MemoryBuffer buffer) {
ConcurrentHashMap map = (ConcurrentHashMap) fury.readRef(buffer, mapClassInfoHolder);
Object value = fury.readRef(buffer, valueClassInfoHolder);
return map.keySet(value);
}

@Override
public ConcurrentHashMap.KeySetView copy(ConcurrentHashMap.KeySetView value) {
ConcurrentHashMap newMap = fury.copyObject(value.getMap());
return newMap.keySet(fury.copyObject(value.getMappedValue()));
}

@Override
public Collection newCollection(MemoryBuffer buffer) {
throw new IllegalStateException(
"Should not be invoked since we set supportCodegenHook to false");
}
}

Expand Down Expand Up @@ -794,6 +848,6 @@ public static void registerDefaultSerializers(Fury fury) {
fury.registerSerializer(setFromMapClass, new SetFromMapSerializer(fury, setFromMapClass));
fury.registerSerializer(
ConcurrentHashMap.KeySetView.class,
new ConcurrentHashMapKeySetView(fury, ConcurrentHashMap.KeySetView.class));
new ConcurrentHashMapKeySetViewSerializer(fury, ConcurrentHashMap.KeySetView.class));
}
}
12 changes: 6 additions & 6 deletions java/fury-core/src/test/java/org/apache/fury/FuryTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,19 @@ public static <T> T serDeCheckSerializer(Fury fury, Object obj, String classRege
return (T) fury.deserialize(bytes);
}

public static Object serDe(Fury fury1, Fury fury2, Object obj) {
public static <T> T serDe(Fury fury1, Fury fury2, T obj) {
byte[] bytes = fury1.serialize(obj);
return fury2.deserialize(bytes);
return (T) fury2.deserialize(bytes);
}

public static Object serDeCheck(Fury fury1, Fury fury2, Object obj) {
Object o = serDe(fury1, fury2, obj);
public static <T> T serDeCheck(Fury fury1, Fury fury2, T obj) {
T o = serDe(fury1, fury2, obj);
Assert.assertEquals(o, obj);
return o;
}

public static Object serDeCheck(Fury fury, Object obj) {
Object o = serDe(fury, obj);
public static <T> T serDeCheck(Fury fury, T obj) {
T o = serDe(fury, obj);
Assert.assertEquals(o, obj);
return o;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.testng.annotations.Test;
import org.testng.collections.Maps;

@SuppressWarnings("rawtypes")
public class CollectionSerializersTest extends FuryTestBase {

@Test(dataProvider = "referenceTrackingConfig")
Expand Down Expand Up @@ -392,47 +394,70 @@ public void testCopyOnWriteArrayList(Fury fury) {
copyCheck(fury, list);
}

@Test(dataProvider = "enableCodegen")
public void testSetFromMap(boolean enableCodegen) {
final Fury fury =
Fury.builder()
.withLanguage(Language.JAVA)
.requireClassRegistration(false)
.withCodegen(enableCodegen)
.build();
final Set<String> set = Collections.newSetFromMap(Maps.newConcurrentMap());
@Data
@AllArgsConstructor
public static class CollectionViewTestStruct {
Collection<String> collection;
Set<String> set;
}

@Test(dataProvider = "javaFury")
public void testSetFromMap(Fury fury) {
Set<String> set = Collections.newSetFromMap(Maps.newConcurrentMap());
set.add("a");
set.add("b");
set.add("c");
Assert.assertEquals(set, serDe(fury, set));
serDeCheck(fury, set);
Assert.assertEquals(
getJavaFury().getClassResolver().getSerializerClass(set.getClass()),
fury.getClassResolver().getSerializerClass(set.getClass()),
CollectionSerializers.SetFromMapSerializer.class);
CollectionViewTestStruct struct1 = serDeCheck(fury, new CollectionViewTestStruct(set, set));
if (fury.trackingRef()) {
assertSame(struct1.collection, struct1.set);
}
set = Collections.newSetFromMap(new HashMap<String, Boolean>() {});
set.add("a");
set.add("b");
serDeCheck(fury, set);
CollectionViewTestStruct struct2 = serDeCheck(fury, new CollectionViewTestStruct(set, set));
if (fury.trackingRef()) {
assertSame(struct2.collection, struct2.set);
}
}

@Test(dataProvider = "furyCopyConfig")
public void testSetFromMap(Fury fury) {
public void testSetFromMapCopy(Fury fury) {
final Set<Object> set = Collections.newSetFromMap(Maps.newConcurrentMap());
set.add("a");
set.add("b");
set.add(Cyclic.create(true));
copyCheck(fury, set);
}

@Test
public void testConcurrentMapKeySetViewMap() {
final ConcurrentHashMap.KeySetView<Object, Boolean> set = ConcurrentHashMap.newKeySet();
@Test(dataProvider = "javaFury")
public void testConcurrentMapKeySetViewMap(Fury fury) {
final ConcurrentHashMap.KeySetView<String, Boolean> set = ConcurrentHashMap.newKeySet();
set.add("a");
set.add("b");
set.add("c");
Assert.assertEquals(set, serDe(getJavaFury(), set));
Assert.assertEquals(set, serDe(fury, set));
Assert.assertEquals(
getJavaFury().getClassResolver().getSerializerClass(set.getClass()),
CollectionSerializers.ConcurrentHashMapKeySetView.class);
fury.getClassResolver().getSerializerClass(set.getClass()),
CollectionSerializers.ConcurrentHashMapKeySetViewSerializer.class);
CollectionViewTestStruct o = serDeCheck(fury, new CollectionViewTestStruct(set, set));
if (fury.trackingRef()) {
assertSame(o.collection, o.set);
}
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("k1", "v1");
if (fury.trackingRef()) {
ArrayList<Serializable> list = serDeCheck(fury, ofArrayList(map.keySet("v0"), map));
assertSame(((ConcurrentHashMap.KeySetView) (list.get(0))).getMap(), list.get(1));
}
}

@Test(dataProvider = "furyCopyConfig")
public void testConcurrentMapKeySetViewMap(Fury fury) {
public void testConcurrentMapKeySetViewMapCopy(Fury fury) {
final ConcurrentHashMap.KeySetView<Object, Boolean> set = ConcurrentHashMap.newKeySet();
set.add("a");
set.add("b");
Expand Down
Loading