flinksql如何关联mysql维表_Flink系例之TableAPISQL与
MYSQL数据查询
使⽤Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
⽰例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
⽰例数据源(项⽬码云下载)
⽰例模块 (l)
SelectToMysql.java
大同礼记package sql;
import org.apache.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.vironment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.pes.Row;
工艺过程import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description 使⽤Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
*/
public class SelectToMysql {
/**
分区扫描
为了加速并⾏Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。
lumn:⽤于对输⼊进⾏分区的列名。
scan.partition.num:分区数。
scan.partition.lower-bound:第⼀个分区的最⼩值。
scan.partition.upper-bound:最后⼀个分区的最⼤值。
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory⼯⼚类中定义
static String table_sql =
九寨沟作文"CREATE TABLE my_urs (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +梦见买米
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'pe' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?uUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'sql.jdbc.Driver', \n" +
" 'connector.table' = 'urs', \n" +
宝宝高烧
" 'connector.urname' = 'root',\n" +
" 'connector.password' = 'password', \n" +
" 'ad.fetch-size' = '3' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = ExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = wInstance().uBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment
StreamTableEnvironment tEnv = ate(env, bsSettings);
//注册mysql数据维表
String sql = "lect id,name,age,status from my_urs";
//第⼀种:执⾏SQL
//Table table = tEnv.sqlQuery(sql);财务文员
雷锋小故事//第⼆种:通过⽅法拼装执⾏语句
Table table = tEnv.from("my_urs").lect($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream behaviorStream = AppendStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(Row value, Collector out) throws Exception { System.out.String());
Thread.sleep(1 * 1000);
}
}).print();
}
}
打印结果
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
水果酸奶冰1,nike,16,0
2,nike,18,0