Pipeline(流⽔线)模式
模式名称
Pipeline(流⽔线)模式
模式解决的问题
有时⼀些线程的步奏⽐较冗长,⽽且由于每个阶段的结果与下阶段的执⾏有关系,⼜不能分开。
解决思路
可以将任务的处理分解为若⼲个处理阶段,上⼀个阶段任务的结果交给下⼀个阶段来处理,这样每个线程的处理是并⾏的,可以充分利⽤资源提⾼计算效率。
模式所使⽤的类:
Pipe
对处理阶段的抽象,负责对输⼊进⾏处理,并将输出作为下⼀个阶段的输⼊:process()⽤于接收前⼀个处理阶段的处理结果,作为该处理阶段的输⼊,init()初始化当前处理阶段对外提供的服务,shutdown
()关闭当前处理阶段对外提供的服务,tNextPipe()设置当前处理阶段的下⼀个处理阶段。
PipeContext
对各个处理阶段的计算环境进⾏抽象,主要⽤于异常处理:handleError()⽤于对处理阶段泡醋的异常进⾏处理。
AbstractPipe 类
Pipe接⼝的抽象实现类:process()接收抢⼀个处理阶段的输⼊并调⽤其⼦类实现的doProcess⽅法对输⼊元素进⾏处理,init()保存对其参数中制定的PipeContext实例的引⽤,⼦类可根据需要覆盖该⽅法以实现其服务的初始化。
Shutdown 默认实现什么也不做,⼦类可根据需要覆盖该⽅法实现服务停⽌:tNextPipe()设置当前处理阶段的下⼀个处理阶段,doProcess()留给⼦类实现的抽象⽅法,⽤于实现其服务的初始化。
WorkerThreadPipeDecprator
基于⼯作线程的Pipe实现类,该Pipe实例会将接收到的输⼊元素存⼊队列,由制定个数的⼯作者线程对队列中输⼊元素进⾏处理,该类的⾃⾝主要负责⼯作者线程的⽣命周期的管理:process()接收前
⼀个处理阶段的输⼊,并将其存⼊队列由⼯作者线程运⾏时取出进⾏处理,init()启动⼯作者线程并调⽤委托Pipe实例的init⽅法,shutdown()停⽌⼯作者线程并委托Pipe实例的shutdown⽅法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法,dispatch()取队列中的输⼊元素并调⽤委托Pipe实例的process⽅法对其进⾏处理
ThreadPoolPopeDecorator
基于线程池的Pipe的实现类:process()接收前⼀个处理阶段的输⼊,并向线程池提交⼀个对该输⼊进⾏相应处理的任务,init()调⽤委托pipe实例的init⽅法,shutdown()关闭当前Pipe实例对外提供的服务并调⽤委托Pipe实例的shutdown⽅
法,tNextPipe()调⽤委托Pipe实例的tNextPipe⽅法。
AbstractParallelPile类
AbstractPipe的⼦类,⽀持并⾏处理的Pipe实现类,该类对其每个输⼊元素(原始任务)⽣成相应的⼀组⼦任务,并以并⾏的⽅式去执⾏这些⼦任务,各个⼦任务的执⾏结果会被合并为相应原始任务的输出结果:bulidTasks()流给⼦类实现的抽象⽅法,⽤于根据制定的输⼊构造⼀组⼦任务,combineResults()留给⼦类实现的抽象⽅法,对各个并⾏⼦任务的处理结果进⾏合并,形成相应输
⼊元素的输出结果。invokeParallel()实现以并⾏的⽅式执⾏⼀组任务,doProcess()实现该类对其输⼊的处理逻辑。
ConcreteParallelPipe类
由应⽤定义的AbstractParallelPipe的⼦类:buildTasks()根据指定的输⼊构造⼀组⼦任务,combineResults()对各个并⾏⼦任务的处理结果进⾏合并,形成相应输⼊元素的输出结果
Pipeline类
对符合Pipe的抽象:addPipe()往该Pipeline实例中添加⼀个Pipe实例。
SimplePipeline类
基于AbstractPipe的Pipeline接⼝的⼀个简单实现类:addPipe()往该Pipeline实例中添加⼀个Pipe实
例,addAsWorkerThreadBadPipe()将制定的Pipe实例⽤WorkerThreadPipeDecorator实例包装后加⼊Pipeline实
例,addAsThreadPoolBadPipe()将制定的Pipe实例⽤ThreadPoolPipeDecorator实例包装后加⼊Pipeline实例。
Pipeline模式的服务初始化序列图
⽰例代码
某系统需要⼀个数据同步的定时任务,该定时任务将数据库中符合制定条件的记录数据以⽂件的形式FTP传输(同步)到制定的主机上。该定时任务需要满⾜以下要求:
关于桥的民间故事
1. 每个数据⽂件最多只包含N(如10000,具体可配置)条记录;当⼀个数据⽂件被写满时,其他代谢记录会被写⼊新的数据⽂件。
2. 每个数据⽂件可能需要被传输到多台主机上。
3. 本地要保留同步过的数据⽂件的备份。
因此,该定时任务需要做三件事情,都是⽐较耗时的操作,⽽且后⾯的操作还需要依赖前⾯操作的结果才能进⾏,不易拆分,如果只是⽤多线程,每个线程中仍然是按顺序串⾏处理也是不合适的,这样的话第⼆个步奏会出现多线程之间争夺资源导致时间浪费的问题,会更难完成任务,所以需要有Pipeline模式去执⾏。
数据同步定时任务
public class DataSynctask implements Runnable {
大狮子简笔画public void run(){
ResultSet rs =null;
SimplePipeline<RecordSaveTask, String> pipeline =buildPipeline();
pipeline.wDefaultPipeContext());
Connection dbConn =null;
try{
岳然dbConn =getConnection();
rs =qryRecords(dbConn);
processRecords(rs.pipeline);
}catch(Exception e){
e.printStackTrace();
}finally{
}finally{
if(null!= dbConn){
try{
dbConn.clo();
}catch(SQLException e){
简约而不简单;
}
}
pipeline.shutdown(360, TimeUnit.SECONDS);
}
private ResultSet qryRecords (Connection dbConn)
throws Exception {
dbConn.tReadOnly(true);
PreparedStatement ps = dbConn
.prepareStatement("lect id,productId,packageId,msisdn,operationTime, operationTyoe, " +
"effectiveDate,dueDate from subscriptions order by operationTime", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ
_ONLY);
ResultSet rs = ps.executeQuery();
return rs;
}
private static Connection getConnection ()throws
Exception {
Connection dbConn =null;
Class.forName("org.hsqldb.jdbc.JDBCDriver");
dbConn = Connectio
("jdbc:hsqldb:hsql://192.168.1.105:9001","SA","");
return dbConn;
信息中心工作总结
}
private static Record makeRecordFrom (ResultSet rs)
throws SQLException {
Record record=new Record();
record.Int("id"));
record.String("productId"));
record.String("msisdn"));
record.Timestamp
("operationTime"));
record.Int("operationType"));
record.Timestamp
("effectiveDate"));
record.Timestamp("dueDate"));
}
private static class RecordSaveTask {
public final Record[] records;
public final int targetFileIndex;
public final String recordDay;
public RecordSaveTask(Record[] records,int targetFileIndex){
this.targetFileIndex = targetFileIndex;
}
puclic RecordSaveTask(String recordDay,int
targeFileIndex){
this.targetFileIndex = targetFileIndex;
}
}
private SimplePipeline<RecordSavetask, String> buildPipeline (){
/*
* 线程的本质是重复利⽤⼀定数量的线程,⽽不是针对
每个任务
都有⼀个专门的⼯作者线程。
* 这⾥,各个Pipe的初始化完全可以在上游Pipe初始化
* 这⾥,各个Pipe的初始化完全可以在上游Pipe初始化
完毕后再
初始化其后继Pipe,⼆不必多
* 个Pipe同时初始化。
* 因此,这个初始化的动作可以由⼀个线程来处理。
该线程处理
完各个Pipe的初始化后,可以继续
* 处理之后可能产⽣的任务,如出错处理。
* 所以,上述这些先后产⽣的任务可以由线程池中的
⼀个⼯作者
线程从头到尾执⾏。
*/
final ExecutorService helperExecutor = wSingleThreadExecutor();
final SimplePipeline<RecordSaveTask, String> pipeline =new
SimplePipeline<RecordSaveFile, String>(helperExcecutor);
Pipe<RecordSaveTask, File> stageSaveFile =new AbstractPipe<RecordSaveTask, File>(){ final RecordWriter recordWriter = Instance();
final Record[] records = ds;
File file;
if(null==records)
{
file = recordWriter.write(records, task.targerFileIndex);
}el
{
try{
file = recordWriter.write(records.task.targetFileIndex);
}catch(IOException e){
throw new PipeException(this, task,"Failed to save records", e);
}
}
};
/*
* 由于这⾥的⼏个Pipe都是处理I/O的,为了避免使⽤锁
(以减少不必要的上下⽂切换)但⼜能
* 保证线程安全,故每个Pipe都采⽤单线程处理。
* 若各个Pipe要改⽤线程池来处理,需要注意:1
线程安全2)死锁。
*/
pipeline.addAsWorkerThreadBadPipe(stageSaveFile,1);
final String[][] ftpServerConfigs = retrieveFTP
ServConf();
final ThreadPoolExecutor ftpExecutorService =new ThreadPoolExecutor(1
ftpServerConfigs.length,60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
if(!executor.isShutdown()){
try{
}catch(InterruptedException e){
;
}
}
}
});
Pipe<File, File, File> stageTransferFile =new AbstractParallelPipe<File, File,
File>((new SynchronousQueue<file>(), ftpExecutor
Service)){
Future[ftpServerConfigs.length];
@Override
public void init (PipeContext piptCtx){
super.init(pipeCtx);
String[] ftpServerConfig;
for(int i =0; i < ftoServerConfigs.length; i++){
ftpServerConfig = ftpServerConfigs[i];
ftpClientUtilHolders[i]= wInstance(
ftpClientUtilHolders[i]= wInstance(
ftpServerConfig[0], ftpServerConfig[1], ftpServer
罗非鱼做法大全家常
Config[2]);
}
}
@Override
protected List<Callable<File>> buildTasks (final File file){
List<Callable<File>> tasks =new LinkedList<Callable<File>>();
16岁男生for(Fucture<FTPClientUtil> ftpClientUtilHolder :
ftpClientUtilHolders){
tasks.add(new ParallelTask(fipClientUtilHolder, file));
}
return tasks;
}
@Override
protected File combineResults (List < Future < File >> subTaskResults) throws Exception {
if(0== subTaskResults.size()){
return null;
}
File file =null;
file = (0).get();
return file;
}
@Override
public void shutdown (long timeout, TimeUnit unit){
super.shutdown(timeout / unit);
ftpExecutorService.shutdown();
try{
ftpExecutorService.awaitTermination(timeout, unit);
}catch(InterruptedException e){
;
}
for(Future<FTPClientUtil> ftpClientUtilHolder :
ftpClientUtilHolders){
try{
<().disconnect();
}catch(Exception e){
;
}
}
}
class ParallelTask implements Callable<File>{
public final Future<FTPClientUtil> ftpUtilHodler;
public final File file2Transfer;
public ParallelTask(Future<FTPClientUtil>
ftpUtilHodler,
File file2Transfer){
this.ftpUtilHodler = ftpUtilHodler;
this.file2Transfer = file2Transfer;
}
@Override
public File call()throws Exception {
File transferedFile =null;
<().upload(file2Transfer);
十一月的英文transferedFile = file2Transfer;
return transferedFile;
}
}
};
pipeline.addAsWorkerThreadBadPipe(stageTrans
ferFile,1);
//备份已经传输的数据⽂件
Pipe<File, Void> stageBackupFile =new AbstractPipe<File, Void>(){