Error with "behavior.on.null.values": "delete" #719

Open

eknudtson opened this issue Sep 12, 2023 · 1 comment

Hi,

We are attempting to sink a topic into an Elasticsearch cluster, and we want tombstone records to delete the corresponding documents from the cluster.

We are able to sink all docs from a test topic without an issue, but the moment we insert a tombstone the connector bails out (see error below).

We are using the Avro converter with Schema Registry to serialize/deserialize both keys and values on the topic. We tried producing the tombstones both with no schema associated with the null value and with a schema associated, but the connector fails either way.
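
For reference, the relevant parts of the sink config look roughly like this (a sketch rather than the exact config; hosts, topic name, and Schema Registry URL are placeholders):

```json
{
  "name": "elasticsearch-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "test-topic",
    "connection.url": "http://elasticsearch:9200",
    "tasks.max": "1",

    "key.ignore": "false",
    "behavior.on.null.values": "delete",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```

With `"behavior.on.null.values": "delete"` the connector is supposed to delete the document whose ID is derived from the record key, which is why `"key.ignore"` is `"false"` here.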

Any ideas? The `ignore` option for `behavior.on.null.values` also doesn't seem to work, so we're stuck whenever deletes come through.

Thanks!

```
kafka-connect  | [2023-09-12 22:40:07,729] ERROR [elasticsearch-connector|task-0] WorkerSinkTask{id=elasticsearch-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
kafka-connect  | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
kafka-connect  | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
kafka-connect  | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:522)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:499)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
kafka-connect  | 	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
kafka-connect  | 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
kafka-connect  | 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
kafka-connect  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
kafka-connect  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
kafka-connect  | 	at java.base/java.lang.Thread.run(Thread.java:829)
kafka-connect  | Caused by: java.nio.BufferUnderflowException
kafka-connect  | 	at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:643)
kafka-connect  | 	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:165)
kafka-connect  | 	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:598)
kafka-connect  | 	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:386)
kafka-connect  | 	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:261)
kafka-connect  | 	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
kafka-connect  | 	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
kafka-connect  | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:522)
kafka-connect  | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
kafka-connect  | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
kafka-connect  | 	... 14 more
```
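
From the trace it looks like the failure happens in the Avro converter before the connector code ever runs: `AbstractKafkaSchemaSerDe.getByteBuffer` expects the Confluent wire format (magic byte plus 4-byte schema ID), and a `BufferUnderflowException` there would suggest the payload is present but shorter than that, e.g. a zero-length value instead of a true null. To see what's actually on the topic, a raw-bytes consumer like the following sketch should show whether the tombstone's value is null or just empty (broker, group, and topic are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TombstoneCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "tombstone-check");            // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic"));        // placeholder topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                // A real tombstone has value == null; a zero-length byte array is
                // not a tombstone and the Avro converter will fail on it.
                String keyLen = r.key() == null ? "null" : String.valueOf(r.key().length);
                String valueLen = r.value() == null ? "null" : String.valueOf(r.value().length);
                System.out.printf("offset=%d keyLen=%s valueLen=%s%n", r.offset(), keyLen, valueLen);
            }
        }
    }
}
```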
@ryzhyk commented Sep 3, 2024

@eknudtson, have you found an answer to your question by any chance? I ran into the same issue with the Confluent JDBC sink connector. According to the docs, the connector supports deleting records when it receives a tombstone message with a null value, but I don't know what schema ID to use for the null value.
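
My current (unverified) understanding is that a tombstone doesn't need a schema ID at all: the record value has to be literally null, and in that case the Avro serializer writes no bytes, so there is no wire-format header to put a schema ID into. A sketch of producing such a tombstone with the Java client (broker, Schema Registry URL, topic, and key are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class SendTombstone {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                 // placeholder
        props.put("key.serializer", KafkaAvroSerializer.class.getName());
        props.put("value.serializer", KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://schema-registry:8081");  // placeholder

        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
            // The key must match whatever key schema the topic already uses
            // (a plain String is just for illustration). The value is literally
            // null -- no schema ID and no Avro payload are written for it.
            producer.send(new ProducerRecord<>("test-topic", "some-key", null));
            producer.flush();
        }
    }
}
```

As far as I can tell, the key still has to be present and deserializable, since it identifies the document (or, for the JDBC sink with `delete.enabled=true`, the row) to delete.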
