java⾼并发有哪些框架_纯Java——简易⾼并发框架
0.源代码
github-简易⾼并发框架
瓜片茶注:本篇博客知识来⾃于⽹课。
1.问题来源以及w
对于⼀个题库系统。考试组要有批量的离线⽂档要⽣成。题库组批量的题⽬要进⾏排重,且要根据条件批量修改题⽬内容。对于
痛点:
批量任务完成缓慢
所有的问题都围绕着“查询”,即查询进度影响总体性能
我们希望尽量使⽤友好(如果⽤多线程来提⾼性能,我们希望能屏蔽细节)
因此我们需要⼀个可以提供查询进度通⽤的框架。
2.我们该怎么做?
这⾥先要明确“任务”(Task)和“⼯作”(Job)的关系。对于⼀个⼯作,他内部可能须有许多的任务,任务是他的⼦元素(属性、字段)。⽤并发安全的类确保每个⼯作的属性和⼯作下的每个任务信息,也意味着⼯作和任务的注册机制。
需要并发安全的类保存每个任务的处理结果(TaskResult)。
需要提供查询接⼝,供外部的使⽤。
这⾥我们不处理对于⼯作的检查。有兴趣的可以实现。
3.总体流程
这⾥不按照流程讲解,⽽是按照类关系从下⽽上讲解。
4.⽬录结构
5.TaskResultType
package me.hcFramework.pool.vo;
//这个类只是⽤来作为标志的信息。
public enum TaskResultType {
SUCCESS, //表⽰任务成功
FAILSURE, //表⽰任务失败
EXCEPTION; //表⽰发⽣了异常,这⾥我们不去详尽判断,只⽤这个标⽰来笼统表⽰}
6.TaskResult
package me.hcFramework.pool.vo;
/**
*
* @param 业务⽅法处理后的业务结果数据的类型
*
* 对属性使⽤final修饰是为了使其不可改
*/
public class TaskResult {
//⽤户业务是否成功完成
private final TaskResultType resultType;
/
/业务⽅法处理后的业务结果数据
private final R returnType;
//如果失败,则失败原因
private final String reason;
//针对任务失败的构造⽅法
public TaskResult(TaskResultType resultType , R returnType , String reason) { sultType = resultType;
有关童年的作文
}
//针对任务成功的构造⽅法
public TaskResult(TaskResultType resultType , R returnType) {
}
//因为我们希望字段不可改,设置为了final。所以只提供getters
public TaskResultType getResultType() {特布他林片
return resultType;
}
public R getReturnType() {
return returnType;
}
public String getReason() {
return reason;
}
@Override
public String toString() {
return "TaskResult [resultType=" + resultType + ", returnType=" + returnType + ", reason=" + reason + "]";
}
}
在这⾥其实可以发⽣⼀点⼩改动。即:把错误信息归并到TaskResultType中。这样⼀个TaskResultType包括成功,错误/异常以及其原因就完整了。这⾥不过多介绍。
7.JobInfo
package me.hcFramework.pool.vo;
import java.util.LinkedList;
import java.util.List;
import urrent.LinkedBlockingDeque;
import urrent.atomic.AtomicInteger;
/**
* 可以看作是⼀堆Task的打包+信息控制
* 与TaskResult⼀样,⼀旦设置好了就不许再次更改
*/
public class JobInfo {
//唯⼀性标志
private final String jobName;
//任务处理器,要求业务⼈员实现接⼝
private final ITaskProcessor, ?> taskProcessor;
//⼯作(Job)中任务(Task)的数量
private final int jobLength;
//以下两个类保证操作原⼦性
//任务总执⾏成功个数
private AtomicInteger successCount;
//已执⾏的任务总数
private AtomicInteger taskProcessCount;
//每个任务的处理结果,供查询调⽤
private LinkedBlockingDeque> taskDetailQueue;
笑说
public JobInfo(String jobName , int jobLength , ITaskProcessor,?> taskProcessor) {
包菜炒饭this.jobName = jobName;
this.jobLength = jobLength;
this.taskProcessor = taskProcessor;
this.successCount = new AtomicInteger(0);
企业环保this.taskProcessCount = new AtomicInteger(0);
this.taskDetailQueue = new LinkedBlockingDeque>(jobLength);
}
//提供⼯作的整体进度信息
public String getTotalProcess() {
return "success[" + ()+"]/current[" + () + "],Total=[" + jobLength + "]"; }
//取得⼯作中每个任务的详情
public List> getTaskDetail() {
List> taskDetailList = new LinkedList>();
TaskResult taskResult;
//pollFirst()⽅法返回双端队列的第⼀个元素,返回的元素会从列表中移除
while((taskResult = taskDetailQueue.pollFirst()) != null) {
taskDetailList.add(taskResult);
}
return taskDetailList;
}
//放⼊⼯作详情,只需要保证最终个数正确即可,不需要加锁
自什么自什么的四字词语
public void addTaskResult(TaskResult result) {
if(TaskResultType.SUCCESS == ResultType()) {
}
taskDetailQueue.add(result);
}
public String getJobName() {
return jobName;
}
public ITaskProcessor, ?> getTaskProcessor() {
return taskProcessor;
}
public int getJobLength() {
return jobLength;
}
public int getSuccessCount() {
();
}
public int getTaskProcessCount() {
();
}
@Override
public String toString() {
return "JobInfo [jobName=" + jobName + ", taskProcessor=" + taskProcessor + ", jobLength=" + jobLength + ", successCount=" + successCount + ", taskProcessCount=" + taskProcessCount + ", taskDetailQueue=" + taskDetailQueue + "]";
}
}
关于LinkedBlockingDeque的说明:他是线程安全的。他是双端队列,任何⼀端都可以进⾏元素的出⼊。
8.ITaskProcessor
package me.hcFramework.pool.vo;
/
**
* 定义接⼝,所有需要完成的任务都需要实现此接⼝进⾏
*
* @param 业务⽅法需要的数据
* @param 业务⽅法处理后的业务结果数据的类型