使用LinkedBlockingQueue来实现生产者消费者的例子

更新时间:2023-08-11 04:25:45 阅读: 评论:0

使⽤LinkedBlockingQueue来实现⽣产者消费者的例⼦
flavored⼯作中,经常有将⽂件中的数据导⼊数据库的表中,或者将数据库表中的记录保存到⽂件中。为了提⾼程序的处理速度,可以设置读线程和写线程,这些线程通过消息队列进⾏数据交互。本例就是使⽤了LinkedBlockingQueue来模仿⽣产者线程和消费者线程进⾏数据⽣产和消费。
为了⽅便,这些不同的类被写在了⼀个类中,实际使⽤的时候,可以单独拆开,举⼀反三地使⽤。
以下是例⼦:
LinkedBlockingQueueDemo.java
import java.util.Date;
import java.util.Random;
import urrent.LinkedBlockingQueue;
import urrent.TimeUnit;
public class LinkedBlockingQueueDemo {
// ⽣产者线程数量几点了用英语怎么说
private final static int providerThreadAmount = 5;
// 记录每⼀个⽣产者线程是否处理完毕的标记
private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];
// 整个所有的⽣产者线程全部结束的标记
private static boolean done = fal;
// ⼀个线程安全的队列,⽤于⽣产者和消费者异步地信息交互
private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();kubuntu
static class ProviderThread extends Thread {
private Thread thread;
private String threadName;
private int threadNo;
public ProviderThread(String threadName2, int threadNo) {
this.threadName = threadName2;
this.threadNo = threadNo;
}
public void start() {碎纸机英文
if (thread == null) {
thread = new Thread(this, threadName);
}
thread.start();
System.out.println(
(new Date().getTime()) + " " + threadName + " " + Thread.currentThread().getName());
}
@Override
public void run() {
int rows = 0;
for (int i = 0; i < 100; i++) {
String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());
// offer不会去阻塞线程,put会
//linkedBlockingQeque.offer(string);
linkedBlockingQeque.put(string);
rows++;
/
*
* try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
艾尼斯
* (InterruptedException e) { e.printStackTrace(); }
*/
}
// 本线程处理完毕的标记
LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;
System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"
+ Thread.currentThread().getName());
}
}
static class ConsumerThread implements Runnable {
雅虎翻译在线
private Thread thread;
private String threadName;
public ConsumerThread(String threadName2) {
this.threadName = threadName2;
}
public void start() {
if (thread == null) {
thread = new Thread(this, threadName);
}
thread.start();
System.out.println(
(new Date().getTime()) + " " + threadName + " " + Thread.currentThread().getName());
}
@Override
public void run() {
int rows = 0;
// ⽣产者线程没有结束,或者消息队列中有元素的时候,去队列中取数据
while (Done() == fal || linkedBlockingQeque.isEmpty() == fal) {
try {
//在⽢肃电信的实际应⽤中发现,当数据的处理量达到千万级的时候,带参数的poll会将主机的⼏百个G的内存耗尽,jvm会提⽰申请内存失败,并将进程退出。⽹上说,这是这个⽅法的⼀个bug。                    //String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);
String string = linkedBlockingQeque.poll();
if (string == null) {
continue;
}
rows++;
System.out
.println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "
+ string + "\t" + Thread.currentThread().getName());
/*
* try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch
* (InterruptedException e) { e.printStackTrace(); }
*/
} catch (InterruptedException e) {
e.printStackTrace();
}alu
}
System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"
+ Thread.currentThread().getName());
}
}
public static synchronized void tDone(boolean flag) {
LinkedBlockingQueueDemo.done = flag;
}
public static synchronized boolean getDone() {
return LinkedBlockingQueueDemo.done;
}
public static void main(String[] args) {
System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());
System.out.println(
(new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());
// 启动若⼲⽣产者线程
for (int i = 0; i < providerThreadAmount; i++) {
String threadName = String.format("%s-%d", "ProviderThread", i);
ProviderThread providerThread = new ProviderThread(threadName, i);
providerThread.start();
}
// 启动若⼲个消费者线程
for (int i = 0; i < 10; i++) {
String threadName = String.format("%s-%d", "ConsumerThread", i);
ConsumerThread consumerThread = new ConsumerThread(threadName);
consumerThread.start();
}
// 循环检测⽣产者线程是否处理完毕
do {
for (boolean b : providerDoneFlag) {
if (b == fal) {
/*
nickname* try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +
* " "+"sleep 3 conds. linkedBlockingQeque.size() is "+linkedBlockingQeque.
* size() + "\t" + Thread.currentThread().getName()); } catch
* (InterruptedException e) { e.printStackTrace(); }
英语语法网*/
// 只要有⼀个⽣产者线程没有结束,则整个⽣产者线程检测认为没有结束
break;
}
LinkedBlockingQueueDemo.tDone(true);
}
// ⽣产者线程全部结束的时候,跳出检测
if (Done() == true) {
break;
}
等级英文} while (true);
System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());    }
}
结果略。

本文发布于:2023-08-11 04:25:45,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/78/1129151.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   产者   结束   数据   时候   消费者
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图