AWSGlue集成ApacheHudi同步元数据深度历险(各类错误的填坑⽅案)
⽂章⽬录
在开始我们的“历险”之前,回看⼀路踩过的“坑”,不得不说的是:Glue集成Hudi在同步元数据这件事上,坑太多!究其原因主要是与Glue使⽤了⾃⼰的Metastore(Glue Catalog)有关,虽然本⽂我们将针对遇到的问题逐⼀给出解决⽅案,甚⾄有些⽅案还有⼀点hacking 的味道,但还是希望未来Hudi社区和Glue产品团队能积极靠拢⼀下,解决掉两者集成上的各类问题,因为这两个产品代表着⼤数据领域两⼤新兴的潮流:增量处理与⽆服务器架构,它们的结合真得是⼀件很酷的事情。
好的,前路崎岖,做好⼼理准备,让我们开始吧。(注:本⽂写作时,使⽤的是当前最新的Hudi 0.8.0与Glue 2.0)
1. 第⼀关:标配⽆效,使⽤替代⽅案
1.1 标准配置
⾸先是最基本的三项:
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY ->"true"
DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY ->"your-target-databa"
状元水饺
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY ->"your-target-table"
这三项很容易理解,就是告诉Hudi要开启Hive Sync,同时指定同步到Hive的什么库的什么表。如果你要同步的是⼀张分区表,还需要追加以下⼏项:
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY ->"true"
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY ->"your-partition-path-field"
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY ->"your-hive-partition-field"
1.2 错误信息
在常规Hadoop/Spark集群上,完成上述配置之后Hudi Hive Sync就可以正常⼯作了,但是在Glue环境下,我们将遇到这样⼀个错误:Cannot create hive connection jdbc:hive2://localhost:10000/
1.3 原因分析
报这个错误的原因其实很好理解,因为Hudi的Hive Sync默认是通过JDBC连接HiveServer2执⾏建表操作的,
冰淇淋酸奶⽽jdbc:hive2://localhost:10000/是默认的Hive JDBC连接字符串(这个字符串当然也是可配置的,对应配置项为hive_sync.jdbc_url)。⽽在Glue的环境⾥,只有Glue Catalog,没有Hive Metastore和HiverServer2,这⾥我们让Hudi通过JDBC连接Glue⾥不存在的
兑命怎么读HiveServer2去同步元数据,结果必然是失败的。
1.4 替代⽅案
在解锁第⼆关之前,我曾经认为这个问题是⽆解的,因为Glue⾥就是没有Hive MetaStore,所以⾃动同步元数据到Glue Catalog是⽆望的。于是我使⽤了这样⼀种替代⽅案:预先建表 + ALTER TABLE RECOVER PARTITIONS修复分区。也就是不依赖Hudi的⾃动同步功能,在程序中提前建好Hudi数据集对应的Hive表,⽽对于分区表,每次写⼊分区数据后调⽤⼀次:
ALTER TABLE table_name RECOVER PARTITIONS;
来⾃动识别并添加新的分区。当然,如果每次写⼊的数据分区是确定的,最好是使⽤add partition进⾏精准的分区添加操作:
ALTER TABLE table_name ADD PARTITION(...) location '...';
如果使⽤的是动态分区,⽆法预知分区字段的取值,就只能使⽤recover partitions了。
2. 第⼆关:找到关键配置,成功开启⾃动同步
上述⽅案使⽤了⼀段时间后,⼀次偶然的机会意外发现了Hudi的⼀个关键配置项,从⽽顺利地开启了Glue到Hudi的元数据⾃动同步,这⼤⼤⽅便了Hudi在Glue上的应⽤。这步关键操作就是:disable使⽤jdbc同步元数据,即在Hudi的Write Options中添加:
DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY ->"fal"
这样做之所以能⽣效还有待查验源代码进⾏分析,但可能性很⼤的猜测是:当禁⽤JDBC⽅式连接Hive Metastore之后,Hudi很有可能会使⽤配置项spark.astore.client.factory.class指定的Hive Metastore客户端类去访问和操作Hive Metastore,⽽在Glue环境下,这⼀项配置的是com.a
mazonaws.astore.AWSGlueDataCatalogHiveClientFactory,这是专门访问Glue Catalog的客户端⼯⼚类,所以使⽤这个类创建的客户端就可以与Glue Catalog⽆缝交互了。
3. 第三关:同Session先读再写,再次失败
3.1 问题描述
闯过第⼆关之后,普通的元数据同步基本就没什么问题了。但是当遇到下⾯这种场景时,同步再次“翻了车”:
如果在⼀个SparkSession下,先读取⼀个Hudi数据集,得到DataFrame,在进⾏⼀些数据转换之后将变换后DataFrame再次以Hudi的形式写⼊另⼀张表,此时,Hudi会在同步这张新表的元数据时离奇失败。⽽这类操作其实是最典型不过的ETL操作流程,所以你⼤概率会遇上这个问题。
这⼀问题并不在配置上,所有配置与前⾯完全⼀样,起初在遇到这⼀问题时,我试过修改各种操作与配置进⾏验证,可以⾮常确定的是:
在Glue⾥,同⼀个SparkSession内,只要前⾯存在读取或写⼊Hudi数据集的⾏为,后⾯⼆次写⼊新的Hudi数据集并同步元数据时就⼀定会出错!如果前⾯没有Hudi操作,同步就能成功。
3.2 错误信息
以下是程序⽇志中记录的错误信息:
Exception in Ur Class: org.ption.HoodieException : Got runtime exception when hive syncing ....
如果try catch异常后, 给出的cau message是:
Failed to check if databa exists xxx
以下是全部的异常堆栈信息:
ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Exception in Ur Class: org.ption.HoodieException : Got runtime exc eption when hive syncing xxx
org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:122)
宁波科学探索中心
org.apache.hudi.HoodieSparkSqlWriter$$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:391)
org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:440)
org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:436)
org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:436)
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:497)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
org.apache.ateRelation(DefaultSource.scala:145)
org.apache.ution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
org.apache.and.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.and.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.and.ExecutedCommandExec.doExecute(commands.scala:86)
org.apache.ution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.ution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.ution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.uteQuery(SparkPlan.scala:152)
org.apache.ute(SparkPlan.scala:127)
org.apache.Rdd$lzycompute(QueryExecution.scala:80)
org.apache.Rdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
org.apache.ution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.ution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.ution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
微官网
小学生画画org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
... // 隐去⽆关的业务代码类
flect.Method.invoke(Method.java:498)
com.amazonaws.rvices.glue.SparkProcessLauncherPlugin$class.invoke(ProcessLauncher.scala:45)
com.amazonaws.rvices.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:76)
com.amazonaws.rvices.glue.ProcessLauncher.launch(ProcessLauncher.scala:115)
com.amazonaws.rvices.glue.ProcessLauncher$.main(ProcessLauncher.scala:26)
com.amazonaws.rvices.glue.ProcessLauncher.main(ProcessLauncher.scala)
3.3 原因分析
⾸先,我们需要从异常堆栈中找到发⽣错误的原始位置,但是⽇志中给出的错误堆栈其实是不全的,准确的位置是
在:org.apache.hudi.hive.HoodieHiveClient#doesDataBaExist的346⾏:
显然,异常是在这个client变量试图根据数据库名获取数据库时就报错了。⽽实际情况是,代码中请求的数据库是存在的,问题⼀定是client 连接不上metastore,将错误信息报成数据库不存在。那问题就转为这个client是什么?它是怎样⼯作的?
进⼀步浏览代码可以发现,这个client是这样定义和使⽤的:
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs){
...
private IMetaStoreClient client;// 71⾏
...
this.client = (configuration).getMSC();// 91⾏
...
}
它是由(configuration).getMSC()创建的,我们需要看⼀下Hive类的这两个⽅法以及⼀个重要的变量hiveDB:
public class Hive {
...
private static ThreadLocal<Hive> hiveDB =new ThreadLocal<Hive>(){
@Override
protected synchronized Hive initialValue(){
return null;
}
@Override
public synchronized void remove(){
()!=null){
<().clo();
}
}
};
...
public static Hive get(HiveConf c) throws HiveException {
Hive db = ();寓言故事画蛇添足
if(db ==null||
(db.metaStoreClient !=null&&!db.metaStoreClient.isCompatibleWith(c))){
return get(c,true);
}
return db;
}
.
..
public IMetaStoreClient getMSC() throws MetaException {
// 创建metaStoreClient代码与错误分析⽆关,故省略。
...
return metaStoreClient;
}
对于这段代码,有⼀个地⽅⾮常重要,即:hiveDB是⼀个线程局部变量(ThreadLocal),其包裹⼀个Hive实例,每当我们通
过(configuration)⽅法获取Hive实例时,得到的都当前线程上的唯⼀Hive实例。这⼀信息将对理解最后给出的解决⽅法很有帮助。
在排查进⾏到这⾥之前,我⼀直将怀疑重点放在传⼊的Hive配置上,也就是这⾥的参数HiveConf c,因为我⾼度怀疑单独写没有问题,⽽⼀次读之后再写就有问题的最⼤可能是前⼀次的读操作改动或触
发了某些Hive的配置,导致后续的写⼊因配置项不对⽽失败。但是经过层层的代码追查,我并未发现有任何问题,且通过⽐对⽇志中打印出的Hive Configuration信息,也未发现历次输出的配置有不⼀样的地⽅(其实后⾯的分析表明过程中还是有部分配置信息改写过,只是我打印的地⽅并不是改动后),所以调查进⾏到这⾥的时候就进⼊了“死胡同”。
为了找到新的线索,我将同步成功与失败的两份⽇志拿出来做了更细致的对⽐,终于发现了⼀处可疑的地⽅:
在成功同步元数据(去除读,只写⼊)作业的⽇志中,我发现了⼀条⽐同步失败的作业多出的⼀⾏信息:
即:
2021-04-12 08:21:08,752 INFO [main] astore (HiveMetaStoreClient.java:isCompatibleWith(291)): Mestastore astore.wareho u.dir changed from /ur/hive/warehou to file:/tmp/spark-warehou
我们看⼀下输出这⾏⽇志的isCompatibleWith⽅法:
public boolean isCompatibleWith(HiveConf conf){
if(currentMetaVars ==null){
return fal;// recreate
}
boolean compatible =true;
for(ConfVars oneVar : aVars){
红军营// Since metaVars are all of different types, u string for comparison
String oldVar = (oneVar.varname);
String newVar = (oneVar.varname,"");
if(oldVar ==null||
(oneVar.isCaSensitive()?!oldVar.equals(newVar):!oldVar.equalsIgnoreCa(newVar))){
LOG.info("Mestastore configuration "+ oneVar.varname +
" changed from "+ oldVar +" to "+ newVar);
compatible =fal;
}
}
return compatible;
这个⽅法中,最为关键的地⽅在于包裹LOG.info的这个if判定逻辑,从执⾏结果推定:在同步成功的作业⾥,此处的if条件判定为true,因为astore.warehou.dir这⼀项的值从/ur/hive/warehou改为了file:/tmp/spark-warehou(关于这个配置项是在何时由哪⾥发起的变更操作,⽬前也还不清楚,主要是Glue的服务器环境实在不太⽅便进⾏远程调试),从⽽促使isCompatibleWith⽅法返回了fal,整个if的判定的结果就成为了true,所以实际是使⽤了get(c,true)返回了⼀个新的Hive实例(这个参数true就是让Hive类创建⼀个新的Hive实例),就是说,恰好是这⼀处配置的变动,促使Hive⽣成了⼀个新的Hive实例!
⽽对于第⼆次Hudi操作,程序运⾏到这⾥的if后,由于没有任何HiveConf的改动,isCompatibleWith⽅法的返回值⼀定是true了,没有打印配置变更的⽇志也印证了这⼀点,这样整个if的判定值就是fal了,所以返回的必定还是上次⽤过的Hive实例,显然这个上次就⽤过的Hive实例,它的metaStoreClient现在已经不能正常⼯作了,它⽆法联通metastore,Failed to check if databa exists错误就是证明,⾄于为什么,我们从⽇志上⽆从得知(主要是Glue的服务器环境实在不太⽅便进⾏远程调试,故暂时搁置进⼀步的排查),但是解决⽅案其实已经呼之欲出了,那就是:
3.4 解决⽅案
每次在Hudi将要同步元数据前,我们让Hudi总是使⽤⼀个全新的aStoreClient去连接metastore,⽽实现的⽅法也很简单,那就是调⽤⼀次Hive.cloCurrent()⽅法将当前线程上的Hive实例移除即可,后续Hudi在进⾏元数据同步时,会检测到实例为空,然后触发新实例的重建。之所以在Hudi源代码之外的地⽅调⽤Hive.cloCurrent()能⽣效,也是因为Hive实例(hiveDB)是⼀个线程局部变量,同线程任意位置均可⽅便地获取它的引⽤并进⾏操作!
推荐:博主历时三年倾注⼤量⼼⾎创作的⼀书已由知名IT图书品牌电⼦⼯业出版社博⽂视点出版发⾏,真诚推荐给每⼀位读者!点击了解图书详情,扫码进⼊京东⼿机购书页⾯!