forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added SimpleSerDe to ser/deser basic types (apache#163)
- Loading branch information
Showing
2 changed files
with
359 additions
and
0 deletions.
There are no files selected for viewing
139 changes: 139 additions & 0 deletions
139
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/utils/SimpleSerDe.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.pulsar.functions.utils; | ||
|
||
import com.google.common.collect.Sets; | ||
import org.apache.pulsar.functions.api.SerDe; | ||
|
||
import java.io.*; | ||
import java.lang.reflect.Type; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* Simplest form of SerDe. | ||
*/ | ||
public class SimpleSerDe implements SerDe { | ||
|
||
private static final Set<Type> supportedInputTypes = Sets.newHashSet( | ||
Integer.class, | ||
Double.class, | ||
Long.class, | ||
String.class, | ||
Short.class, | ||
Byte.class, | ||
Float.class, | ||
Map.class, | ||
List.class, | ||
Object.class | ||
); | ||
private Type type; | ||
private boolean ser; | ||
|
||
public SimpleSerDe(Type type, boolean ser) { | ||
this.type = type; | ||
this.ser = ser; | ||
verifySupportedType(type, ser); | ||
} | ||
|
||
@Override | ||
public Object deserialize(byte[] input) { | ||
if (ser) { | ||
throw new RuntimeException("Serializer function cannot deserialize"); | ||
} | ||
String data = new String(input, StandardCharsets.UTF_8); | ||
if (type.equals(Integer.class)) { | ||
return Integer.valueOf(data); | ||
} else if (type.equals(Double.class)) { | ||
return Double.valueOf(data); | ||
} else if (type.equals(Long.class)) { | ||
return Long.valueOf(data); | ||
} else if (type.equals(String.class)) { | ||
return data; | ||
} else if (type.equals(Short.class)) { | ||
return Short.valueOf(data); | ||
} else if (type.equals(Byte.class)) { | ||
return Byte.decode(data); | ||
} else if (type.equals(Float.class)) { | ||
return Float.valueOf(data); | ||
} else if (type.equals(Map.class)) { | ||
try { | ||
ByteArrayInputStream byteIn = new ByteArrayInputStream(input); | ||
ObjectInputStream in = new ObjectInputStream(byteIn); | ||
return (Map<Object, Object>) in.readObject(); | ||
} catch (Exception ex) { | ||
return null; | ||
} | ||
} else if (type.equals(List.class)) { | ||
try { | ||
ByteArrayInputStream byteIn = new ByteArrayInputStream(input); | ||
ObjectInputStream in = new ObjectInputStream(byteIn); | ||
return (List<Object>) in.readObject(); | ||
} catch (Exception ex) { | ||
return null; | ||
} | ||
} else { | ||
throw new RuntimeException("Unknown type " + type); | ||
} | ||
} | ||
|
||
@Override | ||
public byte[] serialize(Object input) { | ||
if (!ser) { | ||
throw new RuntimeException("DeSerializer function cannot serialize"); | ||
} | ||
if (type.equals(Integer.class)) { | ||
return ((Integer) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Double.class)) { | ||
return ((Double) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Long.class)) { | ||
return ((Long) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(String.class)) { | ||
return ((String) input).getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Short.class)) { | ||
return ((Short) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Byte.class)) { | ||
return ((Byte) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Float.class)) { | ||
return ((Float) input).toString().getBytes(StandardCharsets.UTF_8); | ||
} else if (type.equals(Map.class) || type.equals(List.class)) { | ||
try { | ||
ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); | ||
ObjectOutputStream out = new ObjectOutputStream(byteOut); | ||
out.writeObject(input); | ||
return byteOut.toByteArray(); | ||
} catch (Exception ex) { | ||
return null; | ||
} | ||
} else { | ||
throw new RuntimeException("Unknown type " + type); | ||
} | ||
} | ||
|
||
public void verifySupportedType(Type type, boolean allowVoid) { | ||
if (!allowVoid && !supportedInputTypes.contains(type)) { | ||
throw new RuntimeException("Non Basic types not yet supported: " + type); | ||
} else if (!(supportedInputTypes.contains(type) || type.equals(Void.class))) { | ||
throw new RuntimeException("Non Basic types not yet supported: " + type); | ||
} | ||
} | ||
} |
220 changes: 220 additions & 0 deletions
220
...ar-functions/runtime/src/test/java/org/apache/pulsar/functions/utils/SimpleSerDeTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.pulsar.functions.utils; | ||
|
||
import org.testng.annotations.Test; | ||
|
||
import java.util.HashMap; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.testng.Assert.*; | ||
|
||
/** | ||
* Unit test of {@link SimpleSerDe}. | ||
*/ | ||
public class SimpleSerDeTest { | ||
@Test | ||
public void testStringSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(String.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(String.class, false); | ||
String input = new String("input"); | ||
byte[] output = serializer.serialize(input); | ||
String result = (String) deserializer.deserialize(output); | ||
assertEquals(result, input); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(String.class, false); | ||
serDe.serialize(new String("input")); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(String.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testLongSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(Long.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(Long.class, false); | ||
Long input = new Long(648292); | ||
byte[] output = serializer.serialize(input); | ||
Long result = (Long) deserializer.deserialize(output); | ||
assertEquals(result, input); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Long.class, false); | ||
serDe.serialize(new Long(34242)); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Long.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testDoubleSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(Double.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(Double.class, false); | ||
Double input = new Double(648292.32432); | ||
byte[] output = serializer.serialize(input); | ||
Double result = (Double) deserializer.deserialize(output); | ||
assertEquals(result, input); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Double.class, false); | ||
serDe.serialize(new Double(34242)); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Double.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testFloatSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(Float.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(Float.class, false); | ||
Float input = new Float(354353.54654); | ||
byte[] output = serializer.serialize(input); | ||
Float result = (Float) deserializer.deserialize(output); | ||
assertEquals(result, input); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Float.class, false); | ||
serDe.serialize(new Float(34242)); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Float.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testIntegerSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(Integer.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(Integer.class, false); | ||
Integer input = new Integer(2542352); | ||
byte[] output = serializer.serialize(input); | ||
Integer result = (Integer) deserializer.deserialize(output); | ||
assertEquals(result, input); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Integer.class, false); | ||
serDe.serialize(new Integer(34242)); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Integer.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testMapSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(Map.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(Map.class, false); | ||
Map<String, String> input = new HashMap<>(); | ||
input.put("Key", "Value"); | ||
byte[] output = serializer.serialize(input); | ||
Map result = (Map) deserializer.deserialize(output); | ||
assertEquals(result.size(), input.size()); | ||
assertEquals(result.get("Key"), input.get("Key")); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Map.class, false); | ||
serDe.serialize(new HashMap()); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(Map.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
|
||
@Test | ||
public void testListSerDe() { | ||
SimpleSerDe serializer = new SimpleSerDe(List.class, true); | ||
SimpleSerDe deserializer = new SimpleSerDe(List.class, false); | ||
List<Integer> input = new LinkedList<>(); | ||
input.add(1234); | ||
byte[] output = serializer.serialize(input); | ||
List result = (List) deserializer.deserialize(output); | ||
assertEquals(result.size(), input.size()); | ||
assertEquals(result.get(0), input.get(0)); | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(List.class, false); | ||
serDe.serialize(new LinkedList<>()); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
|
||
try { | ||
SimpleSerDe serDe = new SimpleSerDe(List.class, true); | ||
serDe.deserialize(new byte[10]); | ||
assertFalse(true); | ||
} catch (Exception ex) { | ||
// This is good | ||
} | ||
} | ||
} |