
kafka connect sink can not process with empty array #457

Open
phongbui81062 opened this issue Oct 4, 2024 · 10 comments
Labels
bug Something isn't working

Comments

@phongbui81062

phongbui81062 commented Oct 4, 2024

Describe the bug

Hi everyone,

I have run into a case where I don't know what the problem is.

I created a Kafka Connect sink that reads CDC data from Postgres in Avro format. However, when I try to sink the data into a table that contains Array(String) columns (it worked fine with Array(UInt32)), the insert fails with SIZES_OF_ARRAYS_DONT_MATCH; it looks like ClickHouse is comparing the lengths of the array columns.
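
A minimal sketch of what seems to be happening (my assumption: ClickHouse groups Array columns that share a dot-separated prefix into one Nested data structure and validates that their lengths match on insert; the table and data below are hypothetical):

CREATE TABLE default.nested_repro
(
    `before.aftership_id`  Array(String),
    `before.carrier_17_id` Array(String)
)
ENGINE = MergeTree
ORDER BY tuple();

-- Arrays of different lengths under the shared 'before.' prefix should fail with
-- Code 190 (SIZES_OF_ARRAYS_DONT_MATCH), the same error as in the trace below.
INSERT INTO default.nested_repro VALUES ([], ['190015']);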

Steps to reproduce

  1. Create table
CREATE OR REPLACE TABLE db.`topic_name`
(
    `before.id` Nullable(UInt32),
    `before.name` Nullable(String),
    `before.slug_name` Nullable(String),
    `before.aftership_id`              Array(String),
    `before.carrier_17_id`             Array(String),
    `before.carrier_cs_phone` Nullable(String),
    `before.carrier_id` Nullable(String),
    `before.carrier_language` Nullable(String),
    `before.carrier_logo` Nullable(String),
    `before.carrier_support_languages` Array(String),
    `before.carrier_url` Nullable(String),
    `before.carrier_url_tracking` Nullable(String),
    `before.comments` Nullable(String),
    `before.carrier_countries_iso`     Array(String),
    `before.pattern_regex`             Array(String),
    `before.scraper_id` Nullable(UInt32),
    `before.base_weight` Nullable(DOUBLE),
    `before.cms_url` Nullable(String),
    `before.allowed_concurrent_request_count` Nullable(UInt32),
    `before.postal_scrape` Nullable(Bool),
    `before.carrier_countries`         Array(String),
    `before.updated_date` Nullable(DateTime64(6)),
    `before.is_hidden` Nullable(Bool),
    `before.merchant_parameters` Nullable(String),
    `before.last_calc_rating` Nullable(DateTime64(6)),
    `before.rating` Nullable(String),
    `after.id` Nullable(UInt32),
    `after.name` Nullable(String),
    `after.slug_name` Nullable(String),
    `after.aftership_id`               Array(String),
    `after.carrier_17_id`              Array(String),
    `after.carrier_cs_phone` Nullable(String),
    `after.carrier_id` Nullable(String),
    `after.carrier_language` Nullable(String),
    `after.carrier_logo` Nullable(String),
    `after.carrier_support_languages`  Array(String) CODEC (ZSTD),
    `after.carrier_url` Nullable(String),
    `after.carrier_url_tracking` Nullable(String),
    `after.comments` Nullable(String),
    `after.carrier_countries_iso`      Array(String),
    `after.pattern_regex`              Array(String),
    `after.scraper_id` Nullable(UInt32),
    `after.base_weight` Nullable(DOUBLE),
    `after.cms_url` Nullable(String),
    `after.allowed_concurrent_request_count` Nullable(UInt32),
    `after.postal_scrape` Nullable(Bool),
    `after.carrier_countries`          Array(String),
    `after.updated_date` Nullable(DateTime64(6)),
    `after.is_hidden` Nullable(Bool),
    `after.merchant_parameters` Nullable(String),
    `after.last_calc_rating` Nullable(DateTime64(6)),
    `after.rating` Nullable(String),
    ts_ms                              Int64 CODEC (LZ4),
    source_table                       Nullable(String),
    op                                 LowCardinality(String) CODEC (ZSTD)
)
    ENGINE MergeTree
        ORDER BY tuple();
  2. Kafka Connect configuration:
name: config-name
  credentialsName: ch-credential
  tasksMax: 1
  config:
    topics: topic_name
    ssl: false
    security.protocol: SASL_PLAINTEXT
    behavior.on.null.values: ignore
    behavior.on.error: ignore
    database: db_name
    hostname: db_host
    port: 8123
    transforms: flatten
    transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
    transforms.flatten.delimiter: .
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: schema-registry:8081
    key.converter.schema.registry.url: schema-registry:8081
    value.converter.schemas.enable: true
    exactlyOnce: true
    errors.log.enable: true
    errors.log.include.messages: true
    schemas.enable: false
    RowBinaryWithDefaults: true
  3. Trigger WAL
{
  "before": {
    "topic_name.Value": {
      "aftership_id": {
        "array": []
      },
      "carrier_17_id": {
        "array": [
          {
            "string": "190015"
          }
        ]
      }
    }
  },
  "op": "u",
  "transaction": null,
  "ts_ms": {
    "long": 1727944418823
  }
}

Error log:

Tasks:
  0:
    State:      FAILED
    Worker ID:  connect:8083
    Trace:      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: java.lang.RuntimeException: Number of records: 500
	at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:68)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more
      Caused by: java.lang.RuntimeException: Topic: [topic_name], Partition: [0], MinOffset: [377905], MaxOffset: [378033], (QueryId: [b1cbfdfe-66f4-4404-bdda-736e6da42602])
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:68)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:165)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:64)
	... 11 more
      Caused by: java.util.concurrent.ExecutionException: com.clickhouse.client.ClickHouseException: Code: 190. DB::Exception: Elements 'before.aftership_id' and 'before.carrier_17_id' of Nested data structure 'before' (Array columns) have different array sizes. (SIZES_OF_ARRAYS_DONT_MATCH) (version 24.5.3.5 (official build))

	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:706)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:185)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:66)
	... 14 more
      Caused by: com.clickhouse.client.ClickHouseException: Code: 190. DB::Exception: Elements 'before.aftership_id' and 'before.carrier_17_id' of Nested data structure 'before' (Array columns) have different array sizes. (SIZES_OF_ARRAYS_DONT_MATCH) (version 24.5.3.5 (official build))

	at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:151)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	... 3 more
      Caused by: java.io.IOException: Code: 190. DB::Exception: Elements 'before.aftership_id' and 'before.carrier_17_id' of Nested data structure 'before' (Array columns) have different array sizes. (SIZES_OF_ARRAYS_DONT_MATCH) (version 24.5.3.5 (official build))

	at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:222)
	at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:261)
	at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:194)
	at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
	... 4 more

Topics:
  topic_name

Can you help me resolve this problem?

phongbui81062 added the bug label Oct 4, 2024
@Paultagoras
Contributor

Hi @phongbui81062, are you still running into this?

@phongbui81062
Author

Yes, I'm still stuck on this.

@Paultagoras
Contributor

Paultagoras commented Oct 31, 2024

@phongbui81062 Could you run this and paste the output here?

SHOW CREATE TABLE db.`topic_name`

I want to check something with the definition of the table...

@phongbui81062
Author

phongbui81062 commented Oct 31, 2024

Here is my table:

CREATE TABLE db.table
(
    `before.id` Nullable(UInt32),
    `before.name` Nullable(String),
    `before.slug_name` Nullable(String),
    `before.aftership_id` Array(String) DEFAULT [],
    `before.carrier_17_id` Array(String) DEFAULT [],
    `before.carrier_cs_phone` Nullable(String),
    `before.carrier_id` Nullable(String),
    `before.carrier_language` Nullable(String),
    `before.carrier_logo` Nullable(String),
    `before.carrier_url` Nullable(String),
    `before.carrier_url_tracking` Nullable(String),
    `before.comments` Nullable(String),
    `before.scraper_id` Nullable(UInt32),
    `before.base_weight` Nullable(Float64),
    `before.cms_url` Nullable(String),
    `before.allowed_concurrent_request_count` Nullable(UInt32),
    `before.postal_scrape` Nullable(Bool),
    `before.updated_date` Nullable(DateTime64(6)),
    `before.is_hidden` Nullable(Bool),
    `before.merchant_parameters` Nullable(String),
    `before.last_calc_rating` Nullable(DateTime64(6)),
    `before.rating` Nullable(String),
    `after.id` Nullable(UInt32),
    `after.name` Nullable(String),
    `after.slug_name` Nullable(String),
    `after.aftership_id` Array(String),
    `after.carrier_17_id` Array(String),
    `after.carrier_cs_phone` Nullable(String),
    `after.carrier_id` Nullable(String),
    `after.carrier_language` Nullable(String),
    `after.carrier_logo` Nullable(String),
    `after.carrier_support_languages` Array(String) CODEC(ZSTD(1)),
    `after.carrier_url` Nullable(String),
    `after.carrier_url_tracking` Nullable(String),
    `after.comments` Nullable(String),
    `after.carrier_countries_iso` Array(String),
    `after.pattern_regex` Array(String),
    `after.scraper_id` Nullable(UInt32),
    `after.base_weight` Nullable(Float64),
    `after.cms_url` Nullable(String),
    `after.allowed_concurrent_request_count` Nullable(UInt32),
    `after.postal_scrape` Nullable(Bool),
    `after.carrier_countries` Array(String),
    `after.updated_date` Nullable(DateTime64(6)),
    `after.is_hidden` Nullable(Bool),
    `after.merchant_parameters` Nullable(String),
    `after.last_calc_rating` Nullable(DateTime64(6)),
    `after.rating` Nullable(String),
    `ts_ms` Int64 CODEC(LZ4),
    `source_table` Nullable(String),
    `op` LowCardinality(String) CODEC(ZSTD(1))
)
ENGINE = MergeTree
ORDER BY ts_ms
SETTINGS allow_experimental_replacing_merge_with_cleanup = 1, index_granularity = 8192

Postgres table

create table table
(
    id                               integer default  not null primary key,
    name                             varchar(500),
    slug_name                        varchar(500),
    aftership_id                     varchar(200)[],
    carrier_17_id                    varchar(200)[],
    carrier_cs_phone                 varchar(200),
    carrier_id                       varchar(100)                                                      not null
        constraint carrier_carrier_id_c0a57067_uniq
            unique,
    carrier_language                 varchar(100),
    carrier_logo                     varchar(100),
    carrier_support_languages        varchar(50)[],
    carrier_url                      varchar(200),
    carrier_url_tracking             varchar(200),
    comments                         varchar(200),
    carrier_countries_iso            varchar(200)[],
    pattern_regex                    varchar(1000)[],
    scraper_id                       integer,
    base_weight                      double precision                                                  not null,
    cms_url                          varchar(200),
    allowed_concurrent_request_count integer                                                           not null,
    postal_scrape                    boolean                                                           not null,
    carrier_countries                varchar(200)[]                                                    not null,
    updated_date                     timestamp,
    is_hidden                        boolean                                                           not null,
    merchant_parameters              jsonb                                                             not null,
    last_calc_rating                 timestamp,
    rating                           jsonb                                                             not null,
    booking_type                     varchar(50)[],
    supported_integrations           varchar(50)[]
);

@phongbui81062
Author

Could you try inserting data into the Postgres table with different array lengths? For example, aftership_id with the values {'asd','asd'} and carrier_17_id with an empty array (a sketch follows below).
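
A hedged sketch of such an insert, using a hypothetical cut-down table (the real Postgres table has many more NOT NULL columns):

-- Hypothetical cut-down table: only the two array columns are shown, just to
-- illustrate an insert with mismatched array lengths.
CREATE TABLE carrier_demo
(
    aftership_id  varchar(200)[],
    carrier_17_id varchar(200)[]
);

-- Two elements in aftership_id, empty array in carrier_17_id.
INSERT INTO carrier_demo (aftership_id, carrier_17_id)
VALUES ('{"asd","asd"}', '{}');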

@Paultagoras
Contributor

@phongbui81062 I was able to replicate the issue, let me discuss and see. Thanks!

@phongbui81062
Author

Thank you!

@phongbui81062
Author

Hi @Paultagoras, can I ask one question? What happens if I add the async_insert and wait_for_async_insert settings to the Kafka Connect sink?

@Paultagoras
Contributor

Paultagoras commented Oct 31, 2024

Hi @Paultagoras, can I ask one question? What happens if I add the async_insert and wait_for_async_insert settings to the Kafka Connect sink?

If I remember right we had a bug with RowBinaryWithDefaults - essentially it'll hiccup if using Avro or Protobuf.

May have figured out the issue, could you please try setting this delimiter instead?

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

@phongbui81062
Author

Hi @Paultagoras

May have figured out the issue, could you please try setting this delimiter instead?
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

Let me test. Thanks a lot.

If I remember right we had a bug with RowBinaryWithDefaults - essentially it'll hiccup if using Avro or Protobuf.

I ran into another problem: one of my tables (which is fed by the Kafka Connect sink) triggers a materialized view, and that MV triggers another one. When I tested with a basic insert query, it looked like the insert waits until the MV processing is done, which took around 8 minutes; after I added the async_insert and wait_for_async_insert settings, it dropped to about 10 seconds. I'm worried that if I deploy the Kafka Connect sink to production with my current design, performance will be affected.

Can you suggest an alternative solution in case I can't use these settings?
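
For reference, a sketch (not connector-specific guidance) of one way such settings could be applied on the ClickHouse side so that the connector's inserts pick them up without changing the connector config; kafka_connect is a hypothetical user name:

-- Apply the async insert settings to the ClickHouse user the connector logs in as.
-- 'kafka_connect' is a hypothetical user name; adjust to the actual sink user.
ALTER USER kafka_connect SETTINGS async_insert = 1, wait_for_async_insert = 1;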
