From fd3976076a27f6f906c79bedb7c0c23a34a3abca Mon Sep 17 00:00:00 2001 From: Shawn Yang Date: Sat, 27 Jul 2024 18:11:03 +0800 Subject: [PATCH] feat(java): support deep ref copy (#1771) ## What does this PR do? support deep ref copy ## Related issues Closes #1747 https://github.com/apache/fury/issues/1679 ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark --- .../src/main/java/org/apache/fury/Fury.java | 29 ++++++++++---- .../serializer/AbstractObjectSerializer.java | 32 +++++++-------- .../fury/serializer/ArraySerializers.java | 7 ++-- .../collection/CollectionSerializer.java | 4 -- .../collection/CollectionSerializers.java | 4 +- .../serializer/collection/MapSerializer.java | 4 -- .../serializer/collection/MapSerializers.java | 4 +- .../collection/SynchronizedSerializers.java | 6 +++ .../collection/UnmodifiableSerializers.java | 6 +++ .../java/org/apache/fury/FuryCopyTest.java | 40 +++++++++++++++++++ .../SynchronizedSerializersTest.java | 9 ++++- 11 files changed, 103 insertions(+), 42 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 52c8e66987..e430a42638 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -309,7 +309,7 @@ private StackOverflowError processCopyStackOverflowError(StackOverflowError e) { if (!copyRefTracking) { String msg = "Object may contain circular references, please enable ref tracking " - + "by `FuryBuilder#withCopyRefTracking(true)`"; + + "by `FuryBuilder#withRefCopy(true)`"; StackOverflowError t1 = ExceptionUtils.trySetStackOverflowErrorMessage(e, msg); if (t1 != null) { return t1; @@ -1337,9 +1337,7 @@ public T copyObject(T obj) { break; // todo: add fastpath for other types. default: - copyDepth++; - copy = classInfo.getSerializer().copy(obj); - copyDepth--; + copy = copyObject(obj, classInfo.getSerializer()); } return (T) copy; } @@ -1369,10 +1367,26 @@ public T copyObject(T obj, int classId) { case ClassResolver.STRING_CLASS_ID: return obj; default: - return (T) classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer().copy(obj); + return copyObject(obj, classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer()); } } + public T copyObject(T obj, Serializer serializer) { + copyDepth++; + T copyObject; + if (serializer.needToCopyRef()) { + copyObject = getCopyObject(obj); + if (copyObject == null) { + copyObject = serializer.copy(obj); + originToCopyMap.put(obj, copyObject); + } + } else { + copyObject = serializer.copy(obj); + } + copyDepth--; + return copyObject; + } + /** * Track ref for copy. * @@ -1388,8 +1402,9 @@ public void reference(T o1, T o2) { } } - public Object getCopyObject(Object originObj) { - return originToCopyMap.get(originObj); + @SuppressWarnings("unchecked") + public T getCopyObject(T originObj) { + return (T) originToCopyMap.get(originObj); } private void serializeToStream(OutputStream outputStream, Consumer function) { diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java index 7a53478c93..2366c6adaa 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/AbstractObjectSerializer.java @@ -80,31 +80,29 @@ public T copy(T originObj) { return originObj; } if (isRecord) { - Object[] fieldValues = copyFields(originObj); - try { - T t = (T) constructor.invokeWithArguments(fieldValues); - Arrays.fill(copyRecordInfo.getRecordComponents(), null); - return t; - } catch (Throwable e) { - Platform.throwException(e); - } - return originObj; + return copyRecord(originObj); } - T newObj; + T newObj = newBean(); if (needToCopyRef) { - T copyObject = (T) fury.getCopyObject(originObj); - if (copyObject != null) { - return copyObject; - } - newObj = newBean(); fury.reference(originObj, newObj); - } else { - newObj = newBean(); } copyFields(originObj, newObj); return newObj; } + private T copyRecord(T originObj) { + Object[] fieldValues = copyFields(originObj); + try { + T t = (T) constructor.invokeWithArguments(fieldValues); + Arrays.fill(copyRecordInfo.getRecordComponents(), null); + fury.reference(originObj, t); + return t; + } catch (Throwable e) { + Platform.throwException(e); + } + return originObj; + } + private Object[] copyFields(T originObj) { InternalFieldInfo[] fieldInfos = this.fieldInfos; if (fieldInfos == null) { diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java b/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java index 6ac83a2ed1..c38b2bae96 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/ArraySerializers.java @@ -113,6 +113,9 @@ public void write(MemoryBuffer buffer, T[] arr) { public T[] copy(T[] originArray) { int length = originArray.length; Object[] newArray = newArray(length); + if (needToCopyRef) { + fury.reference(originArray, newArray); + } Serializer componentSerializer = this.componentTypeSerializer; if (componentSerializer != null) { if (componentSerializer.isImmutable()) { @@ -142,7 +145,6 @@ public void xwrite(MemoryBuffer buffer, T[] arr) { @Override public T[] read(MemoryBuffer buffer) { - // Some jdk8 will crash if use varint, why? int numElements = buffer.readVarUint32Small7(); boolean isFinal = (numElements & 0b1) != 0; numElements >>>= 1; @@ -151,9 +153,6 @@ public T[] read(MemoryBuffer buffer) { refResolver.reference(value); if (isFinal) { final Serializer componentTypeSerializer = this.componentTypeSerializer; - if (componentTypeSerializer == null) { - System.out.println("======="); - } for (int i = 0; i < numElements; i++) { Object elem; int nextReadRefId = refResolver.tryPreserveRefId(buffer); diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java index bc94dde15e..4a507b6963 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializer.java @@ -59,10 +59,6 @@ public T copy(T originCollection) { } Collection newCollection = newCollection(originCollection); if (needToCopyRef) { - Collection copyObject = (Collection) fury.getCopyObject(originCollection); - if (copyObject != null) { - return (T) copyObject; - } fury.reference(originCollection, newCollection); } copyElements(originCollection, newCollection); diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java index 8616a39e5c..96a8b6b4ca 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/CollectionSerializers.java @@ -707,7 +707,7 @@ public void write(MemoryBuffer buffer, T value) { @Override public T copy(T originCollection) { - return dataSerializer.copy(originCollection); + return fury.copyObject(originCollection, dataSerializer); } @Override @@ -755,7 +755,7 @@ public void write(MemoryBuffer buffer, T value) { @Override public T copy(T value) { - return (T) serializer.copy(value); + return fury.copyObject(value, (Serializer) serializer); } } diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java index 20af82ab26..046f0a7e82 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java @@ -51,10 +51,6 @@ public T copy(T originMap) { } Map newMap = newMap(originMap); if (needToCopyRef) { - Map copyMap = (Map) fury.getCopyObject(originMap); - if (copyMap != null) { - return (T) copyMap; - } fury.reference(originMap, newMap); } copyEntry(originMap, newMap); diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java index bf4f3af617..be4abfcbbb 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializers.java @@ -449,7 +449,7 @@ public void write(MemoryBuffer buffer, T value) { @Override public T copy(T value) { - return dataSerializer.copy(value); + return fury.copyObject(value, dataSerializer); } @Override @@ -501,7 +501,7 @@ public void write(MemoryBuffer buffer, T value) { @Override public T copy(T value) { - return (T) serializer.copy(value); + return fury.copyObject(value, (Serializer) serializer); } } diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java index 6d79b5f0f8..c7e7c87357 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/SynchronizedSerializers.java @@ -100,6 +100,12 @@ public Collection read(MemoryBuffer buffer) { final Object sourceCollection = fury.readRef(buffer); return (Collection) factory.apply(sourceCollection); } + + @Override + public Collection copy(Collection object) { + final Object collection = Platform.getObject(object, offset); + return (Collection) factory.apply(fury.copyObject(collection)); + } } public static final class SynchronizedMapSerializer extends MapSerializer { diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java index d6d0c52c07..84c71ae4c9 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/UnmodifiableSerializers.java @@ -99,6 +99,12 @@ public Collection read(MemoryBuffer buffer) { final Object sourceCollection = fury.readRef(buffer); return (Collection) factory.apply(sourceCollection); } + + @Override + public Collection copy(Collection object) { + final Object collection = Platform.getObject(object, offset); + return (Collection) factory.apply(fury.copyObject(collection)); + } } public static final class UnmodifiableMapSerializer extends MapSerializer { diff --git a/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java b/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java index 213e82e962..14abfa941f 100644 --- a/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java @@ -19,6 +19,9 @@ package org.apache.fury; +import static org.apache.fury.serializer.collection.MapSerializersTest.createMapFieldsObject; +import static org.testng.Assert.assertEquals; + import com.google.common.collect.ImmutableList; import java.math.BigDecimal; import java.math.BigInteger; @@ -77,9 +80,13 @@ import org.apache.fury.serializer.EnumSerializerTest; import org.apache.fury.serializer.EnumSerializerTest.EnumFoo; import org.apache.fury.serializer.collection.ChildContainerSerializersTest.ChildArrayDeque; +import org.apache.fury.serializer.collection.SynchronizedSerializersTest; +import org.apache.fury.serializer.collection.UnmodifiableSerializersTest; import org.apache.fury.test.bean.BeanA; import org.apache.fury.test.bean.BeanB; +import org.apache.fury.test.bean.CollectionFields; import org.apache.fury.test.bean.Cyclic; +import org.apache.fury.test.bean.MapFields; import org.apache.fury.util.DateTimeUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -344,4 +351,37 @@ public String toString() { return "B{" + "name='" + name + '\'' + ", list=" + list + '}'; } } + + @Test + public void testCircularRefCopy() { + Cyclic cyclic = Cyclic.create(true); + Fury fury = builder().withRefTracking(true).withRefCopy(true).build(); + assertEquals(fury.copy(cyclic), cyclic); + } + + @Test + public void testComplexMapCopy() { + Fury fury = builder().withRefTracking(true).withRefCopy(true).build(); + { + MapFields mapFields = UnmodifiableSerializersTest.createMapFields(); + assertEquals(fury.copy(mapFields), mapFields); + } + { + MapFields obj = createMapFieldsObject(); + assertEquals(fury.copy(obj), obj); + } + } + + @Test + public void testComplexCollectionCopy() { + Fury fury = builder().withRefTracking(true).withRefCopy(true).build(); + { + CollectionFields collectionFields = SynchronizedSerializersTest.createCollectionFields(); + assertEquals(fury.copy(collectionFields).toCanEqual(), collectionFields.toCanEqual()); + } + { + CollectionFields collectionFields = UnmodifiableSerializersTest.createCollectionFields(); + assertEquals(fury.copy(collectionFields).toCanEqual(), collectionFields.toCanEqual()); + } + } } diff --git a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/SynchronizedSerializersTest.java b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/SynchronizedSerializersTest.java index 522ff96972..4056023e8f 100644 --- a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/SynchronizedSerializersTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/SynchronizedSerializersTest.java @@ -98,6 +98,12 @@ public void testWrite() throws Exception { @Test(dataProvider = "javaFury") public void testCollectionFieldSerializers(Fury fury) { + CollectionFields obj = createCollectionFields(); + Object newObj = serDe(fury, obj); + assertEquals(((CollectionFields) (newObj)).toCanEqual(), obj.toCanEqual()); + } + + public static CollectionFields createCollectionFields() { CollectionFields obj = new CollectionFields(); Collection collection = Collections.synchronizedCollection(Arrays.asList(1, 2)); obj.collection = collection; @@ -126,7 +132,6 @@ public void testCollectionFieldSerializers(Fury fury) { Collections.synchronizedSortedMap(new TreeMap<>(ImmutableMap.of(1, 2))); obj.sortedMap = sortedMap; obj.sortedMap2 = sortedMap; - Object newObj = serDe(fury, obj); - assertEquals(((CollectionFields) (newObj)).toCanEqual(), obj.toCanEqual()); + return obj; } }