SparkSQL jdbc() Write Flow Analysis
Introduction
While testing ClickHouse write performance with SparkSQL's built-in jdbc() method, I found that jdbc() cannot write Array-type data.
Some posts online claim the reason is that SparkSQL's jdbc() method obtains a plain Statement connection rather than a PreparedStatement, and that this is why SparkSQL cannot write Array types.
Setting aside whether that conclusion is correct, the way to find out why jdbc() does not support arrays is to dig into Spark's source code. I therefore plan to document the full flow of Spark writing to ClickHouse in two articles; this one focuses on the write flow starting from the jdbc() entry point.
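To make the problem concrete, here is a minimal sketch of the kind of write that triggered the issue in my test. The SparkSession setup, table name, connection URL, credentials and driver class below are placeholder assumptions, not taken from the actual test:
import java.util.Properties
import org.apache.spark.sql.SparkSession

// Minimal sketch of the failing scenario; all connection details are placeholders.
val spark = SparkSession.builder().appName("jdbc-array-write").master("local[*]").getOrCreate()
import spark.implicits._

// A DataFrame with an Array column, intended for a ClickHouse Array(String) column.
val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "tags")

val props = new Properties()
props.setProperty("user", "default")
props.setProperty("password", "")
props.setProperty("driver", "ru.yandex.clickhouse.ClickHouseDriver") // assumed driver class

// This is the kind of call that, per the test described above, fails for the Array column.
df.repartition(1)
  .write
  .mode("append")
  .jdbc("jdbc:clickhouse://localhost:8123/default", "array_demo", props)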
The jdbc() entry point
dataFrame
  .repartition(1)
  .write
  .mode("append")
  .jdbc(url, clickhouse_table, properties)
The code above is the standard way to call jdbc(); it can also be written in the following form:
dataFrame.write
  .format("jdbc")
  .mode("append")
  .option("dbtable", dbtable)
  .option("url", url)
  .option("user", user)
  .option("password", password)
  .save()
Comparing the two forms, we can infer that jdbc() should be a wrapper around save().
Let's walk through the jdbc() source code layer by layer.
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  assertNotPartitioned("jdbc")
  assertNotBucketed("jdbc")
  // connectionProperties override settings already present in extraOptions
  this.extraOptions ++= connectionProperties.asScala
  // the explicit url and dbtable arguments override everything else
  this.extraOptions += ("url" -> url, "dbtable" -> table)
  /**
   * It is not hard to see that jdbc() is really just a thin wrapper around save()
   */
  format("jdbc").save()
}
save()
def save(): Unit = {
  ......
  /**
   * private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
   */
  val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    ds match {
      case ws: WriteSupport =>
        val options = new DataSourceOptions((extraOptions ++
          DataSourceV2Utils.extractSessionConfigs(
            ds = ds.asInstanceOf[DataSourceV2],
            conf = df.sparkSession.sessionState.conf)).asJava)
        val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
          .format(new Date()) + "-" + UUID.randomUUID()
        val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
        if (writer.isPresent) {
          runCommand(df.sparkSession, "save") {
            WriteToDataSourceV2(writer.get(), df.logicalPlan)
          }
        }
      case _ => saveToV1Source()
    }
  } else {
    saveToV1Source()
  }
}
Inside save(), lookupDataSource() is called to determine the type of the current source, which decides which write path is executed (WriteToDataSourceV2() or saveToV1Source()).
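Before stepping into lookupDataSource(), here is a small exploratory check of that dispatch. This is a sketch of mine, assuming Spark 2.3.x on the classpath; DataSource.lookupDataSource is internal API, and a default SQLConf is enough here because the conf is only consulted for the orc aliases:
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.DataSourceV2

// Resolve the class behind the provider string "jdbc".
val cls = DataSource.lookupDataSource("jdbc", new SQLConf())
println(cls.getName)
// expected: org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider

// JdbcRelationProvider does not implement DataSourceV2, so save() takes the saveToV1Source() branch.
println(classOf[DataSourceV2].isAssignableFrom(cls)) // expected: false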
lookupDataSource()
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
  val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
      classOf[OrcFileFormat].getCanonicalName
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
      "org.apache.spark.sql.hive.orc.OrcFileFormat"
    case name => name
  }
  val provider2 = s"$provider1.DefaultSource"
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
  try {
    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
      case Nil =>
        Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
          case Success(dataSource) =>
            ......
            dataSource
          case Failure(error) => ......
        }
      case head :: Nil => ...... head.getClass
      case sources => ......
        val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
        ...... internalSources.head.getClass
    }
  } catch {
    ......
  }
}
backwardCompatibilityMap()
private val backwardCompatibilityMap: Map[String, String] = {
  val jdbc = classOf[JdbcRelationProvider].getCanonicalName
  val json = classOf[JsonFileFormat].getCanonicalName
  val parquet = classOf[ParquetFileFormat].getCanonicalName
  val csv = classOf[CSVFileFormat].getCanonicalName
  val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
  val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
  val nativeOrc = classOf[OrcFileFormat].getCanonicalName
  Map(
    "org.apache.spark.sql.jdbc" -> jdbc,
    "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
    "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
    "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
    "org.apache.spark.sql.json" -> json,
    "org.apache.spark.sql.json.DefaultSource" -> json,
    "org.apache.spark.sql.execution.datasources.json" -> json,
    "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
    "org.apache.spark.sql.parquet" -> parquet,
    "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
    "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
    "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
    "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
    "org.apache.spark.sql.hive.orc" -> orc,
    "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
    "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
    "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
    "org.apache.spark.ml.source.libsvm" -> libsvm,
    "com.databricks.spark.csv" -> csv
  )
}
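As a hedged usage example (the url, dbtable and credential values are placeholders), this map means that a legacy provider name written against an older Spark still resolves to the same JdbcRelationProvider:
dataFrame.write
  .format("org.apache.spark.sql.jdbc") // rewritten by backwardCompatibilityMap to JdbcRelationProvider
  .mode("append")
  .option("url", "jdbc:clickhouse://localhost:8123/default")
  .option("dbtable", "array_demo")
  .option("user", "default")
  .option("password", "")
  .save()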
lookupDataSource() searches the registered DataSourceRegister implementations by the provider's shortName() (here "jdbc"); for jdbc, the dataSource it returns should be org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider. Now let's go back to save().
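The alias matching relies on Java's ServiceLoader: each data source ships a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file listing its provider class, and shortName() supplies the alias. A small exploratory sketch of mine (not from the Spark source) that lists the registrations visible on the classpath:
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Print every registered short name and the class providing it; on a plain Spark 2.3.x
// classpath this list should include "jdbc" -> JdbcRelationProvider.
val loader = Thread.currentThread().getContextClassLoader
ServiceLoader.load(classOf[DataSourceRegister], loader).asScala
  .map(r => r.shortName() -> r.getClass.getName)
  .foreach { case (alias, cls) => println(s"$alias -> $cls") }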
Since JdbcRelationProvider does not implement DataSourceV2, the method that actually gets executed should be saveToV1Source().
saveToV1Source()
private def saveToV1Source(): Unit = {
  // Code path for data source v1.
  runCommand(df.sparkSession, "save") {
    DataSource(
      sparkSession = df.sparkSession,
      className = source,
      partitionColumns = partitioningColumns.getOrElse(Nil),
      options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
  }
}
saveToV1Source() in turn calls planForWriting(mode, AnalysisBarrier(df.logicalPlan)).
planForWriting()
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
    case format: FileFormat =>
      planForWritingFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
planForWriting() pattern matches on the data source instance to decide which branch to execute.
Since JdbcRelationProvider is a CreatableRelationProvider, the branch taken should be SaveIntoDataSourceCommand().
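A quick hedged check of that claim (assuming Spark 2.3.x on the classpath; JdbcRelationProvider lives in an internal package but is public):
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.sources.CreatableRelationProvider

// JdbcRelationProvider mixes in CreatableRelationProvider, so planForWriting wraps the plan
// in SaveIntoDataSourceCommand rather than taking the FileFormat branch.
println(new JdbcRelationProvider().isInstanceOf[CreatableRelationProvider]) // expected: true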
SaveIntoDataSourceCommand()
case class SaveIntoDataSourceCommand(
    query: LogicalPlan,
    dataSource: CreatableRelationProvider,
    options: Map[String, String],
    mode: SaveMode) extends RunnableCommand {

  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    dataSource.createRelation(
      sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
    Seq.empty[Row]
  }

  override def simpleString: String = {
    val redacted = Utils.redact(SparkEnv.get.conf, options.toSeq).toMap
    s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
  }
}
Its run() method then invokes the data source's createRelation().
createRelation()
trait CreatableRelationProvider {
/**
* Saves a DataFrame to a destination (using data source-specific parameters)
*
* @param sqlContext SQLContext
* @param mode specifies what happens when the destination already exists
* @param parameters data source-specific parameters
* @param data DataFrame to save (i.e. the rows after executing the query)
* @return Relation with a known schema
*
* @since 1.3.0
*/
def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String,String],
data: DataFrame): BaseRelation
}
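To make the contract concrete, here is a minimal, hypothetical implementation skeleton (the class name, the "demo" alias and the option handling are illustrative, not part of Spark):
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.types.StructType

// Hypothetical provider, for illustration only. SaveIntoDataSourceCommand.run() is what ends up
// calling createRelation() with the SQLContext, the SaveMode, the option map and the DataFrame.
// (To be discoverable by short name, the class would also need to be listed in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.)
class DemoRelationProvider extends CreatableRelationProvider with DataSourceRegister {

  override def shortName(): String = "demo"

  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    require(parameters.contains("url"), "missing url option")
    // A real source writes `data` here; JdbcRelationProvider, for example, creates the
    // target table if needed and then inserts the rows over JDBC.
    // ... write logic omitted in this sketch ...
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}
For the jdbc data source the concrete implementation of this trait is JdbcRelationProvider.createRelation(), and that is where the next article will pick up the write flow.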