We are building our data computing system on Flink SQL.
So far, with Flink 1.11.0, we have reached a milestone: consuming from Kafka, selecting from a dynamic table, and writing the results to MySQL.
But when we tested exactly-once (end-to-end) semantics, we found a problem.
The official documentation on Flink SQL does us no favors here. I need help.
SQL file:
-- source: uses a computed column; uuid() generates a UUID on the fly
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
uuid as uuid()
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'test-consumer-group12',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
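For reference, Flink 1.11 also accepts the new-style connector options ('connector' = 'kafka', 'format' = 'json', ...). A sketch of the same source registered programmatically using those options (untested; the class name is ours, the option values are copied from the DDL above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterUserLog {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
        // Same user_log table, using the new-style options added in Flink 1.11;
        // 'scan.startup.mode' replaces the legacy 'connector.startup-mode'.
        tEnv.executeSql(
                "CREATE TABLE user_log (" +
                "  user_id VARCHAR," +
                "  item_id VARCHAR," +
                "  category_id VARCHAR," +
                "  behavior VARCHAR," +
                "  ts TIMESTAMP(3)," +
                "  uuid AS uuid()" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_behavior'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'test-consumer-group12'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");
    }
}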
-- sink
CREATE TABLE pvuv_sink (
uuid VARCHAR,
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
'connector.table' = 'pvuv_sink13',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1',
'connector.sink.semantic' = 'exactly-once'
);
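A sketch of how we could check the sink for duplicates after a kill-and-restart test (hypothetical helper, not part of the job; connection settings copied from the DDL above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckSinkDuplicates {
    public static void main(String[] args) throws Exception {
        // Hypothetical check: after killing and restarting the job, any (uuid, dt)
        // key that appears more than once means the sink wrote duplicate rows.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/flink_test", "root", "123456");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "SELECT uuid, dt, COUNT(*) AS c FROM pvuv_sink13 " +
                     "GROUP BY uuid, dt HAVING COUNT(*) > 1")) {
            while (rs.next()) {
                System.out.printf("duplicate key: uuid=%s dt=%s count=%d%n",
                        rs.getString("uuid"), rs.getString("dt"), rs.getLong("c"));
            }
        }
    }
}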
INSERT INTO pvuv_sink
SELECT
uuid,
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;
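Note that because of the GROUP BY this is a continuously updating query: each incoming record (re-)emits the aggregate row for its group, so the sink receives updates rather than one final row per key. A sketch of how the update stream could be inspected during testing (hypothetical debugging code, not part of the job; user_log is assumed to be registered already):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class InspectUpdates {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
        // user_log must be registered first (DDL above).
        Table result = tEnv.sqlQuery(
                "SELECT uuid, DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, " +
                "COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv " +
                "FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid");
        // toRetractStream emits (true, row) for inserts/updates and (false, row)
        // for retractions of rows emitted earlier.
        tEnv.toRetractStream(result, Row.class).print();
        env.execute("inspect-updates");
    }
}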
SQL parsing and submission file (the entry point of the project):
/**
 * Parses the command line and submits the SQL job; the entry point of the whole project.
 */
public class SqlSubmit {
    public static void main(String[] args) throws Exception {
        // Parse the command-line arguments
        final CliOptions options = CliOptionsParser.parseClient(args);
        // Hand the parsed options to SqlSubmit
        SqlSubmit submit = new SqlSubmit(options);
        // Run the program
        submit.run();
    }

    // --------------------------------------------------------------------------------------------

    private String sqlFilePath;
    private TableEnvironment tEnv;

    // Keep the path of the SQL file to execute
    private SqlSubmit(CliOptions options) {
        this.sqlFilePath = options.getSqlFilePath();
    }

    private void run() throws Exception {
        // Create the Flink execution context
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        this.tEnv = StreamTableEnvironment.create(environment,
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
        // ... the SQL file at sqlFilePath is read and split into command calls here
        // (parsing code omitted from this snippet) ...
        // System.out.println(call.command.toString());
        callCommand(call);
    }
}
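One thing we are not sure about: run() never enables checkpointing, and as far as we can tell no Flink sink can provide end-to-end exactly-once without checkpoints. A minimal sketch of the change we have in mind (the 60-second interval is an arbitrary choice of ours):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedEnvironment {
    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds in EXACTLY_ONCE mode; Flink sinks
        // build their exactly-once guarantees on top of checkpoints.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        return env;
    }
}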
Shell script to submit the job:
#!/bin/bash
export FLINK_HOME=/Users/hulc/developEnv/flink-1.11.0
sql_file=$2
# Check FLINK_HOME
if [ -z "$FLINK_HOME" ];then
    echo "Please set FLINK_HOME or configure it in this script"
    exit 1
fi
# Check the number of arguments
if [ $# -lt 2 ];then
    echo "Usage: ./sql-submit.sh -f <sql-file>"
    exit 1
fi
# The dependency jar; the name is hard-coded here and could be passed in as a parameter later
# SQL_JAR=./flink-sql-submit-1.0-SNAPSHOT.jar
SQL_JAR=./target/flink-test1-1.0-SNAPSHOT.jar
# Check that the jar can be loaded
if [ -f "$SQL_JAR" ];then
    echo "$(date '+%Y-%m-%d %H:%M:%S') load jars from ${SQL_JAR}"
else
    echo "failed to load dependent jars for sql-submit.sh, please specify it"
    exit 1
fi
# Check that the SQL file was specified
if [ ! -f "$sql_file" ];then
    echo "SQL file $sql_file does not exist, please check the path"
    exit 1
fi
# Submit command. Note that the parameters are hard-coded here: the parallelism,
# the fully qualified main class, and the jar built from the project.
# $1 is -f, the flag for the file to execute
# $sql_file is the SQL file to execute
if [ "$1" = "-f" ];then
    $FLINK_HOME/bin/flink run -p 1 -c SqlSubmit /Users/hulc/develop/flink-test1/target/flink-test1-1.0-SNAPSHOT.jar $1 $sql_file
else
    echo "Usage: ./sql-submit.sh -f <sql-file>"
    exit 1
fi