Skip to content

Commit

Permalink
feat(java): support deep ref copy (#1771)
Browse files Browse the repository at this point in the history
## What does this PR do?

support deep ref copy

## Related issues

Closes #1747
#1679


## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?


## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Jul 27, 2024
1 parent f4865f0 commit fd39760
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 42 deletions.
29 changes: 22 additions & 7 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private StackOverflowError processCopyStackOverflowError(StackOverflowError e) {
if (!copyRefTracking) {
String msg =
"Object may contain circular references, please enable ref tracking "
+ "by `FuryBuilder#withCopyRefTracking(true)`";
+ "by `FuryBuilder#withRefCopy(true)`";
StackOverflowError t1 = ExceptionUtils.trySetStackOverflowErrorMessage(e, msg);
if (t1 != null) {
return t1;
Expand Down Expand Up @@ -1337,9 +1337,7 @@ public <T> T copyObject(T obj) {
break;
// todo: add fastpath for other types.
default:
copyDepth++;
copy = classInfo.getSerializer().copy(obj);
copyDepth--;
copy = copyObject(obj, classInfo.getSerializer());
}
return (T) copy;
}
Expand Down Expand Up @@ -1369,10 +1367,26 @@ public <T> T copyObject(T obj, int classId) {
case ClassResolver.STRING_CLASS_ID:
return obj;
default:
return (T) classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer().copy(obj);
return copyObject(obj, classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer());
}
}

public <T> T copyObject(T obj, Serializer<T> serializer) {
copyDepth++;
T copyObject;
if (serializer.needToCopyRef()) {
copyObject = getCopyObject(obj);
if (copyObject == null) {
copyObject = serializer.copy(obj);
originToCopyMap.put(obj, copyObject);
}
} else {
copyObject = serializer.copy(obj);
}
copyDepth--;
return copyObject;
}

/**
* Track ref for copy.
*
Expand All @@ -1388,8 +1402,9 @@ public <T> void reference(T o1, T o2) {
}
}

public Object getCopyObject(Object originObj) {
return originToCopyMap.get(originObj);
@SuppressWarnings("unchecked")
public <T> T getCopyObject(T originObj) {
return (T) originToCopyMap.get(originObj);
}

private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,29 @@ public T copy(T originObj) {
return originObj;
}
if (isRecord) {
Object[] fieldValues = copyFields(originObj);
try {
T t = (T) constructor.invokeWithArguments(fieldValues);
Arrays.fill(copyRecordInfo.getRecordComponents(), null);
return t;
} catch (Throwable e) {
Platform.throwException(e);
}
return originObj;
return copyRecord(originObj);
}
T newObj;
T newObj = newBean();
if (needToCopyRef) {
T copyObject = (T) fury.getCopyObject(originObj);
if (copyObject != null) {
return copyObject;
}
newObj = newBean();
fury.reference(originObj, newObj);
} else {
newObj = newBean();
}
copyFields(originObj, newObj);
return newObj;
}

private T copyRecord(T originObj) {
Object[] fieldValues = copyFields(originObj);
try {
T t = (T) constructor.invokeWithArguments(fieldValues);
Arrays.fill(copyRecordInfo.getRecordComponents(), null);
fury.reference(originObj, t);
return t;
} catch (Throwable e) {
Platform.throwException(e);
}
return originObj;
}

private Object[] copyFields(T originObj) {
InternalFieldInfo[] fieldInfos = this.fieldInfos;
if (fieldInfos == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public void write(MemoryBuffer buffer, T[] arr) {
public T[] copy(T[] originArray) {
int length = originArray.length;
Object[] newArray = newArray(length);
if (needToCopyRef) {
fury.reference(originArray, newArray);
}
Serializer componentSerializer = this.componentTypeSerializer;
if (componentSerializer != null) {
if (componentSerializer.isImmutable()) {
Expand Down Expand Up @@ -142,7 +145,6 @@ public void xwrite(MemoryBuffer buffer, T[] arr) {

@Override
public T[] read(MemoryBuffer buffer) {
// Some jdk8 will crash if use varint, why?
int numElements = buffer.readVarUint32Small7();
boolean isFinal = (numElements & 0b1) != 0;
numElements >>>= 1;
Expand All @@ -151,9 +153,6 @@ public T[] read(MemoryBuffer buffer) {
refResolver.reference(value);
if (isFinal) {
final Serializer componentTypeSerializer = this.componentTypeSerializer;
if (componentTypeSerializer == null) {
System.out.println("=======");
}
for (int i = 0; i < numElements; i++) {
Object elem;
int nextReadRefId = refResolver.tryPreserveRefId(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ public T copy(T originCollection) {
}
Collection newCollection = newCollection(originCollection);
if (needToCopyRef) {
Collection copyObject = (Collection) fury.getCopyObject(originCollection);
if (copyObject != null) {
return (T) copyObject;
}
fury.reference(originCollection, newCollection);
}
copyElements(originCollection, newCollection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ public void write(MemoryBuffer buffer, T value) {

@Override
public T copy(T originCollection) {
return dataSerializer.copy(originCollection);
return fury.copyObject(originCollection, dataSerializer);
}

@Override
Expand Down Expand Up @@ -755,7 +755,7 @@ public void write(MemoryBuffer buffer, T value) {

@Override
public T copy(T value) {
return (T) serializer.copy(value);
return fury.copyObject(value, (Serializer<T>) serializer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public T copy(T originMap) {
}
Map newMap = newMap(originMap);
if (needToCopyRef) {
Map copyMap = (Map) fury.getCopyObject(originMap);
if (copyMap != null) {
return (T) copyMap;
}
fury.reference(originMap, newMap);
}
copyEntry(originMap, newMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public void write(MemoryBuffer buffer, T value) {

@Override
public T copy(T value) {
return dataSerializer.copy(value);
return fury.copyObject(value, dataSerializer);
}

@Override
Expand Down Expand Up @@ -501,7 +501,7 @@ public void write(MemoryBuffer buffer, T value) {

@Override
public T copy(T value) {
return (T) serializer.copy(value);
return fury.copyObject(value, (Serializer<T>) serializer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public Collection read(MemoryBuffer buffer) {
final Object sourceCollection = fury.readRef(buffer);
return (Collection) factory.apply(sourceCollection);
}

@Override
public Collection copy(Collection object) {
final Object collection = Platform.getObject(object, offset);
return (Collection) factory.apply(fury.copyObject(collection));
}
}

public static final class SynchronizedMapSerializer extends MapSerializer<Map> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public Collection read(MemoryBuffer buffer) {
final Object sourceCollection = fury.readRef(buffer);
return (Collection) factory.apply(sourceCollection);
}

@Override
public Collection copy(Collection object) {
final Object collection = Platform.getObject(object, offset);
return (Collection) factory.apply(fury.copyObject(collection));
}
}

public static final class UnmodifiableMapSerializer extends MapSerializer<Map> {
Expand Down
40 changes: 40 additions & 0 deletions java/fury-core/src/test/java/org/apache/fury/FuryCopyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.apache.fury;

import static org.apache.fury.serializer.collection.MapSerializersTest.createMapFieldsObject;
import static org.testng.Assert.assertEquals;

import com.google.common.collect.ImmutableList;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand Down Expand Up @@ -77,9 +80,13 @@
import org.apache.fury.serializer.EnumSerializerTest;
import org.apache.fury.serializer.EnumSerializerTest.EnumFoo;
import org.apache.fury.serializer.collection.ChildContainerSerializersTest.ChildArrayDeque;
import org.apache.fury.serializer.collection.SynchronizedSerializersTest;
import org.apache.fury.serializer.collection.UnmodifiableSerializersTest;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.test.bean.BeanB;
import org.apache.fury.test.bean.CollectionFields;
import org.apache.fury.test.bean.Cyclic;
import org.apache.fury.test.bean.MapFields;
import org.apache.fury.util.DateTimeUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -344,4 +351,37 @@ public String toString() {
return "B{" + "name='" + name + '\'' + ", list=" + list + '}';
}
}

@Test
public void testCircularRefCopy() {
Cyclic cyclic = Cyclic.create(true);
Fury fury = builder().withRefTracking(true).withRefCopy(true).build();
assertEquals(fury.copy(cyclic), cyclic);
}

@Test
public void testComplexMapCopy() {
Fury fury = builder().withRefTracking(true).withRefCopy(true).build();
{
MapFields mapFields = UnmodifiableSerializersTest.createMapFields();
assertEquals(fury.copy(mapFields), mapFields);
}
{
MapFields obj = createMapFieldsObject();
assertEquals(fury.copy(obj), obj);
}
}

@Test
public void testComplexCollectionCopy() {
Fury fury = builder().withRefTracking(true).withRefCopy(true).build();
{
CollectionFields collectionFields = SynchronizedSerializersTest.createCollectionFields();
assertEquals(fury.copy(collectionFields).toCanEqual(), collectionFields.toCanEqual());
}
{
CollectionFields collectionFields = UnmodifiableSerializersTest.createCollectionFields();
assertEquals(fury.copy(collectionFields).toCanEqual(), collectionFields.toCanEqual());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public void testWrite() throws Exception {

@Test(dataProvider = "javaFury")
public void testCollectionFieldSerializers(Fury fury) {
CollectionFields obj = createCollectionFields();
Object newObj = serDe(fury, obj);
assertEquals(((CollectionFields) (newObj)).toCanEqual(), obj.toCanEqual());
}

public static CollectionFields createCollectionFields() {
CollectionFields obj = new CollectionFields();
Collection<Integer> collection = Collections.synchronizedCollection(Arrays.asList(1, 2));
obj.collection = collection;
Expand Down Expand Up @@ -126,7 +132,6 @@ public void testCollectionFieldSerializers(Fury fury) {
Collections.synchronizedSortedMap(new TreeMap<>(ImmutableMap.of(1, 2)));
obj.sortedMap = sortedMap;
obj.sortedMap2 = sortedMap;
Object newObj = serDe(fury, obj);
assertEquals(((CollectionFields) (newObj)).toCanEqual(), obj.toCanEqual());
return obj;
}
}

0 comments on commit fd39760

Please sign in to comment.