File ordering issues when Spark reads and writes data
1. I was recently using Spark to read and write Parquet files. The workflow converts a large file in another format (whose rows are ordered) into a Parquet dataset, which in practice consists of many Parquet sub-files (part files). When reading that Parquet dataset back with Spark, I found the row order was scrambled, so I dug into why.
First, to be clear, the "order" discussed here is row order, and "scrambled" means the relative order between sub-files is wrong; within a single sub-file the order is preserved (as long as that sub-file is not split further).
Further checking showed this is not specific to Parquet: whenever Spark reads a dataset made up of multiple sub-files, the same problem appears.
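For context, a minimal sketch of how the behaviour can be reproduced (not from the original post; the output path and row count are made up, and spark.range with an explicit partition count is used so each part file is internally ordered):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("order-check").getOrCreate()
// 8 partitions => 8 part files, each internally ordered, and the part file
// names (part-00000, part-00001, ...) follow the global row order.
spark.range(0L, 1000000L, 1L, 8).write.parquet("/tmp/ordered_parquet")
// Reading the directory back: rows inside each part file keep their order,
// but the part files themselves may be concatenated in a different order.
spark.read.parquet("/tmp/ordered_parquet").show(20)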
The root cause is a step in Spark's file-reading path: the createNonBucketedReadRDD function in the org.apache.spark.sql.execution.DataSourceScanExec class (a quick numeric sketch of the maxSplitBytes it computes follows the listing).
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Seq[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      val blockLocations = getBlockLocations(file)
      if (fsRelation.fileFormat.isSplitable(
          fsRelation.sparkSession, fsRelation.options, file.getPath)) {
        (0L until file.getLen by maxSplitBytes).map { offset =>
          val remaining = file.getLen - offset
          val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
          val hosts = getBlockHosts(blockLocations, offset, size)
          PartitionedFile(
            partition.values, file.getPath.toUri.toString, offset, size, hosts)
        }
      } else {
        val hosts = getBlockHosts(blockLocations, 0, file.getLen)
        Seq(PartitionedFile(
          partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
      }
    }
  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      val newPartition =
        FilePartition(
          partitions.size,
          currentFiles.toArray.toSeq) // Copy to a new Array.
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // Assign files to partitions using "Next Fit Decreasing"
  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
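To make the bin-packing parameters concrete, here is a quick numeric sketch of the maxSplitBytes formula above. The file count, file size and core count are invented; the two config defaults are Spark's documented defaults:

// Invented scenario: 100 part files of 10 MB each on a 16-core cluster.
val defaultMaxSplitBytes = 128L * 1024 * 1024   // spark.sql.files.maxPartitionBytes default
val openCostInBytes      = 4L * 1024 * 1024     // spark.sql.files.openCostInBytes default
val defaultParallelism   = 16
val totalBytes    = 100L * (10L * 1024 * 1024 + openCostInBytes)
val bytesPerCore  = totalBytes / defaultParallelism // = 87.5 MB
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// => min(128 MB, max(4 MB, 87.5 MB)) = 87.5 MB, so several 10 MB files get packed per partition.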
Now let's walk through this function. To begin with, the input parameter selectedPartitions does not arrive sorted by filename (note: for parquet, the filename order is exactly the correct row order). For example, the correct file order is:
part-00000-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00001-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00002-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00002-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c001.snappy.parquet
part-00003-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00004-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
However, the order in which Spark reads them in may look like this:
part-00003-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00002-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00001-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00002-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c001.snappy.parquet
part-00000-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
part-00004-2df71d0c-9aa3-4a3d-b612-dda39f241ed7-c000.snappy.parquet
So this is where the problem originates; one way to observe the order Spark actually used is sketched below.
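This check is not from the original post; it simply tags each row with its source file via input_file_name(), reusing the hypothetical path from the reproduction sketch above:

import org.apache.spark.sql.functions.input_file_name

// Scanning the first rows shows which part file Spark happened to read first.
val df = spark.read.parquet("/tmp/ordered_parquet")
df.withColumn("source_file", input_file_name()).show(50, truncate = false)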
In addition, splitFiles will split an individual sub-file for reading (if the file format is splitable): a single sub-file larger than maxSplitBytes is cut into pieces, and those pieces do not stay adjacent either, because the descending sort by length pushes the smaller tail piece toward the end. That scrambles the order yet again.
Why does Spark do all of this? Load balancing: the descending-size sort plus the "Next Fit Decreasing" bin packing keeps every partition's workload roughly equal. But when row order matters, it is a real problem, as the toy example below illustrates.
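The following toy simulation (not from the original post; sizes are invented, in MB, and the open cost is ignored) mimics the split plus "Next Fit Decreasing" packing above to show how even a single large file gets reordered:

// Split anything bigger than maxSplit, then sort the pieces by size, descending.
val maxSplit = 64L
val files = Seq(100L, 30L, 20L)
val pieces = files.flatMap { len =>
  (0L until len by maxSplit).map(off => math.min(maxSplit, len - off))
}.sortBy(-_)
// pieces == Seq(64, 36, 30, 20): the 36 MB tail of the 100 MB file is no
// longer adjacent to its 64 MB head.
val partitions = pieces.foldLeft(List(List.empty[Long])) { (acc, p) =>
  if (acc.head.sum + p > maxSplit) List(p) :: acc else (p :: acc.head) :: acc.tail
}.reverse.map(_.reverse)
// partitions == List(List(64), List(36), List(30, 20))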
2. Now for the workarounds.
Method 1: sort the individual sub-files yourself at read time. If your file format is splitable, you also need to set openCostInBytes, raising it to the maximum (see the config sketch after the snippet below); if the format is not splitable, openCostInBytes can be left as-is.
// Sort the individual sub-files in a directory by name
import java.io.FileNotFoundException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

def glob(filename: String, hadoopConf: Configuration): Array[String] = {
  val fs = new Path(filename).getFileSystem(hadoopConf)
  val path = new Path(filename)
  val status = try fs.listStatus(path) catch {
    case _: FileNotFoundException => Array.empty[FileStatus]
  }
  // Skip metadata files such as _SUCCESS, keep only the data files.
  val filteredStatus = status.filterNot(status => status.getPath.getName.startsWith("_"))
  filteredStatus.map(_.getPath.toString).sorted
}

// read (filepath is the directory containing the part files)
val files = glob(filepath, spark.sparkContext.hadoopConfiguration)
val df = spark.read.parquet(files: _*)
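For the splitable case, the knob referred to above is spark.sql.files.openCostInBytes. A hedged sketch follows; the concrete value is only an illustratively large number, chosen so that the packing loop in createNonBucketedReadRDD closes a partition after every single file, not a value prescribed by the post:

// Make the assumed cost of opening a file as large as the default split cap,
// so each sorted file effectively gets its own read partition.
spark.conf.set("spark.sql.files.openCostInBytes", (128L * 1024 * 1024).toString)
// Related cap on split size, if you also want to avoid splitting large files:
// spark.conf.set("spark.sql.files.maxPartitionBytes", (1024L * 1024 * 1024).toString)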
Method 2: rewrite the createNonBucketedReadRDD function in org.apache.spark.sql.execution.DataSourceScanExec yourself, which effectively means replacing that class. The main change is to the splitFiles variable (sort the files by path and drop the descending-size sort), at the price of possibly uneven load across partitions.
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Seq[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  val splitFiles = selectedPartitions.flatMap { partition =>
    // Changed: iterate the files sorted by path so the sub-file order is kept.
    partition.files.sortBy(_.getPath.toString).flatMap { file =>
      val blockLocations = getBlockLocations(file)
      if (fsRelation.fileFormat.isSplitable(
          fsRelation.sparkSession, fsRelation.options, file.getPath)) {
        (0L until file.getLen by maxSplitBytes).map { offset =>
          val remaining = file.getLen - offset
          val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
          val hosts = getBlockHosts(blockLocations, offset, size)
          PartitionedFile(
            partition.values, file.getPath.toUri.toString, offset, size, hosts)
        }
      } else {
        val hosts = getBlockHosts(blockLocations, 0, file.getLen)
        Seq(PartitionedFile(
          partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
      }
    }
  }.toArray // Changed: no descending sort by length, so the path order is preserved.

  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      val newPartition =
        FilePartition(
          partitions.size,
          currentFiles.toArray.toSeq) // Copy to a new Array.
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // Assign files to partitions using "Next Fit Decreasing"
  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
Method 3: ignore the order while reading and sort the resulting dataset at the very end. The downside is that the global sort triggers a fairly expensive shuffle. A minimal sketch follows.
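This sketch assumes the data carries a column that encodes the original row order (a hypothetical row_id written alongside the data; the path is the one from the earlier sketches), which the post does not specify:

import org.apache.spark.sql.functions.col

// A global orderBy restores the row order at the cost of a full shuffle.
val sorted = spark.read.parquet("/tmp/ordered_parquet").orderBy(col("row_id"))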