Java线程池实现多消费者批量处理队列消息
通常生产者-消费者的经典实现方式是,启动一个消费者线程从阻塞队列里获取消息进行消费,(多个)生产者往队列里插入待消费的数据然后立即返回。如果生产者生产的速率远大于消费者消费的速率,那么队列的待处理数据就会累积得越来越多。
本文的重点是封装实现(单个)消费者线程可以“批量处理”队列里的消息,这类似于多条数据插入数据库,由多次insert优化为一次batch insert。
顾名思义,“多消费者”就是开启多个消费者线程(其中每个消费者线程均可以批量处理队列消息),这里借用Java线程池来管理线程的生命周期:
首先,定义一个接口表示异步消费:
import urrent.RejectedExecutionException;
/**
炽热是什么意思* An object that accepts elements for future consuming.
*
* @param <E> The type of element to be consumed
*/
public interface AsynchronousConsumer<E> {
/**
* Accept an element for future consuming.
*
* @param e the element to be consumed
* @return true if accepted, fal otherwi
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of this element
* prevents it from being accepted for future consuming
* @throws RejectedExecutionException if the element
* cannot be accepted for consuming
*/
public boolean accept(E e);
}
再定义一个消费者线程(由于线程托管给线程池管理了,这里是定义一个Runnable),多个Runnable通过共用BlockingQueue来实现多个消费者消费。这里通过指定“batchSize”来表示批量处理(如果值大于1的话),TimeUnit指明了线程(Runnable)的最大空闲时间,超过该时间将会自动退出(如果值为0则表示永远等待),并提供beforeConsume、afterConsume、terminated等方法以便扩展:
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An abstract {@code Runnable} of consumer for batches consuming Asynchronously
*
* @param <E> The type of element to be consumed
*/
public abstract class ConsumerRunnable<E> implements Runnable, AsynchronousConsumer<E>{
/
**
private static final Log log = Log(ConsumerRunnable.class);
/** The queue holding the elements that are waiting to be consumed. */
private final BlockingQueue<E> queue;
/**
 * The batchSize for {@link #consume} used in every loop,
 * i.e. the maximum number of elements gathered into one batch.
 */
private volatile int batchSize;
/**
 * Timeout in nanoseconds for polling an element from the
 * working queue (BlockingQueue). The thread uses this timeout to
 * wait when the working queue is empty.
 * Zero means wait forever until an element becomes available.
 */
private volatile long waitTime;
/**
 * If true, an interrupt received while waiting for an element
 * causes run() to quit.
 */
private volatile boolean quitIfInterrupted = true;
/**
 * Allocates a new {@code ConsumerRunnable} object
 * with the given initial parameters and the default waitTime (0),
 * which causes the thread to wait forever until an element
 * becomes available when the element queue is empty.
 *
 * @param batchSize the batch size used in {@link #consume} for every loop
 * @param queue the BlockingQueue for working
 *
 * @throws NullPointerException if the queue is null
 * @throws IllegalArgumentException if the batchSize is less than or equal to zero
 */
public ConsumerRunnable(int batchSize, BlockingQueue<E> queue){
    // batchSize is validated before queue, mirroring the argument order.
    if (batchSize <= 0) {
        throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    this.batchSize = batchSize;
    this.queue = Objects.requireNonNull(queue, "queue");
}
/**
 * Allocates a new {@code ConsumerRunnable} object
 * with the given initial parameters.
 * A time value of zero will cause the thread to wait forever
 * until an element becomes available when the element queue is empty.
 *
 * @param batchSize the batch size used in {@link #consume} for every loop
 * @param time the time to wait. A time value of zero will cause
 *        the thread to wait forever until an element becomes available
 *        when the element queue is empty.
 * @param unit the time unit of the time argument
 * @param queue the BlockingQueue for working
 *
 * @throws NullPointerException if any specified parameter is null
 * @throws IllegalArgumentException if the batchSize is less than or equal to zero,
 *         or the time is less than zero
 */
public ConsumerRunnable(int batchSize, long time, TimeUnit unit,
        BlockingQueue<E> queue){
    if (batchSize <= 0) {
        throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    this.batchSize = batchSize;
    this.queue = Objects.requireNonNull(queue, "queue");
    if (time < 0) {
        throw new IllegalArgumentException("time must not be negative: " + time);
    }
    // waitTime is stored in nanoseconds because run() polls with TimeUnit.NANOSECONDS.
    this.waitTime = Objects.requireNonNull(unit, "unit").toNanos(time);
}
/**
 * Hands an element over so that it can be consumed later.
 *
 * <p>The default implementation simply delegates to
 * {@link java.util.Queue#offer queue.offer} on the working queue.
 *
 * @param e the element to be consumed
 * @return true if accepted, false otherwise
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of this element
 *         prevents it from being accepted for future consuming
 * @throws RejectedExecutionException if the element
 *         cannot be accepted for future consuming (optional)
 */
public boolean accept(E e){
    final boolean accepted = queue.offer(e);
    return accepted;
}
/**
* Main worker run loop. Repeatedly gets elements from working queue
* and consumes them, while coping with a number of issues:
*
* <p>1. Each loop run is preceded by a call to beforeConsume, which
* might throw an exception, in which ca the {@link #consume}
* of the current loop will not be invoked.
*
* <p>2. Assuming beforeConsume completes normally, we consume the elements, * gathering any of its thrown exceptions to nd to afterConsume. Any thrown
* exception in the afterConsume conrvatively caus the runnable to quit.
古风图片高清*
* <p>3. After {@link #consume} of the current loop completes, we call afterExecute, * which may also throw an exception, which will cau the runnable to quit.
*
*/
@Override
public final void run() {
starting();
final List<E> list = new ArrayList<>();
try{
while(true){
try {
E info;
int bSize = batchSize;
for(int i=0;i<bSize&&list.size()<bSize&&(info=queue.poll())!=null;i++){
list.add(info);
}
if(list.isEmpty()){
try{
beforeWait();
} catch(Throwable ingnore){}
//waiting if necessary until an element becomes available
if(waitTime==0){
list.add(queue.take());
}el{
E e = queue.poll(waitTime, TimeUnit.NANOSECONDS);
if(e==null){//timeout
break;
}
list.add(e);
}el{
try{
beforeConsume(list);
}catch (Throwable t1) {
list.clear();//clear to be reud for next loop
<("The runnable["+getClass().getName()+"] which invoked beforeConsume occurred the exception:" +t1.toString(), t1);
continue;
}
doConsume(list);
}
} catch (InterruptedException e) {
if(quitIfInterrupted){
Thread t = Thread.currentThread();
if(!t.isInterrupted()){
//restores the interrupted status
t.interrupt();
}
break;
}
} catch(Throwable t){
if(t instanceof Error){
<("The runnable["+getClass().getName()+"] is about to die for the reason:"+t.toString(), t);
break;
}
// el continue;
}
}
} finally{
try{
//rechcek the list
if(!list.isEmpty()){
beforeConsume(list);
doConsume(list);
}
} finally{
terminated();
}
}
鉴赏美}
/**
* Consuming elements and ensure that the method
* {@code afterConsume} must be invoked in the end.
* @param list the elements that will be consumed
*/
private void doConsume(List<E> list){
Throwable t = null;
try {
consume(list);智力题
} catch (Throwable t1) {
t = t1;
throw t1;
} finally{
try {
afterConsume(list,t);
} finally{
list.clear();//clear to be reud for next loop
}
}
}
* @return the batchSize
*
* @e #tBatchSize
*/
public int getBatchSize(){
return batchSize;
}
/**
 * Sets the batchSize for the next loop used in {@link #consume}.
 * This overrides any value set in the constructor.
 *
 * <p>NOTE(review): the method name looks truncated (presumably
 * {@code setBatchSize}); it is kept as found so existing references
 * still resolve. Confirm against callers.
 *
 * @param batchSize the new batchSize
 * @throws IllegalArgumentException if the new batchSize is
 *         less than or equal to zero
 * @see #getBatchSize
 */
public void tBatchSize(int batchSize){
    if (batchSize <= 0) {
        throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    this.batchSize = batchSize;
}
/**
 * Returns the element queue used by this thread. Access to the
 * element queue is intended primarily for debugging and monitoring.
 * This queue may be in active use. Retrieving the element queue
 * does not prevent queued elements from being consumed.
 *
 * @return the element queue
 */
public BlockingQueue<E> getQueue() {
    return this.queue;
}
/**
* Sets the time limit for the thread when waiting for
* an element to become available. This overrides any value
* t in the constructor. zero means wait forever.
* Timeout will cau the runnable to quit.
*
* @param time the time to wait. A time value of zero will cau
* the thread to wait forever until an element become available * when the element queue is empty.
* @param unit the time unit of the {@code time} argument
* @throws IllegalArgumentException if {@code time} less than zero * @e #getWaitTime(TimeUnit)
*/
public void tWaitTime(long time, TimeUnit unit) {
long t = quireNonNegative(time));
this.waitTime = t;
}
/**
* Returns the thread waiting time, which is the time to wait up
* when the working queue is empty. zero means wait forever.
*
* @param unit the desired time unit of the result
* @return the time limit
* @e #tWaitTime(long, TimeUnit)
*/
public long getWaitTime(TimeUnit unit) {