ApacheIceberg快速⼊门
导⾔
本⽂主要介绍如何快速的通过Spark访问Iceberg table。
Spark通过DataSource和DataFrame API访问Iceberg table,或者进⾏Catalog相关的操作。由于Spark Data Source V2 API还在持续的演进和修改中,所以Iceberg在不同的Spark版本中的使⽤⽅式有所不同。版本对⽐
phoster功能Spark 2.4Spark 3.0
基于DataFrame
- 读数据⽀持⽀持
roughly
- 读元数据⽀持⽀持
- 追加(append)⽀持⽀持
- 覆盖(Overwrite)⽀持⽀持
- V2 source专属操作,如
create, overwrite不⽀持⽀持
基于Spark SQL
- SELECT通过DataFrame的
temporary view⽀持
- DDL不⽀持(仅能通过
Iceberg API)⽀持(通过Catalog)
- DML不⽀持⽀持
Spark 2.4email
配置
Hive MetaStore
Iceberg内部⽀持Hive和Hadoop两种catalog:
Catalog类型Metadata JSON管理Namespace
Hive catalog Hive MetaStore1级,即DB
Hadoop catalog⽂件系统上的某个⽂件多级,对应多级⽬录
后⽂以Hive catalog为主做介绍。Hive catalog需要Hive MetaStore的⽀持。注意其有多种配置⽅式,其中内嵌的Derby数据库仅仅⽤于实验和学习,不能⽤于⽣产环境。
Spark
<SPARK_HOME>/f需要加⼊如下配置,使Iceberg能够访问Hive MetaStore:
spark.astore.uris thrift://<HiveMetaStore>:9083spark.astore.warehou.dir hdfs://<NameNode>:8020/path
部署
汽车除味1. 如何使⽤社区正式发布的版本:
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating
2. 如何本地打包,并把Iceberg放⼊Spark的classpath:
git /apache/incubator-iceberg.gitcd incubator-iceberg# master branch supports Spark 2.4.4./gradlew asmblespark-shell --jars <iceberg-git-working-directory>/spark-runtime/build/libs/iceberg-spark-runtime-<version>.jar
读Iceberg table
考研英语一历年国家线通过DataFrame
Spark 2.4只能读写已经存在的Iceberg table。在后续的操作前,需要先通过Iceberg API来创建table。
读取是通过DataFrameReader并指定iceberg作为format来访问Iceberg table,随后Iceberg内部的逻辑会根据path来判断访问的是Hive catalog下的table,还是⽤⽂件系统的路径表⽰的Hadoop table。
// Table managed by ad .format('iceberg') .load('db.table')// Hadoop table, id出国留学迈格森
entified by a ad .format('iceberg') .load('hdfs://<NameNode>:8020/<path_to_table>')
Iceberg会判断path中是否含有'/'。如果是,则认为是⼀个⽤路径表⽰Hadoop table;否则,会去Hive catalog中寻找。
利⽤time travel回溯某⼀个snapshot的数据
在读取时,通过option指定as-of-timestamp或者snapshot-id来访问之前某⼀个snapshot中的数据:
// Time travel to October 26, 1986 at 01:ad .format('iceberg') .option('as-of-timestamp', '499162860000') .load('db.table')// Time travel to snapshot with ad .format('iceberg') .option('snapshot-id', 10963874102873L) snapshot-id的获取⽅法,可以参考后⽂中访问元数据中snapshot的部分,或者直接查看元数据⽂件的内容。
在DataFrame基础上使⽤SQL SELECT
在DataFrame的基础上,创建local temporary view后,也可以通过SQL SELECT来读取Iceberg table的内容:
val df = ad .format('iceberg') .load('db.table')df.createOrReplaceTempView('view')spark.sql('''SELECT * FROM view''') .show()
写Iceberg table
Spark 2.4可以通过DataFrameWriter并指定iceberg作为format来写⼊Iceberg table,并⽀持append和overwrite两种模式:
// Appenddf.write .format('iceberg') .mode('append') .save('db.table')// Overwritedf.write .format('iceberg') .mode('overwrite') .save('db.table')
有如下⼏点需要注意:
1. Overwrit的⾏为dynamic overwrite,即当某个partition中含有输⼊DataFrame中的⾏的时候,该partition才会被新数据完全覆盖;其他partition则保持不变。⽽Spark
2.4中原⽣数据源(如parquet)的默认⾏为是stat
2. 操作粒度是⽂件级别,并不是⾏级别;
3. mode必须显式指定,没有默认⾏为。
访问Iceberg table的元数据
Iceberg⽀持通过DataFrameReader访问table的元数据,如snapshot,manifest等。对于Hive table,可以在原table name后⾯加.history、.snapshots等表⽰要访问元数据;对于⽤路径来表⽰的Hadoop table,需要在原路径后
// Read snapshot history of ad .format('iceberg') .load('db.table.history')
结果如下:
+-------------------------+---------------------+---------------------+---------------------+| made_current_at | snapshot_id | parent_id | is_current_ancestor |+-------------------------+---------------------+---------------------+---------------------+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL 5179299526185056830 | true || 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true || 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |+-------------------------+---------------------+---------------------+--------------------⼜如:
// Read snapshot list of ad .format('iceberg') .load('db.table.snapshots')// Read manifest files of ad .format('iceberg') .load('db.table.manifests')// Read data file list of ad .format('iceberg') .load('db.table 可以进⼀步将history和snapshot按照sna
pshot id做join,来查找snapshot id对应的application id:
SELECT h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id']FROM history hJOIN snapshots s ON h.snapshot_id = s.snapshot_idORDER BY made_current_at
结果如下:
-------------------------+-----------+----------------+---------------------+----------------------------------+| made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] |+-------------------------+-----------+----------------+---------------------+----------------------------------+ 57897183625154 | true | application_1520379288616_155055 || 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 |+-------------------------+-----------+----------------+---------------------+----------------------------------+
Spark 3.0
夏天护肤步骤Iceberg在Spark 3.0中,作为V2 Data Source,除了上述Spark 2.4所有的访问能⼒外,还可以通过V2 Data Source专属的DataFrame API访问;同时,受益于external catalog的⽀持,Spark SQL的DDL功能也可以操作配置external catalog
在<SPARK_HOME>/f加⼊如下配置:
幽默语言spark.sql.catalog.ample.YourCatalogClass
通过V2 Data Source专属DataFrame API访问
df.writeTo('catalog-name.db.table') .overwritePartitions()
通过Spark SQL访问
相较于Spark 2.4,Spark 3.0可以省去DataFrameReader和创建local temporary view的步骤,直接通过Spark SQL进⾏操作:
-- Create tableCREATE TABLE catalog-name.db.tabe (id INT, data STRING) USING iceberg PARTITIONED BY (id)-- InrtINSERT INTO catalog-name.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c')-- DeleteDELETE FROM catalog-name.db.table WHERE id 我们作为社区中spark-3分⽀的维护者,正在持续推进新功能的开发和合⼊,让更多的⼈受益。
总结
本⽂作为Iceberg的快速⼊门,介绍了如何通过Spark访问Iceberg table,以及不同Spark版本的⽀持情况:
Spark 2.4可以通过DataFrame读取或修改已经存在的Iceberg table中的数据,但建表、删表等DDL操作只能通过Iceberg API完成;
不可抗力英文
Spark 3.0访问Iceberg table的能⼒是Spark 2.4的超集,可以通过Spark SQL配合catalog,进⾏SELECT、DDL和DML等更多的操作。
2018四级答案随着Iceberg⾃⾝功能的完善(如向量化读取,merge on read等),以及上下游对接和⽣态的丰富,Iceberg作为优秀的表格式抽象,在⼤数据领域必然会有更好的发展。