Java多线程总结之线程安全队列Queue
在Java多线程应⽤中,队列的使⽤率很⾼,多数⽣产消费模型的⾸选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和⾮阻塞队列,其中阻塞队列的典型例⼦是BlockingQueue,⾮阻塞队列的典型例⼦是ConcurrentLinkedQueue,在实际应⽤中要根据实际需要选⽤阻塞队列或者⾮阻塞队列。
注:什么叫线程安全?这个⾸先要明确。线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(⽐如修改、遍历、查看)⽽使这些变量结构被破坏或者针对这些变量操作的原⼦性被破坏,则这个类就不是线程安全的。
今天就聊聊这两种Queue,本⽂分为以下两个部分,⽤分割线分开:
BlockingQueue 阻塞算法
ConcurrentLinkedQueue,⾮阻塞算法
⾸先来看看BlockingQueue:
Queue是什么就不需要多说了吧,⼀句话:队列是先进先出。相对的,栈是后进先出。如果不熟悉的话先找本基础的数据结构的书看看吧。
BlockingQueue,顾名思义,“阻塞队列”:可以提供阻塞功能的队列。
⾸先,看看BlockingQueue提供的常⽤⽅法:
可能报异常返回布尔值可能阻塞设定等待时间
⼊队add(e)offer(e)put(e)offer(e, timeout, unit)
出队remove()poll()take()poll(timeout, unit)
查看element()peek()⽆⽆
从上表可以很明显看出每个⽅法的作⽤,这个不⽤多说。我想说的是:
add(e) remove() element() ⽅法不会阻塞线程。当不满⾜约束条件时,会抛出IllegalStateException 异常。例如:当队列被元素填满后,再调⽤add(e),则会抛出异常。
offer(e) poll() peek() ⽅法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调⽤offer(e),则不会插⼊元素,函数返回fal。
要想要实现阻塞功能,需要调⽤put(e) take() ⽅法。当不满⾜约束条件时,会阻塞线程。
BlockingQueue 阻塞算法
BlockingQueue作为线程容器,可以为线程同步提供有⼒的保障。
⼆、BlockingQueue定义的常⽤⽅法
1.BlockingQueue定义的常⽤⽅法如下:
抛出异常特殊值阻塞超时
插⼊ add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可⽤不可⽤
1. ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了⼀个定长数组,以便缓存队列中的数据对象,这是⼀个常⽤的阻塞队列,除了⼀个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在⽣产者放⼊数据和消费者获取数据,都是共⽤同⼀个锁对象,由此也意味着两者⽆法真正并⾏运⾏,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采⽤分离锁,从⽽实现⽣产者和消费者操作的完全并⾏运⾏。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写⼊和获取操作已经⾜够轻巧,以⾄于引⼊独⽴的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有⼀个明显的不同之处在于,前者在插⼊或删除元素时不会产⽣或销毁任何额外的对象实例,⽽后者则会⽣成⼀个额外的Node对象。这在长时间内需要⾼效并发地处理⼤批量数据的系统中,其对于GC的影响还是存在⼀定的区别。⽽在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采⽤公平锁,默认采⽤⾮公平锁。
2. LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着⼀个数据缓冲队列(该队列由⼀个链表构成),当⽣产者往队列中放⼊⼀个数据时,队列会从⽣产者⼿中获取数据,并缓存在队列内部,⽽⽣产者⽴即返回;只有当队列缓冲区达到最⼤值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞⽣产者队列,直到消费者从队列中消费掉⼀份数据,⽣产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。⽽LinkedBlockingQueue之所以能够⾼效的处理并发数据,还因为其对于⽣产者端和消费者端分别采⽤了独⽴的锁来控制数据同步,这也意味
着在⾼并发的情况下⽣产者和消费者可以并⾏地操作队列中的数据,以此来提⾼整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造⼀个LinkedBlockingQueue对象,⽽没有指定其容量⼤⼩,LinkedBlockingQueue会默认⼀个类似⽆限⼤⼩的容量(Integer.MAX_VALUE),这样的话,如果⽣产者的速度⼀旦⼤于消费者的速度,也许还没有等到队列满阻塞产⽣,系统内存就有可能已被消耗殆尽了。
阻塞队列:线程安全
按 FIFO(先进先出)排序元素。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素插⼊到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要⾼于基于数组的队列,但是在⼤多数并发应⽤程序中,其可预知的性能要低。
注意:
1、必须要使⽤take()⽅法在获取的时候达成阻塞结果
2、使⽤poll()⽅法将产⽣⾮阻塞效果
import urrent.ExecutorService;
import urrent.Executors;
import urrent.LinkedBlockingDeque;
import urrent.LinkedBlockingQueue;
import urrent.TimeUnit;
public class BlockingDeque {
//阻塞队列,FIFO
private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = wFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
7994}妯娌怎么读
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " ⽣产: " + i);
韩语大婶怎么说//concurrentLinkedQueue.add(i);
上海软件培训
try {
concurrentLinkedQueue.put(i);
Thread.sleep(200); //模拟慢速的⽣产,产⽣阻塞的效果
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
//必须要使⽤take()⽅法在获取的时候阻塞
System.out.println(name+"消费: " + concurrentLinkedQueue.take());
//使⽤poll()⽅法将产⽣⾮阻塞效果
//System.out.println(name+"消费: " + concurrentLinkedQueue.poll());
nuts//还有⼀个超时的⽤法,队列空时,指定阻塞时间后返回,不会⼀直阻塞
//但有⼀个疑问,既然可以不阻塞,为啥还叫阻塞队列?
//System.out.println(name+" Consumer " + concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
ConcurrentLinkedQueue,⾮阻塞算法
⾮阻塞队列
基于链接节点的、⽆界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进⾏排序。队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插⼊到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问⼀个公共 collection 时,ConcurrentLinkedQueue 是⼀个恰当的选择。此队列不允许 null 元素。
例⼦
import urrent.ConcurrentLinkedQueue;
import urrent.ConcurrentLinkedQueue;
import urrent.ExecutorService;
import urrent.Executors;
import urrent.LinkedBlockingDeque;
import urrent.TimeUnit;
public class NoBlockQueue {宾语从句引导词
private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = wFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " start producer " + i);
concurrentLinkedQueue.add(i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//System.out.println(name+"end producer " + i);
}
}
}
static class Consumer implements Runnable {
private String name;
isbnpublic Consumer(String name) {
this.name = name;
english}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
System.out.println(name+" Consumer " + concurrentLinkedQueue.poll());
} catch (Exception e) {
/
/ TODO Auto-generated catch block
e.printStackTrace();
热忱}
// System.out.println();
// System.out.println(name+" end Consumer " + i);
}
}
}
}usual
在并发编程中,⼀般推荐使⽤阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使⽤最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放⼊队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合⽣产者-消费者模型的都可以使⽤阻塞
队列。
使⽤⾮阻塞队列,虽然能即时返回结果(消费结果),但必须⾃⾏编码解决返回为空的情况处理(以及消费重试等问题)。
另外他们都是线程安全的,不⽤考虑线程同步问题。