spark写入到mysql(C3p0连接池方式)

更新时间:2023-05-15 18:43:45 阅读: 评论:0

spark写⼊到mysql(C3p0连接池⽅式)
emmm
总共四个⽂件
MysqlPool⽂件 获取连接数plasticsurgery
mysqlutils⽂件 处理连接池逻辑
propertyutils⽂件 获取mysql配载⽂件信息
mysql-ur.properties 供propertils获取连接信息
Mysql-ur.properties
mysql.pool.jdbc.url=xxx
mysql.pool.jdbc.urName=xxx
mysql.pool.jdbc.passWord=xxx
mysql.pool.jdbc.sql.jdbc.Driver
mysql.pool.jdbc.minPoolSize=5
mysql.pool.jdbc.maxPoolSize=200
mysql.pool.jdbc.acquireIncrement=5
mysql.pool.jdbc.maxStatements=180
MySQLPoolManager.scala
fts.util
import java.sql.Connection
hange.v2.c3p0.ComboPooledDataSource
object MySQLPoolManager {
var mysqlManager: MysqlPool = _
def getMysqlManager: MysqlPool ={
synchronized {
if(mysqlManager == null){
mysqlManager = new MysqlPool
}
}
mysqlManager
}
class MysqlPool extends Serializable {
private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
try {
FileProperties("mysql-ur.properties", "fts_common_jdbc_driver"))
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.urName"))
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.passWord"))
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.url"))
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.driverClass"))
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.minPoolSize").toInt)
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.maxPoolSize").toInt)
cpds.FileProperties("mysql-ur.properties", "mysql.pool.jdbc.acquireIncrement").toInt)      cpds.FilePropertie
s("mysql-ur.properties", "mysql.pool.jdbc.maxStatements").toInt)
} catch {
} catch {
ca e: Exception => e.printStackTrace()
ca e: NullPointerException => e.printStackTrace()
}
def getConnection: Connection ={
try {
} catch {
ca ex: Exception =>
ex.printStackTrace()
null
}
}
def clo(): Unit ={
try {
cpds.clo()
} catch {
资产负债表的编制ca ex: Exception =>
ex.printStackTrace()
}
}
}
}
MySQLUtils.scala
package com.deppon.fts.util
import java.sql.{Date, Timestamp}
import org.apache.log4j.Logger
import org.apache.pes._
import org.apache.spark.sql.{DataFrame, SQLContext}
object MySQLUtils {
val logger: Logger = SimpleName)
ca class Cust(id: Integer, name: String, sales: Double, discount: Double, state: String)
/**
* 将DataFrame所有类型(除id外)转换为String后,通过c3p0的连接池⽅法,向mysql写⼊数据
*
* @param tableName      表名
* @param resultDateFrame DataFrame
*/
def saveDFtoDBUPool(tableName: String, resultDateFrame: DataFrame){
val colNumbers = lumns.length
val sql = getInrtSql(tableName, colNumbers)
val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
resultDateFrame.foreachPartition(partitionRecords =>{
val conn = Connection //从连接池中获取⼀个连接
val preparedStatement = conn.prepareStatement(sql)
val metaData = Columns(null, "%", tableName, "%") //通过连接获取表名对应数据表的元数据        try {
conn.tAutoCommit(fal)
partitionRecords.foreach(record =>{
//注意:tString⽅法从1开始,String()⽅法从0开始
for(i <- 1 to colNumbers){
val value = (i - 1)
on january>生命中不能承受之轻下载
val dateType = columnDataTypes(i - 1)
val dateType = columnDataTypes(i - 1)
if(value != null){ //如何值不为空,将类型转换为String
preparedStatement.tString(i, String)
dateType match {
ca _: ByteType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: ShortType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: IntegerType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: LongType => preparedStatement.tLong(i, As[Long](i - 1))
ca _: BooleanType => preparedStatement.tBoolean(i, As[Boolean](i - 1))
ca _: FloatType => preparedStatement.tFloat(i, As[Float](i - 1))
ca _: DoubleType => preparedStatement.tDouble(i, As[Double](i - 1))
ca _: StringType => preparedStatement.tString(i, As[String](i - 1))
ca _: TimestampType => preparedStatement.tTimestamp(i, As[Timestamp](i - 1))
ca _: DateType => preparedStatement.tDate(i, As[Date](i - 1))
ca _ => throw new RuntimeException(s"nonsupport ${dateType} ")
}
}el{ //如果值为空,将值设为对应类型的空值
metaData.absolute(i)
preparedStatement.tNull(i, Int("DATA_TYPE"))
}
}
preparedStatement.addBatch()
})
} catch {
ca e: Exception => println(s"@@ saveDFtoDBUPool ${e.getMessage}")
// do some log
} finally {
preparedStatement.clo()
conn.clo()
}
})
}
/
**
* 拼装inrt SQL
*
* @param tableName
* @param colNumbers
* @return
*/
def getInrtSql(tableName: String, colNumbers: Int): String ={
var sqlStr ="inrt into " + tableName + " values("
for(i <- 1 to colNumbers){
sqlStr +="?"
if(i != colNumbers){
sqlStr +=", "
}
}
sqlStr +=")"
sqlStr
}
/** 以元组的⽅式返回mysql属性信息 **/
//    def getMySQLInfo: (String, String, String)={
//      val jdbcURL = FileProperties("mysql-ur.properties", "mysql.jdbc.url")
//      val urName = FileProperties("mysql-ur.properties", "mysql.jdbc.urname")
chinahr//      val passWord = FileProperties("mysql-ur.properties", "mysql.jdbc.password")
//      (jdbcURL, urName, passWord)
//    }
//    def getDFFromMysql(sqlContext: SQLContext, mysqlTableName: String, queryCondition: String): DataFrame ={ //      val (jdbcURL, urName, passWord)= getMySQLInfo
//      val prop = new Properties()
//      val prop = new Properties()
//      prop.put("ur", urName)
//      prop.put("password", passWord)
//
//      if(null == queryCondition ||""== queryCondition)
//        ad.jdbc(jdbcURL, mysqlTableName, prop)
主要看气质
//      el
//        ad.jdbc(jdbcURL, mysqlTableName, prop).where(queryCondition)
//    }
/**
* 删除数据表
*
* @param sqlContext
* @param mysqlTableName
* @returnbehold
*/
def DeleteMysqlTable(sqlContext: SQLContext, mysqlTableName: String): Boolean ={
val conn = Connection //从连接池中获取⼀个连接      val preparedStatement = ateStatement()
try {
} catch {
ca e: Exception =>
println(s"mysql deleteMysqlTable error:${e.getMessage}")
fal
} finally {
preparedStatement.clo()
conn.clo()
}
}
/**
* 删除表中的数据
*
* @param sqlContextliquid
* @param mysqlTableName
* @param condition
* @return
*/
def deleteMysqlTableData( mysqlTableName: String, condition: String): Boolean ={
val conn = Connection //从连接池中获取⼀个连接      val preparedStatement = ateStatement()
try {
ca e: Exception =>
println(s"mysql deleteMysqlTable error:${e.getMessage}")
fal
} finally {
preparedStatement.clo()choo
conn.clo()
}
}
/**
* 保存DataFrame 到 MySQL中,如果表不存在的话,会⾃动创建
*
* @param tableName
* @param resultDateFrame
*/
def saveDFtoDBCreateTableIfNotExist(tableName: String, resultDateFrame: DataFrame){      //如果没有表,根据DataFrame建表
//      createTableIfNotExist(tableName, resultDateFrame)
//验证数据表字段和dataFrame字段个数和名称,顺序是否⼀致
//      verifyFieldConsistency(tableName, resultDateFrame)
//      verifyFieldConsistency(tableName, resultDateFrame)
//保存df
saveDFtoDBUPool(tableName, resultDateFrame)
}
/**
* 拼装inrtOrUpdate SQL 语句
*
* @param tableName
* @param cols
* @param updateColumns
* @return
*/
def getInrtOrUpdateSql(tableName: String, cols: Array[String], updateColumns: Array[String]): String ={
val colNumbers = cols.length
var sqlStr ="inrt into " + tableName + s" (${cols.mkString(",")}) values("
for(i <- 1 to colNumbers){
sqlStr +="?"
if(i != colNumbers){
sqlStr +=", "
}
kalimba
}
sqlStr +=") ON DUPLICATE KEY UPDATE "
updateColumns.foreach(str =>{
sqlStr += s"$str = ?,"
})
sqlStr.substring(0, sqlStr.length - 1)
}
/**
* 通过inrtOrUpdate的⽅式把DataFrame写⼊到MySQL中,注意:此⽅式,必须对表设置主键
*
* @param tableName
* @param resultDateFrame
* @param updateColumns
*/
def inrtOrUpdateDFtoDBUPool(tableName: String, resultDateFrame: DataFrame, updateColumns: Array[String]){      val colNumbers = lumns.length
val sql = getInrtOrUpdateSql(tableName, lumns, updateColumns)
val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
println(">>#### sql = " + sql)
resultDateFrame.foreachPartition(partitionRecords =>{
val conn = Connection //从连接池中获取⼀个连接
val preparedStatement = conn.prepareStatement(sql)
val metaData = Columns(null, "%", tableName, "%") //通过连接获取表名对应数据表的元数据        try {
conn.tAutoCommit(fal)
partitionRecords.foreach(record =>{
//注意:tString⽅法从1开始,String()⽅法从0开始
for(i <- 1 to colNumbers){
val value = (i - 1)
val dateType = columnDataTypes(i - 1)
if(value != null){ //如何值不为空,将类型转换为String
preparedStatement.tString(i, String)
dateType match {
ca _: ByteType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: ShortType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: IntegerType => preparedStatement.tInt(i, As[Int](i - 1))
ca _: LongType => preparedStatement.tLong(i, As[Long](i - 1))
ca _: BooleanType => preparedStatement.tInt(i, As[Boolean](i - 1)) 1 el 0)
ca _: FloatType => preparedStatement.tFloat(i, As[Float](i - 1))
ca _: DoubleType => preparedStatement.tDouble(i, As[Double](i - 1))
ca _: StringType => preparedStatement.tString(i, As[String](i - 1))

本文发布于:2023-05-15 18:43:45,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/109673.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:对应   获取   连接   数据表   数据   名称   承受
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图