数据现在支持json格式{"xx":"bb","cc":"dd"}
CREATE TABLE tableName(
colName colType,
...
function(colNameX) AS aliasName,
WATERMARK FOR colName AS withOffset( colName , delayTime )
)WITH(
type ='kafka09',
bootstrapServers ='ip:port,ip:port...',
zookeeperQuorum ='ip:port,ip:port/zkparent',
offsetReset ='latest',
topic ='topicName',
groupId='test',
parallelism ='parllNum',
--timezone='America/Los_Angeles',
timezone='Asia/Shanghai',
sourcedatatype ='json' #可不设置
);
kafka08,kafka09,kafka10,kafka11及以上版本
kafka读取和写入的版本必须一致,否则会有兼容性错误。
参数名称 | 含义 |
---|---|
tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称 |
colName | 列名称 |
colType | 列类型 colType支持的类型 |
function(colNameX) as aliasName | 支持在定义列信息的时候根据已有列类型生成新的列(函数可以使用系统函数和已经注册的UDF) |
WATERMARK FOR colName AS withOffset( colName , delayTime ) | 标识输入流生的watermake生成规则,根据指定的colName(当前支持列的类型为Long | Timestamp) 和delayTime生成waterMark 同时会在注册表的使用附带上rowtime字段(如果未指定则默认添加proctime字段);注意:添加该标识的使用必须设置系统参数 time.characteristic:EventTime; delayTime: 数据最大延迟时间(ms) |
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | kafka09 | 是 | kafka08、kafka09、kafka10、kafka11、kafka(对应kafka1.0及以上版本) |
groupId | 需要读取的 groupId 名称 | 否 | |
bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开) | 是 | |
zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔) | 是 | |
topic | 需要读取的 topic 名称 | 是 | |
topicIsPattern | topic是否是正则表达式格式(true|false) | 否 | false |
offsetReset | 读取的topic 的offset初始位置[latest|earliest|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})] | 否 | latest |
parallelism | 并行度设置 | 否 | 1 |
sourcedatatype | 数据类型 | 否 | json |
timezone | 时区设置timezone支持的参数 | 否 | 'Asia/Shanghai' |
kafka相关参数可以自定义,使用kafka.开头即可。 |
kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.partition.assignment.strategy
kafka.socket.receive.buffer.bytes
kafka.fetch.min.bytes
CREATE TABLE MyTable(
name varchar,
channel varchar,
pv INT,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1,nbTest2,nbTest3',
--topic ='mqTest.*',
--topicIsPattern='true'
parallelism ='1',
sourcedatatype ='json' #可不设置
);
嵌套json解析示例
json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
CREATE TABLE MyTable(
name varchar,
obj.channel varchar as channel,
pv INT,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
groupId='nbTest',
topic ='nbTest1,nbTest2,nbTest3',
--- topic ='mqTest.*',
---topicIsPattern='true',
parallelism ='1'
);
数组类型字段解析示例
json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
CREATE TABLE MyTable(
name varchar,
obj.channel varchar as channel,
user[1].pv INT as pv,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
groupId='nbTest',
topic ='nbTest1,nbTest2,nbTest3',
--- topic ='mqTest.*',
---topicIsPattern='true',
parallelism ='1'
);
or
json: {"name":"tom", "obj":{"channel": "root"}, "pv": [4, 7, 10], "xctime":1572932485}
CREATE TABLE MyTable(
name varchar,
obj.channel varchar as channel,
pv[1] INT as pv,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
groupId='nbTest',
topic ='nbTest1,nbTest2,nbTest3',
--- topic ='mqTest.*',
---topicIsPattern='true',
parallelism ='1'
);
根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | kafka09 | 是 | |
bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开) | 是 | |
zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔) | 是 | |
topic | 需要读取的 topic 名称 | 是 | |
offsetReset | 读取的topic 的offset初始位置[latest|earliest] | 否 | latest |
parallelism | 并行度设置 | 否 | 1 |
sourcedatatype | 数据类型 | 是 | csv |
fielddelimiter | 字段分隔符 | 是 | |
lengthcheckpolicy | 单行字段条数检查策略 | 否 | 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
kafka相关参数可以自定义,使用kafka.开头即可。 |
CREATE TABLE MyTable(
name varchar,
channel varchar,
pv INT,
xctime bigint,
CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
--topic ='mqTest.*',
--topicIsPattern='true'
parallelism ='1',
sourcedatatype ='csv',
fielddelimiter ='\|',
lengthcheckpolicy = 'PAD'
);
Kafka源表数据解析流程:Kafka Source Table -> UDTF ->Realtime Compute -> SINK。从Kakfa读入的数据,都是VARBINARY(二进制)格式,对读入的每条数据,都需要用UDTF将其解析成格式化数据。 与其他格式不同,本格式定义DDL必须与以下SQL一摸一样,表中的五个字段顺序务必保持一致:
create table kafka_stream(
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT,
) with (
type ='kafka09',
bootstrapServers ='172.16.8.198:9092',
zookeeperQuorum ='172.16.8.198:2181/kafka',
offsetReset ='latest',
topic ='nbTest1',
parallelism ='1',
sourcedatatype='text'
)
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | kafka09 | 是 | |
bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开) | 是 | |
zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔) | 是 | |
topic | 需要读取的 topic 名称 | 是 | |
offsetReset | 读取的topic 的offset初始位置[latest|earliest] | 否 | latest |
parallelism | 并行度设置 | 否 | 1 |
sourcedatatype | 数据类型 | 否 | text |
kafka相关参数可以自定义,使用kafka.开头即可。 |
从kafka读出的数据,需要进行窗口计算。 按照实时计算目前的设计,滚窗/滑窗等窗口操作,需要(且必须)在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以kafka中message字段中的的Event Time进行窗口操作, 需要先从message字段,使用UDX解析出event time,才能定义watermark。 在kafka源表场景中,需要使用计算列。 假设,kafka中写入的数据如下: 2018-11-11 00:00:00|1|Anna|female整个计算流程为:Kafka SOURCE->UDTF->Realtime Compute->RDS SINK(单一分隔符可直接使用类csv格式模板,自定义适用于更复杂的数据类型,本说明只做参考)
SQL
-- 定义解析Kakfa message的UDTF
CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF';
CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF';
-- 定义源表,注意:kafka源表DDL字段必须与以下例子一模一样。WITH中参数可改。
create table kafka_src (
_topic STRING,
_messageKey STRING,
_message STRING,
_partition INT,
_offset BIGINT,
ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意计算里的类型必须为timestamp才能在做watermark。
watermark for ctime as withoffset(ctime,0) -- 在计算列上定义watermark
) WITH (
type = 'kafka010', -- Kafka Source类型,与Kafka版本强相关,目前支持的Kafka版本请参考本文档
topic = 'test_kafka_topic',
...
);
create table rds_sink (
name VARCHAR,
age INT,
grade VARCHAR,
updateTime TIMESTAMP
) WITH(
type='mysql',
url='jdbc:mysql://localhost:3306/test',
tableName='test4',
userName='test',
password='XXXXXX'
);
-- 使用UDTF,将二进制数据解析成格式化数据
CREATE VIEW input_view (
name,
age,
grade,
updateTime
) AS
SELECT
COUNT(*) as cnt,
T.ctime,
T.order,
T.name,
T.sex
from
kafka_src as S,
LATERAL TABLE (kafkapaser _message)) as T (
ctime,
order,
name,
sex
)
Group BY T.sex,
TUMBLE(ctime, INTERVAL '1' MINUTE);
-- 对input_view中输出的数据做计算
CREATE VIEW view2 (
cnt,
sex
) AS
SELECT
COUNT(*) as cnt,
T.sex
from
input_view
Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);
-- 使用解析出的格式化数据进行计算,并将结果输出到RDS中
insert into rds_sink
SELECT
cnt,sex
from view2;
UDF&UDTF
package com.XXXX;
import com.XXXX.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import java.io.UnsupportedEncodingException;
/**
以下例子解析输入Kafka中的JSON字符串,并将其格式化输出
**/
public class kafkaUDTF extends TableFunction<Row> {
public void eval(byte[] message) {
try {
// 读入一个二进制数据,并将其转换为String格式
String msg = new String(message, "UTF-8");
// 提取JSON Object中各字段
String ctime = Timestamp.valueOf(data.split('\\|')[0]);
String order = data.split('\\|')[1];
String name = data.split('\\|')[2];
String sex = data.split('\\|')[3];
// 将解析出的字段放到要输出的Row()对象
Row row = new Row(4);
row.setField(0, ctime);
row.setField(1, age);
row.setField(2, grade);
row.setField(3, updateTime);
System.out.println("Kafka message str ==>" + row.toString());
// 输出一行
collect(row);
} catch (ClassCastException e) {
System.out.println("Input data format error. Input data " + msg + "is not json string");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
// 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型
// 定义输出Row()对象的字段类型
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING);
}
}
package com.dp58;
package com.dp58.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class KafkaUDF extends ScalarFunction {
// 可选,open方法可以不写
// 需要import org.apache.flink.table.functions.FunctionContext;
public String eval(byte[] message) {
// 读入一个二进制数据,并将其转换为String格式
String msg = new String(message, "UTF-8");
return msg.split('\\|')[0];
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
//可选,close方法可以不写
@Override
public void close() {
}
}