flinksql如何关联mysql维表_Flink系例之TableAPISQL与MYSQL数据查询

更新时间:2023-06-30 11:05:00 阅读: 评论:0

flinksql如何关联mysql维表_Flink系例之TableAPISQL与
MYSQL数据查询
使⽤Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
⽰例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
⽰例数据源(项⽬码云下载)
⽰例模块 (l)
SelectToMysql.java
大同礼记package sql;
import org.apache.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.vironment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.pes.Row;
工艺过程import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description 使⽤Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
*/
public class SelectToMysql {
/**
分区扫描
为了加速并⾏Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。
lumn:⽤于对输⼊进⾏分区的列名。
scan.partition.num:分区数。
scan.partition.lower-bound:第⼀个分区的最⼩值。
scan.partition.upper-bound:最后⼀个分区的最⼤值。
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory⼯⼚类中定义
static String table_sql =
九寨沟作文"CREATE TABLE my_urs (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +梦见买米
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'pe' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?uUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'sql.jdbc.Driver', \n" +
" 'connector.table' = 'urs', \n" +
宝宝高烧
" 'connector.urname' = 'root',\n" +
" 'connector.password' = 'password', \n" +
" 'ad.fetch-size' = '3' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = ExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = wInstance().uBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment
StreamTableEnvironment tEnv = ate(env, bsSettings);
//注册mysql数据维表
String sql = "lect id,name,age,status from my_urs";
//第⼀种:执⾏SQL
//Table table = tEnv.sqlQuery(sql);财务文员
雷锋小故事//第⼆种:通过⽅法拼装执⾏语句
Table table = tEnv.from("my_urs").lect($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream behaviorStream = AppendStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(Row value, Collector out) throws Exception { System.out.String());
Thread.sleep(1 * 1000);
}
}).print();
}
}
打印结果
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
水果酸奶冰1,nike,16,0
2,nike,18,0

本文发布于:2023-06-30 11:05:00,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1070069.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   分区   码云   打印
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图