Flink基础(三⼗⼀):FLINK-SQL语法
(七)DML(⼀)INSERT语句
INSERT 语句⽤来向表中添加⾏。
1 执⾏ INSERT 语句
单条 INSERT 语句,可以使⽤TableEnvironment中的executeSql()⽅法执⾏,也可以在中执⾏ INSERT 语句。executeSql()⽅法执⾏ INSERT 语句时会⽴即提交⼀个 Flink 作业,并且返回⼀个 TableResult 对象,通过该对象可以获取 JobClient ⽅便的操作提交的作业。多条 INSERT 语句,使⽤TableEnvironment中的createStatementSet创建⼀个StatementSet对象,然后使⽤StatementSet中的addInrtSql()⽅法添加多条INSERT 语句,最后通过StatementSet中的execute()⽅法来执⾏。
以下的例⼦展⽰了如何在TableEnvironment和 SQL CLI 中执⾏⼀条 INSERT 语句,或者通过StatementSet执⾏多条 INSERT 语句。
val ttings = wInstance()...
val tEnv = ate(ttings)
伍亮
// 注册⼀个 "Orders" 源表,和 "RubberOrders" 结果表
// 运⾏⼀个 INSERT 语句,将源表的数据输出到结果表中
val tableResult1 = uteSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
云台山旅游攻略// 通过 TableResult 来获取作业状态
JobClient().get().getJobStatus())
//----------------------------------------------------------------------------
// 注册⼀个 "GlassOrders" 结果表⽤于运⾏多 INSERT 语句
// 运⾏多个 INSERT 语句,将原表数据输出到多个结果表中新生儿打嗝什么原因
val stmtSet = ateStatementSet()
// `addInrtSql` ⽅法每次只接收单条 INSERT 语句
stmtSet.addInrtSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
stmtSet.addInrtSql(
蟑螂最怕什么
"INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
// 执⾏刚刚添加的所有 INSERT 语句
val tableResult2 = ute()
// 通过 TableResult 来获取作业状态
中国无产阶级
男猪女狗JobClient().get().getJobStatus())
电脑物理内存
2 将 SELECT 查询数据插⼊表中
通过 INSERT 语句,可以将查询的结果插⼊到表中,
语法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] lect_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
OVERWRITE
INSERT OVERWRITE将会覆盖表中或分区中的任何已存在的数据。否则,新数据会追加到表中或分区中。
PARTITION
PARTITION语句应该包含需要插⼊的静态分区列与值。
⽰例
-- 创建⼀个分区表
CREATE TABLE country_page_view (ur STRING, cnt INT, date STRING, country STRING)
PARTITIONED BY (date, country)
WITH (...)
工资管理
-- 追加⾏到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
SELECT ur, cnt FROM page_view_source;
-- 追加⾏到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每⼀⾏动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
SELECT ur, cnt, country FROM page_view_source;
-- 覆盖⾏到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
SELECT ur, cnt FROM page_view_source;
-- 覆盖⾏到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每⼀⾏动态决定INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
SELECT ur, cnt, country FROM page_view_source;
3 将值插⼊表中
通过 INSERT 语句,也可以直接将值插⼊到表中,
语法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...] values_row:
: (val1 [, val2, ...])
OVERWRITE
INSERT OVERWRITE将会覆盖表中的任何已存在的数据。否则,新数据会追加到表中。
⽰例
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);