Skip to content

Commit

Permalink
[AMORO-3356] Include ResourceSerde interface and impl with SimpleSeri…
Browse files Browse the repository at this point in the history
…alize and KryoSerialize
  • Loading branch information
czy006 committed Dec 10, 2024
1 parent 2298331 commit eb0882f
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.serialization;

import static org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;

/**
 * {@link ResourceSerde} for {@link Serializable} resources.
 *
 * <p>All work is delegated to a {@link KryoSerialize}; this class only adds a null check on the
 * resource before it is serialized.
 *
 * @param <R> type of the resource handled by this serde
 */
public class JavaSerializer<R extends Serializable> implements ResourceSerde<R> {

  /**
   * Shared instance. The declaration is intentionally left raw so that existing call sites which
   * assign it to a parameterized {@code JavaSerializer<...>} keep compiling.
   */
  @SuppressWarnings("rawtypes")
  public static final JavaSerializer INSTANT = new JavaSerializer<>();

  // Typed delegate (instead of a raw KryoSerialize) so both calls below are checked by the
  // compiler. KryoSerialize keeps its state in a static ThreadLocal, so one delegate per
  // JavaSerializer instance costs nothing extra.
  private final ResourceSerde<R> delegate = new KryoSerialize<>();

  /**
   * Serializes the given resource.
   *
   * @param resource resource to serialize, must not be {@code null}
   * @return the serialized bytes
   * @throws NullPointerException if {@code resource} is {@code null}
   */
  @Override
  public byte[] serializeResource(R resource) {
    checkNotNull(resource, "resource must not be null");
    return delegate.serializeResource(resource);
  }

  /**
   * Deserializes bytes previously produced by {@link #serializeResource(Serializable)}.
   *
   * @param input serialized bytes
   * @return the wrapper returned by the underlying {@link KryoSerialize}
   */
  @Override
  public DeserializedResource<R> deserializeResource(byte[] input) {
    return delegate.deserializeResource(input);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.serialization;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayOutputStream;
import java.io.Serializable;

/**
 * {@link ResourceSerde} implementation backed by Kryo.
 *
 * <p>A {@code KryoSerializerInstance} is created lazily per thread through a {@link ThreadLocal},
 * so the underlying {@link Kryo} object and its output buffer are never shared between threads.
 *
 * @param <R> type of the resource handled by this serde
 */
public class KryoSerialize<R> implements ResourceSerde<R> {

  private static final ThreadLocal<KryoSerializerInstance> KRYO_SERIALIZER =
      ThreadLocal.withInitial(KryoSerializerInstance::new);

  /**
   * Serializes the resource (class information included) with the calling thread's Kryo instance.
   *
   * @param resource object to serialize
   * @return the serialized bytes
   * @throws IllegalArgumentException if Kryo fails to serialize the resource
   */
  @Override
  public byte[] serializeResource(R resource) {
    try {
      return KRYO_SERIALIZER.get().serialize(resource);
    } catch (Exception e) {
      throw new IllegalArgumentException("serialization error of " + resource, e);
    }
  }

  /**
   * Deserializes bytes produced by {@link #serializeResource(Object)}.
   *
   * @param input serialized bytes; {@code null} yields a wrapper around a {@code null} resource
   * @return wrapper around the deserialized object, never flagged as modified
   * @throws IllegalArgumentException if Kryo fails to deserialize the input
   */
  @Override
  public DeserializedResource<R> deserializeResource(byte[] input) {
    if (input == null) {
      return new DeserializedResource<>(null, false);
    }
    try {
      // The stream carries the concrete class; the caller is responsible for asking for the
      // matching R, exactly as with the original unchecked cast.
      @SuppressWarnings("unchecked")
      R resource = (R) KRYO_SERIALIZER.get().deserialize(input);
      return new DeserializedResource<>(resource, false);
    } catch (Exception e) {
      throw new IllegalArgumentException("deserialization error ", e);
    }
  }

  /** Per-thread pairing of a configured {@link Kryo} and a reusable output buffer. */
  private static class KryoSerializerInstance implements Serializable {
    public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
    private final Kryo kryo;
    private final ByteArrayOutputStream outputStream;

    KryoSerializerInstance() {
      // newKryo() already disables mandatory class registration, so it is not repeated here.
      kryo = new KryoInstantiation().newKryo();
      outputStream = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
    }

    /** Writes {@code obj} together with its class into a fresh byte array. */
    byte[] serialize(Object obj) {
      // Clear any state left over from a previous (possibly failed) call on this thread.
      kryo.reset();
      outputStream.reset();
      Output output = new Output(outputStream);
      kryo.writeClassAndObject(output, obj);
      // close() flushes Kryo's internal buffer into outputStream before the bytes are copied.
      output.close();
      return outputStream.toByteArray();
    }

    /** Reads back an object written by {@link #serialize(Object)}. */
    Object deserialize(byte[] objectData) {
      return kryo.readClassAndObject(new Input(objectData));
    }
  }

  /** Factory for the {@link Kryo} configuration used by this serde. */
  private static class KryoInstantiation implements Serializable {

    public Kryo newKryo() {
      Kryo kryo = new Kryo();

      // This instance of Kryo should not require prior registration of classes
      kryo.setRegistrationRequired(false);
      Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
          new Kryo.DefaultInstantiatorStrategy();
      instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
      kryo.setInstantiatorStrategy(instantiatorStrategy);
      // Handle cases where we may have an odd classloader setup like with lib jars
      // for hadoop
      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

      // Register serializers
      kryo.register(Utf8.class, new AvroUtf8Serializer());

      return kryo;
    }
  }

  /** Kryo serializer for Avro {@link Utf8}, stored as a plain byte array. */
  private static class AvroUtf8Serializer extends Serializer<Utf8> {

    @SuppressWarnings("unchecked")
    @Override
    public void write(Kryo kryo, Output output, Utf8 utf8String) {
      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
      // Utf8.getBytes() exposes the backing array, of which only the first getByteLength()
      // bytes are valid. Trim it so stale trailing bytes of a reused Utf8 are not written
      // (and later read back as part of the string).
      byte[] validBytes =
          java.util.Arrays.copyOf(utf8String.getBytes(), utf8String.getByteLength());
      bytesSerializer.write(kryo, output, validBytes);
    }

    @SuppressWarnings("unchecked")
    @Override
    public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
      byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
      return new Utf8(bytes);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.serialization;

/**
 * Contract for turning a resource into a byte array and back again.
 *
 * @param <R> type of the resource being serialized and deserialized
 */
public interface ResourceSerde<R> {

  /**
   * Converts the given resource into its byte representation.
   *
   * @param resource object to serialize
   * @return the serialized bytes
   */
  byte[] serializeResource(R resource);

  /**
   * Rebuilds a resource from its byte representation.
   *
   * @param input serialized bytes
   * @return the deserialized object wrapped in a {@link DeserializedResource}
   */
  DeserializedResource<R> deserializeResource(byte[] input);

  /**
   * Immutable result of a deserialization: the resource itself plus a flag telling whether it was
   * modified while being deserialized.
   *
   * @param <R> type of the wrapped resource
   */
  final class DeserializedResource<R> {
    private final R value;
    private final boolean modified;

    /**
     * @param resource the deserialized object
     * @param modifiedDuringDeserialization whether the object was altered during deserialization
     */
    public DeserializedResource(R resource, boolean modifiedDuringDeserialization) {
      value = resource;
      modified = modifiedDuringDeserialization;
    }

    /** Returns the wrapped resource exactly as passed to the constructor. */
    public R getResource() {
      return value;
    }

    /** Returns the modification flag exactly as passed to the constructor. */
    public boolean isModifiedDuringDeserialization() {
      return modified;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.serialization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * {@link ResourceSerde} implementation based on standard Java object serialization ({@link
 * ObjectOutputStream} / {@link ObjectInputStream}).
 *
 * @param <R> type of the resource handled by this serde
 */
public class SimpleSerialize<R> implements ResourceSerde<R> {

  /**
   * Serializes the resource with an {@link ObjectOutputStream}.
   *
   * @param resource object to serialize
   * @return the serialized bytes
   * @throws IllegalArgumentException if writing the object fails with an {@link IOException}
   */
  @Override
  public byte[] serializeResource(R resource) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(resource);
      // Push everything buffered by the object stream into bos before copying the bytes out.
      oos.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new IllegalArgumentException("serialization error of " + resource, e);
    }
  }

  /**
   * Deserializes bytes produced by {@link #serializeResource(Object)}.
   *
   * <p>NOTE(review): this uses native Java deserialization; confirm that callers only pass bytes
   * from trusted sources.
   *
   * @param input serialized bytes; {@code null} yields a wrapper around a {@code null} resource
   * @return wrapper around the deserialized object, never flagged as modified
   * @throws IllegalArgumentException if reading fails or the class of the object cannot be found
   */
  @Override
  public DeserializedResource<R> deserializeResource(byte[] input) {
    if (input == null) {
      // Same null-input contract as KryoSerialize: hand back a wrapper rather than a bare null,
      // so callers can always dereference the result.
      return new DeserializedResource<>(null, false);
    }
    try (ByteArrayInputStream bis = new ByteArrayInputStream(input);
        ObjectInputStream ois = new ObjectInputStream(bis)) {
      // The stream carries the concrete class; the caller is responsible for asking for the
      // matching R.
      @SuppressWarnings("unchecked")
      R resource = (R) ois.readObject();
      return new DeserializedResource<>(resource, false);
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalArgumentException("deserialization error ", e);
    }
  }
}
Loading

0 comments on commit eb0882f

Please sign in to comment.