Skip to content

Commit

Permalink
feat(java): support thread safe register callback for scala kotlin (#…
Browse files Browse the repository at this point in the history
…1895)

## What does this PR do?
 support thread safe register callback for scala kotlin
<!-- Describe the purpose of this PR. -->

## Related issues

Closes #1894 

## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?

## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Oct 22, 2024
1 parent 09abde8 commit 14bad42
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,57 @@

import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.fury.annotation.Internal;
import org.apache.fury.resolver.ClassChecker;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;

public abstract class AbstractThreadSafeFury implements ThreadSafeFury {
@Override
public void register(Class<?> clz) {
processCallback(fury -> fury.register(clz));
registerCallback(fury -> fury.register(clz));
}

@Override
public void register(Class<?> cls, boolean createSerializer) {
processCallback(fury -> fury.register(cls, createSerializer));
registerCallback(fury -> fury.register(cls, createSerializer));
}

@Override
public void register(Class<?> cls, Short id) {
processCallback(fury -> fury.register(cls, id));
registerCallback(fury -> fury.register(cls, id));
}

@Override
public void register(Class<?> cls, Short id, boolean createSerializer) {
processCallback(fury -> fury.register(cls, id, createSerializer));
registerCallback(fury -> fury.register(cls, id, createSerializer));
}

@Override
public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
processCallback(fury -> fury.registerSerializer(type, serializerClass));
registerCallback(fury -> fury.registerSerializer(type, serializerClass));
}

@Override
public void registerSerializer(Class<?> type, Serializer<?> serializer) {
processCallback(fury -> fury.registerSerializer(type, serializer));
registerCallback(fury -> fury.registerSerializer(type, serializer));
}

@Override
public void registerSerializer(Class<?> type, Function<Fury, Serializer<?>> serializerCreator) {
processCallback(fury -> fury.registerSerializer(type, serializerCreator.apply(fury)));
registerCallback(fury -> fury.registerSerializer(type, serializerCreator.apply(fury)));
}

@Override
public void setSerializerFactory(SerializerFactory serializerFactory) {
processCallback(fury -> fury.setSerializerFactory(serializerFactory));
registerCallback(fury -> fury.setSerializerFactory(serializerFactory));
}

@Override
public void setClassChecker(ClassChecker classChecker) {
processCallback(fury -> fury.getClassResolver().setClassChecker(classChecker));
registerCallback(fury -> fury.getClassResolver().setClassChecker(classChecker));
}

protected abstract void processCallback(Consumer<Fury> callback);
@Internal
public abstract void registerCallback(Consumer<Fury> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.annotation.Internal;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.memory.MemoryBuffer;
Expand Down Expand Up @@ -68,8 +69,9 @@ public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
Fury fury = bindingThreadLocal.get().get();
}

@Internal
@Override
protected void processCallback(Consumer<Fury> callback) {
public void registerCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (LoaderBinding binding : allFury.keySet()) {
binding.visitAllFury(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.annotation.concurrent.ThreadSafe;
import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
import org.apache.fury.annotation.Internal;
import org.apache.fury.io.FuryInputStream;
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.logging.Logger;
Expand Down Expand Up @@ -61,8 +62,9 @@ public ThreadPoolFury(
fury -> factoryCallback.accept(fury));
}

@Internal
@Override
protected void processCallback(Consumer<Fury> callback) {
public void registerCallback(Consumer<Fury> callback) {
factoryCallback = factoryCallback.andThen(callback);
for (ClassLoaderFuryPooled furyPooled :
furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) {
Expand Down
4 changes: 3 additions & 1 deletion kotlin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import kotlin.UIntArray;
import kotlin.ULongArray;
import kotlin.UShortArray;
import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.collection.CollectionSerializers;
import org.apache.fury.serializer.collection.MapSerializers;
Expand All @@ -34,6 +36,12 @@
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class KotlinSerializers {

public static void registerSerializers(ThreadSafeFury fury) {
AbstractThreadSafeFury threadSafeFury = (AbstractThreadSafeFury) fury;
threadSafeFury.registerCallback(KotlinSerializers::registerSerializers);
}

public static void registerSerializers(Fury fury) {
ClassResolver resolver = fury.getClassResolver();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.fury.serializer.scala;

import org.apache.fury.AbstractThreadSafeFury;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.SerializerFactory;
Expand All @@ -31,6 +33,11 @@

public class ScalaSerializers {

public static void registerSerializers(ThreadSafeFury fury) {
AbstractThreadSafeFury threadSafeFury = (AbstractThreadSafeFury) fury;
threadSafeFury.registerCallback(ScalaSerializers::registerSerializers);
}

public static void registerSerializers(Fury fury) {
ClassResolver resolver = setSerializerFactory(fury);

Expand Down

0 comments on commit 14bad42

Please sign in to comment.