多线程设计模式——Pipeline(流水线)模式

更新时间:2023-06-08 20:23:17 阅读: 评论:0

多线程设计模式——Pipeline(流⽔线)模式
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。
模式名称
Pipeline(流⽔线)模式
模式解决的问题
有时⼀些线程的步奏⽐较冗长,⽽且由于每个阶段的结果与下阶段的执⾏有关系,⼜不能分开。
assume解决思路
可以将任务的处理分解为若⼲个处理阶段,上⼀个阶段任务的结果交给下⼀个阶段来处理,这样每个线程的处理是并⾏的,可以充分利⽤资源提⾼计算效率。
模式所使⽤的类:
Pipe对处理阶段的抽象,负责对输⼊进⾏处理,并将输出作为下⼀个阶段的输⼊:process()⽤于接收前⼀个处理阶段的处理结果,作为该处理阶段的输⼊,init()初始化当前处理阶段对外提供的服务,shutd
own()关闭当前处理阶段对外提供的服务,tNextPipe()设置当前处理阶段的下⼀个处理阶段。
PipeContext对各个处理阶段的计算环境进⾏抽象,主要⽤于异常处理:handleError()⽤于对处理阶段泡醋的异常进⾏处理。 AbstractPipe类Pipe接⼝的抽象实现类:process()接收抢⼀个处理阶段的输⼊并调⽤其⼦类实现的doProcess⽅法对输⼊元素进⾏处理,init()保存对其参数中制定的PipeContext实例的引⽤,⼦类可根据需要覆盖该⽅法以实现其服务的初始化。
Shutdown默认实现什么也不做,⼦类可根据需要覆盖该⽅法实现服务停⽌:tNextPipe()设置当前处理阶段的下⼀个处理阶
段,doProcess()留给⼦类实现的抽象⽅法,⽤于实现其服务的初始化。
WorkerThreadPipeDecprator基于⼯作线程的Pipe实现类,该Pipe实例会将接收到的输⼊元素存⼊队列,由制定个数的⼯作者线程对队列中输⼊元素进⾏处理,该类的⾃⾝主要负责⼯作者线程的⽣命周期的管理:process()接收前⼀个处理阶段的输⼊,并将其存⼊队列由⼯作者线程运⾏时取出进⾏处理,init()启动⼯作者线程并调⽤委托Pipe实例的init⽅法,shutdown()停⽌⼯作者线程并委托Pipe实例的shutdown⽅法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法,dispatch()取队列中的输⼊元素并调⽤委托Pipe实例的process⽅法对其进⾏处理
ThreadPoolPopeDecorator基于线程池的Pipe的实现类:process()接收前⼀个处理阶段的输⼊,并向线程池提交⼀个对该输⼊进⾏相应处理的任务,init()调⽤委托pipe实例的init⽅法,shutdown()关闭当前Pipe实例对外提供的服务并调⽤委托Pipe实例的shutdown⽅法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法。
AbstractParallelPile类AbstractPipe的⼦类,⽀持并⾏处理的Pipe实现类,该类对其每个输⼊元素(原始任务)⽣成相应的⼀组⼦任务,并以并⾏的⽅式去执⾏这些⼦任务,各个⼦任务的执⾏结果会被合并为相应原始任务的输出结果:bulidTasks()流给⼦类实现的抽象⽅法,⽤于根据制定的输⼊构造⼀组⼦任务,combineResults()留给⼦类实现的抽象⽅法,对各个并⾏⼦任务的处理结果进⾏合并,形成相应输⼊元素的输出结果。invokeParallel()实现以并⾏的⽅式执⾏⼀组任务,doProcess()实现该类对其输⼊的处理逻辑。
ConcreteParallelPipe类由应⽤定义的AbstractParallelPipe的⼦类:buildTasks()根据指定的输⼊构造⼀组⼦任务,combineResults()对各个并⾏⼦任务的处理结果进⾏合并,形成相应输⼊元素的输出结果
Pipeline类对符合Pipe的抽象:addPipe()往该Pipeline实例中添加⼀个Pipe实例。
SimplePipeline类基于AbstractPipe的Pipeline接⼝的⼀个简单实现类:addPipe()往该Pipeline实例中添加⼀个Pipe实
例,addAsWorkerThreadBadPipe()将制定的Pipe实例⽤WorkerThreadPipeDecorator实例包装后加⼊Pipeline实
例,addAsThreadPoolBadPipe()将制定的Pipe实例⽤ThreadPoolPipeDecorator实例包装后加⼊Pipeline实例。
Pipeline模式的服务初始化序列图
⽰例代码
某系统需要⼀个数据同步的定时任务,该定时任务将数据库中符合制定条件的记录数据以⽂件的形式FTP传输(同步)到制定的主机上。该定时任务需要满⾜以下要求:
1.每个数据⽂件最多只包含N(如10000,具体可配置)条记录;当⼀个数据⽂件被写满时,其他代谢记录会被写⼊新的数据⽂件。
2.每个数据⽂件可能需要被传输到多台主机上。
3.本地要保留同步过的数据⽂件的备份。
因此,该定时任务需要做三件事情,都是⽐较耗时的操作,⽽且后⾯的操作还需要依赖前⾯操作的结果才能进⾏,不易拆分,如果只是⽤多线程,每个线程中仍然是按顺序串⾏处理也是不合适的,这样的话第⼆个步奏会出现多线程之间争夺资源导致时间浪费的问题,会更难完成任务,所以需要有Pipeline模式去执⾏。
数据同步定时任务
public class DataSynctask implements Runnable{
public void run(){
ResultSet rs = null;
SimplePipeline<RecordSaveTask,String> pipeline = buildPipeline();
pipeline.wDefaultPipeContext());
Connection dbConn = null;
try{
dbConn = getConnection();
rs = qryRecords(dbConn);
processRecords(rs.pipeline);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != dbConn){
try{
dbConn.clo();
}catch (SQLException e){
;
}
}
acescpipeline.shutdown(360,TimeUnit.SECONDS);
}
private ResultSet qryRecords(Connection dbConn) throws Exception{
dbConn.tReadOnly(true);
PreparedStatement ps = dbConn
.prepareStatement("lect id,productId,packageId,msisdn,operationTime,operationTyoe," + "effectiveDate,dueDate from subscriptions order by operationTime",
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = ps.executeQuery();
return rs;
}
private static Connection getConnection() throws Exception{
Connection dbConn = null;
Class.forName("org.hsqldb.jdbc.JDBCDriver");
dbConn = Connection("jdbc:hsqldb:hsql://192.168.1.105:9001",
"SA","");
return dbConn;
}
private static Record makeRecordFrom(ResultSet rs) throws SQLException{
Record record = new Record();
record.Int("id"));
hrecord.String("productId"));
record.String("msisdn"));
record.Timestamp("operationTime"));
record.Int("operationType"));
record.Timestamp("effectiveDate"));
record.Timestamp("dueDate"));
}
private static class RecordSaveTask{
public final Record[] records;
public final int targetFileIndex;
public final String recordDay;
public RecordSaveTask(Record[] records,int targetFileIndex){
this.targetFileIndex = targetFileIndex;
}
puclic RecordSaveTask(String recordDay,int targeFileIndex){
this.targetFileIndex = targetFileIndex;
}
}
private SimplePipeline<RecordSavetask,String> buildPipeline(){
/*
* 线程的本质是重复利⽤⼀定数量的线程,⽽不是针对每个任务都有⼀个专门的⼯作者线程。
* 这⾥,各个Pipe的初始化完全可以在上游Pipe初始化完毕后再初始化其后继Pipe,⼆不必多
bier* 个Pipe同时初始化。
wechat什么意思中文
* 因此,这个初始化的动作可以由⼀个线程来处理。该线程处理完各个Pipe的初始化后,可以继续            * 处理之后可能产⽣的任务,如出错处理。
* 所以,上述这些先后产⽣的任务可以由线程池中的⼀个⼯作者线程从头到尾执⾏。
*/
final ExecutorService helperExecutor = wSingleThreadExecutor();
final SimplePipeline<RecordSaveTask,String> pipeline = new
SimplePipeline<RecordSaveFile,String>(helperExcecutor);
Pipe<RecordSaveTask,File> stageSaveFile = new AbstractPipe<RecordSaveTask,File>(){
final RecordWriter recordWriter =Instance();
final Record[] records = ds;
File file;
if(null == records){
file = recordWriter.write(records,task.targerFileIndex);
}el{
try{
file = recordWriter.write(records.task.targetFileIndex);
}catch(IOException e){
throw new PipeException(this,task,"Failed to save records",e);
}
}
advantage
};
/*
* 由于这⾥的⼏个Pipe都是处理I/O的,为了避免使⽤锁(以减少不必要的上下⽂切换)但⼜能            * 保证线程安全,故每个Pipe都采⽤单线程处理。
* 若各个Pipe要改⽤线程池来处理,需要注意:1)线程安全2)死锁。
*/
pipeline.addAsWorkerThreadBadPipe(stageSaveFile,1);
final String[][] ftpServerConfigs = retrieveFTPServConf();
final ThreadPoolExecutor ftpExecutorService = new ThreadPoolExecutor(1
中餐英语菜单ftpServerConfigs.length,60,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r,ThreadPoolExecutor executor){
if(!executor.isShutdown()){
try{
}catch(InterruptedException e){
;
}
}
}
});
Pipe<File,File,File> stageTransferFile = new AbstractParallelPipe<File,File,
File>((new SynchronousQueue<file>(),ftpExecutorService)){
darkblueFuture[ftpServerConfigs.length];
@Override
纽约时报双语版
public void init(PipeContext piptCtx){
super.init(pipeCtx);
String[] ftpServerConfig;
for(int i = 0; i <ftoServerConfigs.length; i++){
ftpServerConfig = ftpServerConfigs[i];
ftpClientUtilHolders[i] = wInstance(
ftpServerConfig[0],ftpServerConfig[1],ftpServerConfig[2]);
}
}
@Override
protected List<Callable<File>> buildTasks(final File file){
List<Callable<File>> tasks = new LinkedList<Callable<File>>();
for(Fucture<FTPClientUtil> ftpClientUtilHolder:
ftpClientUtilHolders){
tasks.add(new ParallelTask(fipClientUtilHolder,file));
}
return tasks;
}
@Override
protected File combineResults(List<Future<File>> subTaskResults)
throws Exception{
圣诞节英语祝福语if(0 == subTaskResults.size()){
return null;
return null;
}
File file = null;
file = (0).get();
return file;
}
@Override
public void shutdown(long timeout,TimeUnit unit){
super.shutdown(timeout/unit);
ftpExecutorService.shutdown();
try{
ftpExecutorService.awaitTermination(timeout, unit);
}catch(InterruptedException e){
;
}
for(Future<FTPClientUtil> ftpClientUtilHolder:
ftpClientUtilHolders){
try{
<().disconnect();
}catch(Exception e){
;
}
}
}
class ParallelTask implements Callable<File>{
public final Future<FTPClientUtil>ftpUtilHodler;
public final File file2Transfer;
public ParallelTask(Future<FTPClientUtil> ftpUtilHodler,
File file2Transfer){
this.ftpUtilHodler = ftpUtilHodler;
this.file2Transfer = file2Transfer;
}
@Override
public File call() throws Exception{
File transferedFile = null;
<().upload(file2Transfer);
transferedFile = file2Transfer;
return transferedFile;
}
}
};
pipeline.addAsWorkerThreadBadPipe(stageTransferFile,1);
//备份已经传输的数据⽂件
Pipe<File,Void> stageBackupFile = new AbstractPipe<File,Void>(){ @Override
protected Void doProcess(File transferedFile) throws PipeException{                    RecordWriter.backupFile(transferedFile);
return null;
}
public void shutdown(long timeout,TimeUnit unit){
//所有⽂件备份完毕后,七⾥掉空⽂件夹
RecordWriter.purgeDir();
}
};
pipeline.addAsWorkerThreadBadPipe(stageTransferFile,1);
return pipeline;
}

本文发布于:2023-06-08 20:23:17,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/78/905316.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:处理   任务   线程   阶段
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图