FlinkSQL连接Hive并写入读取数据

更新时间:2023-06-29 18:51:54 阅读: 评论:0

FlinkSQL连接Hive并写⼊读取数据
2、两种 planner(old & blink)的区别
1. 批流统⼀:Blink 将批处理作业,视为流式处理的特殊情况。所以,blink 不⽀持表和DataSet 之间的转换,批处理作业将不转换为
DataSet 应⽤程序,⽽是跟流处理⼀样,转换为 DataStream 程序来处理。
2. 因 为 批 流 统 ⼀ , Blink planner 也 不 ⽀ 持 BatchTableSource , ⽽ 使 ⽤ 有 界 的
3. Blink planner 只⽀持全新的⽬录,不⽀持已弃⽤的 ExternalCatalog。
4. 旧 planner 和 Blink planner 的 FilterableTableSource 实现不兼容。旧的 planner 会把PlannerExpressions 下推到
filterableTableSource 中,⽽ blink planner 则会把 Expressions 下推。
5. 基于字符串的键值配置选项仅适⽤于 Blink planner。
6. PlannerConfig 在两个 planner 中的实现不同。
7. Blink planner 会将多个 sink 优化在⼀个 DAG 中(仅在 TableEnvironment 上受⽀持,⽽在 StreamTableEnvironment 上不受⽀
持)。⽽旧 planner 的优化总是将每⼀个 sink 放在⼀个新的 DAG 中,其中所有 DAG 彼此独⽴。
8. 旧的 planner 不⽀持⽬录统计,⽽ Blink planner ⽀持。
⽼版本创建流处理批处理
7、 ⽼版本创建流处理批处理
7.1 ⽼版本流处理
val ttings = wInstance()
.uOldPlanner() // 使⽤⽼版本 planner
.inStreamingMode() // 流处理模式
可爱女孩头像萌图片
.build()
val tableEnv = ate(env, ttings)
7.2 ⽼版本批处理
val batchEnv = ExecutionEnvironment
val batchTableEnv = ate(batchEnv)
7.3 blink 版本的流处理环境
val bsSettings = wInstance()
.uBlinkPlanner()
.inStreamingMode().build()
val bsTableEnv = ate(env, bsSettings)
7.4 blink 版本的批处理环境
val bbSettings = wInstance()
.uBlinkPlanner()
.
inBatchMode().build()
val bbTableEnv = ate(bbSettings)
3、连接⽂件系统,创建hive catalog,对表进⾏操作,类似于Spark on Hive,flink可以直接获取Hive的元数据,并使⽤flink进⾏计算。
// 连接外部⽂件
.withFormat(new Csv().fieldDelimiter(','))
.withSchema(new Schema().field("id", DataTypes.STRING()))
神奇遥控器
.createTemporaryTable("output");
// 设置 hive ⽅⾔
// 获取l⽬录
婚恋教育String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);
HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);
bbTableEnv.uCatalog("hive");
bbTableEnv.uDataba("warningplatform");
通过t()去创建临时表的⽅式已经过时了,建议使⽤uteSql()的⽅式,通过DDL去创建临时表,临时表到底是属于哪⼀个catalog⽬前还不太确定,到底是什么规则⽬前还不清楚。 查资料得知,临时表与单个Flink会话的⽣命周期相关,临时表始终存储在内存中。 永久表需要⼀个catalog来管理表对应的元数据,⽐如hive metastore,该表将⼀直存在,直到明确删除该表为⽌。 因此猜测:default_catalog是存储在内存中,如果在切换成hive catalog之前创建临时表,那我们就可以使⽤
default_catalog.default_databa.tableName来获取这个临时表。 如果切换了catalog再去创建临时表,那我们就⽆法获取到临时表了,因为它不在default_catalog中,⽽且保存在内存⾥⾯,直接查询临时表会去当前的catalog⾥⾯去查找临时表,因此⼀定要在default_catalog ⾥⾯创建临时表。 ⽽临时视图好像是存储在当前的catalog⾥⾯
通过ateTemporaryView()创建的视图则是属于当前的databa的
注意1.11版本的执⾏sql的⽅法发⽣了改变,通过执⾏环境的executeSql(),executeInrt()等来进⾏插⼊或者执⾏sql语句
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
EnvironmentSettings ttings = EnvironmentSettings
.newInstance()
.uBlinkPlanner() // 使⽤BlinkPlanner
.inBatchMode() // Batch模式,默认为StreamingMode
.build();
TableEnvironment tableEnv = ate(ttings);
String name = "myhive";                                // Catalog名称,定义⼀个唯⼀的名称表⽰
String defaultDataba = ("defaultDataba"); // 默认数据库名称
String hiveConfDir = ("hiveConf");            // l路径
String version = "2.1.1";                              // Hive版本号
HiveCatalog hive = new HiveCatalog(name, defaultDataba, hiveConfDir, version);
tableEnv.uCatalog("myhive");
TableResult result;
String SelectTables_sql ="lect * stdata";
result = uteSql(SelectTables_sql);
result.print();
}
四、启动集群,提交任务
1、进⼊flink的⽬录下执⾏ bin/start-cluster.sh
2、登录rest.address : rest.port查看⽹页是否正常运⾏
3、执⾏命令提交任务
flink/flink-1.13.1/bin/flink run -ample.FlinkHiveIntegration flink/job/flink common job-1.1.jar -hiveConf
/etc/hive/conf.cloudera.hive/ -defaultDataba test
五、报错处理
Caud by: java.lang.NoClassDefFoundError: Lorg/apache/hadoop/mapred/JobConf;瓜子脸图片
解决⽅法:补充hadoop-mapreduce-client-core-3.0.0.jar包
6、案例(新)
需求: 将⼀个txt⽂本⽂件作为输⼊流读取数据过滤id不等于nsor_1的数据
实现思路: ⾸先我们先构建⼀个table的env环境通过connect提供的⽅法来读取数据然后设置表结构将数据注册为⼀张表就可进⾏我们的数据过滤了(使⽤sql或者流处理⽅式进⾏解析)
准备数据
nsor_1,1547718199,35.8
nsor_6,1547718201,15.4
nsor_7,1547718202,6.7
object FlinkSqlTable {
def main(args: Array[String]): Unit = {
// 构建运⾏流处理的运⾏环境
val env = ExecutionEnvironment
// 构建table环境
val tableEnv = ate(env)快餐策划
//通过 connect 读取数据
.withFormat(new Csv()) //设置类型
.withSchema(new Schema() // 给数据添加元数信息
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.
field("temperature", DataTypes.DOUBLE())
).createTemporaryTable("inputTable")  // 创建⼀个临时表文1
val resTable = tableEnv.from("inputTable")
.lect("*").filter('id === "nsor_1")
// 使⽤sql的⽅式查询数据
var resSql = tableEnv.sqlQuery("lect * from inputTable where id='nsor_1'")
歌唱艺术
// 将数据转为流进⾏输出
}
}爱你的温柔
6、TableEnvironment 的作⽤
注册 catalog
在内部 catalog 中注册表
执⾏ SQL 查询
注册⽤户⾃定义函数
注册⽤户⾃定义函数
保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引⽤
在创建 TableEnv 的时候,可以多传⼊⼀个 EnvironmentSettings 或者 TableConfig 参数,可以⽤来配置 TableEnvironment 的⼀些特性。

本文发布于:2023-06-29 18:51:54,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1060480.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   处理   批处理   版本   读取数据
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图