
NullPointerException in Kafka Connect #350

Closed
akamensky opened this issue May 27, 2021 · 16 comments
Labels
bug Something isn't working

Comments

@akamensky
Contributor

akamensky commented May 27, 2021

Environment

  • OS version: Centos 7.5
  • JDK version: AdoptOpenJDK 11.0.4
  • ClickHouse Server version: 21.5.5.12
  • ClickHouse Native JDBC version: 2.5.5-shaded
  • (Optional) Spark version: N/A
  • (Optional) Other components' version: kafkaconnect version 6.1.1-2.12

Error logs

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
	at com.github.housepower.jdbc.data.type.DataTypeInt64.serializeBinary(DataTypeInt64.java:77)
	at com.github.housepower.jdbc.data.Column.write(Column.java:31)
	at com.github.housepower.jdbc.data.Block.appendRow(Block.java:93)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addParameters(ClickHousePreparedInsertStatement.java:162)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addBatch(ClickHousePreparedInsertStatement.java:95)
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more

Steps to reproduce

The message contains these types (as per CH types): String, UInt64, Float64, DateTime. Some columns can be NULL, as defined in the CH schema, but it looks like an NPE happens inside the driver when a Kafka message field is actually null.

Other descriptions

The same ingest works with the official HTTP protocol driver, but I would prefer to use this one as it has better throughput and resource usage.

@akamensky akamensky changed the title NullPointerException NullPointerException in Kafkaconnect May 27, 2021
@pan3793
Member

pan3793 commented May 27, 2021

The stacktrace doesn't seem related to the JDBC driver.

@akamensky
Contributor Author

Yes, it doesn't show anything about the JDBC driver. However, the exact same setup with the Postgres driver works fine, and the exact same setup with the official CH driver also works. The only difference between them is which JDBC driver is used.

@akamensky
Contributor Author

I have not tried the just-released 2.5.5 yet, since I cannot restart Kafka Connect at the moment. I will try the same setup with 2.5.5 a little later and provide feedback here.

@pan3793
Member

pan3793 commented May 27, 2021

From my understanding, the pipeline is pg => (dbz as Kafka Connect source) => kafka => (ck native jdbc as Kafka Connect sink) => clickhouse. If not, could you briefly illustrate the pipeline? In which components are you using ClickHouse Native JDBC?

@akamensky
Contributor Author

akamensky commented May 27, 2021

We are changing the pipeline to use CH instead of PG. We have many topics to ingest into the database (all into different tables). Some work fine with this driver; others have problems.

Now: application => kafka => confluent-kafkaconnect + jdbc sink => timescaledb -- this one works fine.

New: application => kafka => confluent-kafkaconnect + jdbc sink => clickhouse -- this one works with the official HTTP driver, but the native driver has this error (only in some sinks).

We are running the old and new setups in parallel, so the same data works with the PG driver but fails with the CH native driver.

@akamensky
Contributor Author

Upgraded to 2.5.5. Stacktrace is different now:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
	at com.github.housepower.jdbc.data.type.DataTypeInt64.serializeBinary(DataTypeInt64.java:77)
	at com.github.housepower.jdbc.data.Column.write(Column.java:31)
	at com.github.housepower.jdbc.data.Block.appendRow(Block.java:93)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addParameters(ClickHousePreparedInsertStatement.java:162)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addBatch(ClickHousePreparedInsertStatement.java:95)
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more

@pan3793
Member

pan3793 commented May 27, 2021

That makes sense; I see what happens in the JDBC driver now.

@pan3793
Member

pan3793 commented May 27, 2021

Integration tests passed; will publish v2.5.6 soon.

@pan3793 pan3793 changed the title NullPointerException in Kafkaconnect NullPointerException in Kafka Connect May 27, 2021
@pan3793 pan3793 added the bug Something isn't working label May 27, 2021
@pan3793
Member

pan3793 commented May 27, 2021

v2.5.6 is deployed.

@akamensky
Contributor Author

Thanks, I will test it once it is available in central.

@akamensky
Contributor Author

Updated to 2.5.6, but still see the same stacktrace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
	at com.github.housepower.jdbc.data.type.DataTypeInt64.serializeBinary(DataTypeInt64.java:77)
	at com.github.housepower.jdbc.data.Column.write(Column.java:31)
	at com.github.housepower.jdbc.data.Block.appendRow(Block.java:93)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addParameters(ClickHousePreparedInsertStatement.java:162)
	at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.addBatch(ClickHousePreparedInsertStatement.java:95)
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:115)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:169)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more

@pan3793
Member

pan3793 commented May 27, 2021

I think the stacktrace indicates the column type is Int64, not Nullable(Int64). Can you provide the ClickHouse table schema and the failing record?

@akamensky
Contributor Author

akamensky commented May 28, 2021

@pan3793

I needed to redact the names, but one of the tables where it fails looks like this:

production :) show create table data

CREATE TABLE data.data
(
    `col1` String,
    `col2` String,
    `col3` String,
    `col4` UInt64,
    `col5` UInt64,
    `col6` UInt64,
    `col7` UInt64,
    `col8` Float64,
    `col9` UInt64,
    `col10` String,
    `col11` String,
    `col12` String,
    `col13` String,
    `col14` UInt64,
    `col15` DateTime,
    `col16` UInt64,
    `col17` DateTime,
    `col18` UInt64,
    `col19` DateTime,
    `col20` UInt64
)
ENGINE = MergeTree
PARTITION BY toStartOfHour(col19)
ORDER BY col20
SETTINGS index_granularity = 8192

Actual SQL used to create table is:

CREATE TABLE data.data (
 `col1` String NOT NULL,
 `col2` String NOT NULL,
 `col3` String NOT NULL,
 `col4` UInt64 NOT NULL,
 `col5` UInt64,
 `col6` UInt64,
 `col7` UInt64,
 `col8` Float64 NOT NULL,
 `col9` UInt64 NOT NULL,
 `col10` String NOT NULL,
 `col11` String NOT NULL,
 `col12` String NOT NULL,
 `col13` String NOT NULL,
 `col14` UInt64 NOT NULL,
 `col15` DateTime NOT NULL,
 `col16` UInt64 NOT NULL,
 `col17` DateTime NOT NULL,
 `col18` UInt64 NOT NULL,
 `col19` DateTime NOT NULL,
 `col20` UInt64 NOT NULL
) ENGINE = MergeTree PARTITION BY toStartOfHour(col19) ORDER BY col20;

@akamensky
Contributor Author

akamensky commented May 28, 2021

Hmm, I figured out that NOT NULL is the default in CH (which is the opposite of many other databases, where NULL is the default).

I flipped some columns to Nullable and it works now; the data is being written into the table. However, this is very different from how the official driver behaves: it can still handle the above schema without failing the task for some reason 🤔

It would also be helpful to have a clearer error message for cases like this.

@pan3793
Member

pan3793 commented May 28, 2021

Yes, ClickHouse uses String to represent what would be VARCHAR NOT NULL in most ANSI-compatible databases, and Nullable(String) to represent VARCHAR NULL.
BTW, it is by design that an exception is thrown when inserting null into a not-null column. I will close this issue; feel free to reopen or open new issues if you encounter other questions.
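
The nullability mapping described above can be sketched in DDL (table and column names here are hypothetical, not from the schema posted earlier):

```sql
-- A bare type means NOT NULL in ClickHouse; wrap the type in Nullable(...)
-- for columns that may legitimately receive NULL from the Kafka message.
CREATE TABLE data.data_nullable
(
    `col1`  String,              -- like VARCHAR NOT NULL in ANSI databases
    `col5`  Nullable(UInt64),    -- like BIGINT UNSIGNED NULL
    `col19` DateTime
)
ENGINE = MergeTree
PARTITION BY toStartOfHour(col19)
ORDER BY col19;
```

Note that sorting and partition key columns cannot be Nullable by default, so `ORDER BY` here uses the non-nullable `col19`.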

@pan3793 pan3793 closed this as completed May 28, 2021
@akamensky
Contributor Author

Yes, while it is clearer now and I can ingest the data into ClickHouse, one thing that could be improved on the driver side is better error handling for this case.

The error here is very generic and does not point in the right direction. The point of having error messages at all is to make it clear what happened and, ideally, point to the correct solution.

I suggest to:

  • catch this exception
  • check whether it happened because null was received for a non-nullable column
  • if so, throw another exception with a much clearer message (e.g. "received NULL value for NOT NULL column", ideally including the column name if possible).

This would allow users to handle such issues much more cleanly (and avoid the recurring reports that this type of issue seems to generate in the issue history).
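
A minimal sketch of that suggestion, written outside the real driver: guard the value before binary serialization and raise a descriptive exception instead of letting a bare NPE surface. The class and method names below are illustrative only; the actual fix would live in the driver's DataType serializers (e.g. `DataTypeInt64.serializeBinary`).

```java
// Hypothetical null guard for a value bound to a non-nullable integer column.
// Instead of a NullPointerException deep in serialization, the caller gets a
// message naming the offending column and suggesting the schema fix.
public class NotNullGuard {

    public static long requireValue(Long value, String columnName) {
        if (value == null) {
            throw new IllegalArgumentException(
                "Received NULL value for NOT NULL column `" + columnName
                + "`; declare it as Nullable(...) or supply a value");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireValue(42L, "col4")); // prints 42
        try {
            requireValue(null, "col5");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same check applied once per column in `appendRow` would also let the driver report the column name, which the raw NPE in the stacktraces above cannot do.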
