diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java index 1c2cb810..d935d473 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializer.java @@ -22,8 +22,11 @@ package org.apache.flink.connector.elasticsearch.sink; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.objenesis.strategy.StdInstantiatorStrategy; import java.io.ByteArrayOutputStream; @@ -33,10 +36,13 @@ /** OperationSerializer is responsible for serialization and deserialization of an Operation. */ public class OperationSerializer { private final Kryo kryo = new Kryo(); + private static final ObjectMapper MAPPER = new ObjectMapper(); public OperationSerializer() { kryo.setRegistrationRequired(false); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.addDefaultSerializer(JsonNode.class, new JsonNodeSerializer()); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } public void serialize(Operation request, DataOutputStream out) { @@ -61,4 +67,28 @@ public int size(Operation operation) { return (int) output.total(); } } + + private static class JsonNodeSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, JsonNode object) { + try { + byte[] bytes = MAPPER.writeValueAsBytes(object); + output.writeInt(bytes.length, true); + output.writeBytes(bytes); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize JsonNode", e); + } + } + + @Override + public JsonNode read(Kryo kryo, Input input, Class type) { + try { + int length = input.readInt(true); + byte[] bytes = input.readBytes(length); + return MAPPER.readTree(bytes); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize JsonNode", e); + } + } + } }