[fix#827][jdbc] jdbc PreparedStmtProxy can't resolve DELETE statement.this is make flink retraction failed. #824
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
mysql create ddl:
CREATE DATABASE
flinkx
/*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci */;create table kuaidi
(
kuaidi_name varchar(20),
product_id varchar(100)
);
create table result
(
kuaidi_name varchar(20),
count int,
primary key (kuaidi_name)
);
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('申通', '1');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('顺丰', '2');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('中通', '3');
INSERT INTO flinkx.kuaidi (kuaidi_name, product_id) VALUES ('中通', '1');
flinkx.sql:
CREATE TABLE kuaidi
(
product_id string,
kuaidi_name string
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/flinkx?useCursorFetch=true&fetchSize=1000&useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'kuaidi',
'username' = 'root',
'password' = '123456',
'scan.parallelism' = '1'
);
CREATE TABLE product (
kuaidi_name string,
count
bigint,primary key (kuaidi_name) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://localhost:3306/flinkx?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'result',
'username' = 'root',
'password' = '123456',
'sink.buffer-flush.max-rows' = '1', -- 批量写数据条数,默认:1024
'sink.buffer-flush.interval' = '1000000', -- 批量写时间间隔,默认:10000毫秒
'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
-- sink.all-replace = 'true' 生成如:INSERT INTO
result3
(mid
,mbb
,sid
,sbb
) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATEmid
=VALUES(mid
),mbb
=VALUES(mbb
),sid
=VALUES(sid
),sbb
=VALUES(sbb
) 。会将所有的数据都替换。-- sink.all-replace = 'false' 生成如:INSERT INTO
result3
(mid
,mbb
,sid
,sbb
) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATEmid
=IFNULL(VALUES(mid
),mid
),mbb
=IFNULL(VALUES(mbb
),mbb
),sid
=IFNULL(VALUES(sid
),sid
),sbb
=IFNULL(VALUES(sbb
),sbb
) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。'sink.parallelism' = '1' -- 写入结果的并行度,默认:null
);
INSERT into product
select
kuaidi_name,
count(1)
count
from
(select
product_id,
last_value(kuaidi_name) kuaidi_name
from kuaidi
group by product_id)
group by kuaidi_name;
error picture:
correct picture: