java线程安全之并发Queue(⼗三)
并发Queue
在并发的队列上jdk提供了两套实现,⼀个是以ConcurrentLinkedQueue为代表的⾼性能队列,⼀个是以BlockingQueue接⼝为代表的阻塞队列,⽆论在那种都继承⾃Queue。
如图继承Queue共有⼆⼗四个:
ConcurrentLinkedQueue
概念理解
ConcurrentLinkedQueue:是⼀个适⽤于⾼并发场景下的队列,通过⽆锁的⽅式,实现了⾼并发状态下的⾼性能,通常ConcurrentLinkedQueue性能好于BlockingQueueo它是⼀个基于链接节点的⽆界线程安全队列。该队列的元素遵循先讲先出的原则。头是最先加⼊的,尾是最近加⼊的,该队列不允许null元素。
ConcurrentLinkedQueue重要⽅法:
Add()和offer()都是加⼊元素的⽅法(在ConcurrentLinkedQueue中,这两个⽅法投有任何区别)
Poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
案例
public class UQueue_ConcurrentLinkedQueue {
public static void main(String[] args) throws Exception {
//⾼性能⽆阻塞⽆界队列:ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
q.offer("a");
q.offer("b");
givenq.offer("c");
q.offer("d");
q.add("e");
System.out.println("从头部取出元素,并从队列⾥删除 >> "+q.poll()); //a 从头部取出元素,并从队列⾥删除
System.out.println("删除后的长度 >> "+q.size()); //4
System.out.println("取出头部元素 >> "+q.peek()); //b
System.out.println("长度 >> "+q.size()); //4
}
}
打印结果:
从头部取出元素,并从队列⾥删除 >> a
删除后的长度 >> 4
取出头部元素 >> b
长度 >> 4
BlockingQueue接⼝
welcome to my style
ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了⼀个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着⽣产和消费不能完全并⾏,长度是需要定义的,可以指定先讲先出或者先讲后出,也叫有界队列,在很多场合⾮常适合使⽤。讲话紧张
LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着⼀个数据缓冲队列〈该队列由⼀个链表构成),LinkedBlockingQueue之所以能够⾼效的处理并发数据,是因为其内部实现采⽤分离锁(读写分离两个锁),从⽽实现⽣产者和消费者操作的完全并⾏运⾏,他是⼀个⽆界队列。
SynchronousQueue:⼀种没有缓冲的队列,⽣产者产⽣的数据直接会被消费者获取并消费。
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传⼊的Compator对象来决定,也就是说传⼊队列的对象必须实现Comparable接⼝),在实现PriorityBlockingQueue时,内部控制线程同步的锁采⽤的是公平锁,他也是⼀个⽆界的队列。
DelayQueue:带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接⼝,DelayQueue是⼀个没有⼤⼩限制的队列,应⽤场景很多,⽐如对缓存超时的数据进⾏移除、任务超时处理、空闲连接的关闭等等。
ArrayBlockingQueue、LinkedBlockingQueue、synchronousQueue案例
public class UQueue_ConcurrentLinkedQueue {
public static void main(String[] args) throws Exception {
acknowledgeSystem.out.println("--------------- ArrayBlockingQueue --------------");
//阻塞队列有长度的队列
refer
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
//array.add("f");
/
/array.add("f");
//返回⼀个布尔类型在3秒之内能不能加⼊不能返回fal
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
System.out.println("所有数据 >> " + String());
System.out.println("--------------- LinkedBlockingQueue --------------");
//阻塞队列⽆长度限制队列
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println("总长度 >> "+q.size());
for (Iterator iterator = q.iterator(); iterator.hasNext(); ) {
String string = (String) ();
System.out.print(string+" -- ");
}
System.out.println();
List<String> list = new ArrayList<String>();
//在 q 的队列中取三个元素放到list 队列⾥
System.out.println(q.drainTo(list, 3));
System.out.println("取出LinkedBlockingQueue数据放到list列表的长度为 >> "+list.size());
for (String string : list) {
System.out.print(string + " -- ");
}
System.out.println();
System.out.println("--------------- SynchronousQueue --------------");
productionmanagerfinal SynchronousQueue<String> q1 = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"取数据 "+ q1.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
q1.add("b");
System.out.println(Thread.currentThread().getName() +"加⼊数据 b");
}
});
t2.start();
}
}
水平面打印结果
---------------ArrayBlockingQueue--------------
fal
所有数据 >> [a,b,c,d,e]
---------------LinkedBlockingQueue--------------
总长度 >> 6
rectangle是什么意思a--b--c--d--e--f--
3
取出LinkedBlockingQueue数据放到list列表的长度为 >> 3
a--b--c--
---------------SynchronousQueue--------------
Thread-1加⼊数据b
Thread-0取数据b
PriorityBlockingQueue 案例
Task.java
public class Task implements Comparable<Task>{
private int id ;
private String name;
public int getId() {
return id;
}
public void tId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void tName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
public String toString(){
return this.id + "," + this.name;
}
}
UPriorityBlockingQueue.java
public class UPriorityBlockingQueue {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue<Task> q2 = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.tId(3);
t1.tName("id为3");
Task t2 = new Task();
t2.tId(4);
blue vividt2.tName("id为4");
Task t3 = new Task();
t3.tId(1);
t3.tName("id为1");
Task t4 = new Task();
t4.tId(2);
t4.tName("id为2");
//return this.id > task.id ? 1 : 0;
q2.add(t1); //3
q2.add(t2); //4
q2.add(t3); //1
q2.add(t4);
// 1 3 4
/
/第⼀次取值时候是取最⼩的后⾯不做排序
System.out.println("容器:" + q2); //[1,id为1, 2,id为2, 3,id为3, 4,id为4] //拿出⼀个元素后⼜会取⼀个最⼩的出来放在第⼀个
System.out.println(q2.take().getId());
System.out.println("容器:" + q2); //[2,id为2, 4,id为4, 3,id为3]
System.out.println(q2.take().getId());
System.out.println("容器:" + q2); //[3,id为3, 4,id为4]
}
}
打印结果
容器:[1,id为1, 2,id为2, 3,id为3, 4,id为4]
1
容器:[2,id为2, 4,id为4, 3,id为3]
2
references
容器:[3,id为3, 4,id为4]
DelayQueue 案例
Wangmin.java