javapipeline模式_多线程设计模式——Pipeline(流⽔线)模
式
编辑推荐:
本⽂来⾃csdn,本⽂主要介绍了Pipeline(流⽔线)模式的问题解决思路,并通过⽰例代码介绍Pipeline模式的服务,希望本⽂对您的学习有所帮助。
模式名称
Pipeline(流⽔线)模式
模式解决的问题
有时⼀些线程的步奏⽐较冗长,⽽且由于每个阶段的结果与下阶段的执⾏有关系,⼜不能分开。
解决思路
可以将任务的处理分解为若⼲个处理阶段,上⼀个阶段任务的结果交给下⼀个阶段来处理,这样每个线程的处理是并⾏的,可以充分利⽤资源提⾼计算效率。
模式所使⽤的类:
Pipe对处理阶段的抽象,负责对输⼊进⾏处理,并将输出作为下⼀个阶段的输⼊:process()⽤于接收前⼀个处理阶段的处理结果,作为该处理阶段的输⼊,init()初始化当前处理阶段对外提供的服务,shutdown()关闭当前处理阶段对外提供的服务,tNextPipe()设置当前处理阶段的下⼀个处理阶段。
PipeContext对各个处理阶段的计算环境进⾏抽象,主要⽤于异常处理:handleError()⽤于对处理阶段泡醋的异常进⾏处理。
AbstractPipe类Pipe接⼝的抽象实现类:process()接收抢⼀个处理阶段的输⼊并调⽤其⼦类实现的doProcess⽅法对输⼊元素进⾏处
理,init()保存对其参数中制定的PipeContext实例的引⽤,⼦类可根据需要覆盖该⽅法以实现其服务的初始化。
Shutdown默认实现什么也不做,⼦类可根据需要覆盖该⽅法实现服务停⽌:tNextPipe()设置当前处理阶段的下⼀个处理阶
段,doProcess()留给⼦类实现的抽象⽅法,⽤于实现其服务的初始化。
WorkerThreadPipeDecprator基于⼯作线程的Pipe实现类,该Pipe实例会将接收到的输⼊元素存⼊队列,由制定个数的⼯作者线程对队列中输⼊元素进⾏处理,该类的⾃⾝主要负责⼯作者线程的⽣命周期的管理:process()接收前⼀个处理阶段的输⼊,并将其存⼊队列由⼯作者线程运⾏时取出进⾏处理,init()启动⼯作者线程并调⽤委托Pipe实例的init⽅法,shutdown()停⽌⼯作者线程并委托Pipe实例的shutdown ⽅法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法,dispatch()取队列中的输⼊元素并调⽤委托Pipe实例的process⽅法对其进⾏处理
ThreadPoolPopeDecorator基于线程池的Pipe的实现类:process()接收前⼀个处理阶段的输⼊,并向线程池提交⼀个对该输⼊进⾏相应处理的任务,init()调⽤委托pipe实例的init⽅法,shutdown()关闭当前Pipe实例对外提供的服务并调⽤委托Pipe实例的shutdown⽅
法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法。
AbstractParallelPile类AbstractPipe的⼦类,⽀持并⾏处理的Pipe实现类,该类对其每个输⼊元素(原始任务)⽣成相应的⼀组⼦任务,并以并⾏的⽅式去执⾏这些⼦任务,各个⼦任务的执⾏结果会被合并为相应原始任务的输出结果:bulidTasks()流给⼦类实现的抽象⽅法,⽤于根据制定的输⼊构造⼀组⼦任务,combineResults()留给⼦类实现的抽象⽅法,对各个并⾏⼦任务的处理结果进⾏合并,形成相应输⼊元素的输出结果。invokeParallel()实现以并⾏的⽅式执⾏⼀组任务,doProcess()实现该类对其输⼊的处理逻辑。
ConcreteParallelPipe类由应⽤定义的AbstractParallelPipe的⼦类:buildTasks()根据指定的输⼊构造⼀组⼦任务,combineResults()对各个并⾏⼦任务的处理结果进⾏合并,形成相应输⼊元素的输出结果
Pipeline类对符合Pipe的抽象:addPipe()往该Pipeline实例中添加⼀个Pipe实例。
SimplePipeline类基于AbstractPipe的Pipeline接⼝的⼀个简单实现类:addPipe()往该Pipeline实例中添加⼀个Pipe实
例,addAsWorkerThreadBadPipe()将制定的Pipe实例⽤WorkerThreadPipeDecorator实例包装后加⼊Pipeline实
例,addAsThreadPoolBadPipe()将制定的Pipe实例⽤ThreadPoolPipeDecorator实例包装后加⼊Pipeline实例。
⽰例代码
某系统需要⼀个数据同步的定时任务,该定时任务将数据库中符合制定条件的记录数据以⽂件的形式FTP传输(同步)到制定的主机上。该定时任务需要满⾜以下要求:
1.每个数据⽂件最多只包含N(如10000,具体可配置)条记录;当⼀个数据⽂件被写满时,其他代谢记录会被写⼊新的数据⽂件。
2.每个数据⽂件可能需要被传输到多台主机上。
3.本地要保留同步过的数据⽂件的备份。口语
因此,该定时任务需要做三件事情,都是⽐较耗时的操作,⽽且后⾯的操作还需要依赖前⾯操作的结果才能进⾏,不易拆分,如果只是⽤多线程,每个线程中仍然是按顺序串⾏处理也是不合适的,这样的话第⼆个步奏会出现多线程之间争夺资源导致时间浪费的问题,会更难完成任务,所以需要有Pipeline模式去执⾏。
数据同步定时任务
public class
DataSynctask implements Runnable{
public void run(){
ResultSet rs = null;
SimplePipeline
pipeline = buildPipeline();
pipeline.wDefaultPipeContext());
Connection dbConn = null;
dbConn = getConnection();
rs = qryRecords(dbConn);
processRecords(rs.pipeline);
}catch(Exception e){一起走过的路
e.printStackTrace();
钢甲铁拳
}finally{
if(null != dbConn){
try{
电影主题dbConn.clo();
}catch (SQLException e){
;
}
}
pipeline.shutdown(360,TimeUnit.SECONDS);
}
private ResultSet qryRecords(Connection dbConn)
throws Exception{
dbConn.tReadOnly(true);
PreparedStatement ps = dbConn
.prepareStatement("lect id,productId,packageId,msisdn,operationTime, operationTyoe," +
"effectiveDate,dueDate from subscriptions
order by operationTime",
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ
_ONLY);
ResultSet rs = ps.executeQuery();
return rs;
}
private static Connection getConnection() throws
Exception{
Connection dbConn = null;
武警河南总队医院Class.forName("org.hsqldb.jdbc.JDBCDriver");
dbConn = Connectio
return dbConn;
}
private static Record makeRecordFrom(ResultSet
rs)
throws SQLException{
Record record = new Record();
record.Int("id"));
record.String("productId"));
record.String("msisdn"));
record.Timestamp ("operationTime"));英语试题
record.Int("operationType")); record.Timestamp ("effectiveDate"));
虾仁
record.Timestamp("dueDate"));
}
private static class RecordSaveTask{
public final Record[] records;
public final int targetFileIndex;
public final String recordDay;
public RecordSaveTask(Record[] records,int targetFileIndex){ ds = records;
this.targetFileIndex = targetFileIndex;
}
puclic RecordSaveTask(String recordDay,int
targeFileIndex){
岗位分析报告ds = null;
this.targetFileIndex = targetFileIndex;
}
} private SimplePipeline
buildPipeline(){
* 线程的本质是重复利⽤⼀定数量的线程,⽽不是针对
每个任务
都有⼀个专门的⼯作者线程。
* 这⾥,各个Pipe的初始化完全可以在上游Pipe初始化
完毕后再
初始化其后继Pipe,⼆不必多
* 个Pipe同时初始化。
* 因此,这个初始化的动作可以由⼀个线程来处理。
该线程处理
完各个Pipe的初始化后,可以继续
* 处理之后可能产⽣的任务,如出错处理。
* 所以,上述这些先后产⽣的任务可以由线程池中的
⼀个⼯作者
线程从头到尾执⾏。
*/
final ExecutorService helperExecutor = wSingleThreadExecutor(); final SimplePipeline
pipeline = new
SimplePipeline(helperExcecutor);
Pipe stageSaveFile
= new AbstractPipe(){
final RecordWriter recordWriter =Instance();
final Record[] records = ds;
File file;
if(null == records){
file = recordWriter.write(records,task.targerFileIndex);
}el{
try{
file = recordWriter.write(records.task.targetFileIndex);
}catch(IOException e){
throw new PipeException(this,task,"Failed
to save records",e);
}