同事在项目中使用了JMS,产生了兴趣,于是便进行相关知识的学习,google下JMS,JMS全称是Java Message Service,而Apache ActiveMQ是实现JMS1.1的open source,因此主要就Apache ActiveMesageQueue进行研究。
下面通过讲解一个简单的Demo进行讲解
(1)首先从apache上下载了Apache ActiveMQ的源码包:apache-activemq-5.2.
(2)在ubuntu下解压缩缩:tar -xzvf apache-activemq-5.2.
(3)进入到解压缩的文件夹下:
$cd bin
$./activemq 启动ActiveMQ,可以在浏览器中查看:localhost:8161/admin通过ActiveMQ的Web Console查看,验证已启动。
(4)准备jar包:
jms-1.1.jar,activemq-core-5.4.2.jar两个核心包,其他必备的包有commons-logging等。
(5) 首先介绍一下ActiveMQ的原理:
代码的结构是HelloWorld.java中包含三个class,其中HelloWorld类声明为public,另两个:Producer和Consumer不能声明为public,使用默认类型。在主类中HelloWorld中使用thread函数跑两个线程,一个是Producer,另一个是Consumer,这个Demo主要演示消息队列Queue的原理:启动的ActiveMQ监控61616端口,Producer将消息发送到ActiveMQ在61616中监控的消息队列(指定队列,在程序中我们为其取名“villasy-queue”),Consumer则监控该端口的指定队列(“villasy-queue”),这个队列是约定好的,然后Producer向该队列发送消息,而Consumer则从该队列取消息,在Consumer中通过while(true)的死循环去监控,不断从队列中取消息,如果消息不为null,那么将消息信息打出来,而如果消息为空,说明队列中已经没有消息了,所以退出死循环。
(6)下面编写测试的代码,可以直接粘贴去跑,当然前提是必备的包都齐了:
Java代码
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloWorld {
public static void main(String [] args) throws InterruptedException{
thread(new Producer(),fal);
Thread.sleep(2000);
thread(new Consumer(),fal);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.tDaemon(daemon);
brokerThread.start();
}
}
class Producer implements Runnable{
private static final int SEND_NUMBER = 5;
反字笔顺@Override
public void run() {
ConnectionFactory connFactory;
Connection conn = null;
Session ssion;
Destination destination;
MessageProducer msgProducer;
外墙保温层
//构造ConnectionFactory实例对象
connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try{
//从构造工厂获得连接对象
conn = ateConnection();
//启动连接
conn.start();
//获得连接ssion
ssion = ateSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//获得连接目的地
destination = ateQueue("villasy-queue");
/
/消息生产者
msgProducer = ateProducer(destination);
//设置不持久化
msgProducer.tDeliveryMode(DeliveryMode.NON_PERSISTENT);
//构造并发送消息
节能降耗从我做起
ndMsg(ssion,msgProducer);
}catch(Exception ex){
ex.printStackTrace();
}finally{
if(null != conn)
try {
conn.clo();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public static void ndMsg(Session ssion,MessageProducer msgProducer) throws JMSException{
for(int i=1;i<SEND_NUMBER;i++){
TextMessage txtMsg = ateTextMessage("发送消息:"+i);
System.out.Text());
msgProducer.nd(txtMsg);
}
}
}
class Consumer implements Runnable,javax.jms.ExceptionListener{
@Override
public void run() {
// ConnectionFactory :连接工厂,JMS 用它创建连接
打南边来了个ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
/
/ Session: 一个发送或接收消息的线程
Session ssion;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
许幽兰
try {
/
/ 构造从工厂得到连接对象
connection = ateConnection();
// 启动
connection.start();
// 获取操作连接
ssion = ateSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取ssion0的queue
destination = ateQueue("villasy-queue");
consumer = ateConsumer(destination);
while (true) {
TextMessage message = (TextMessage) ive(1000);
if (null != message) {
System.out.println("收到消息" + Text());
} el {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.clo();
} catch (Throwable ignore) {
}
}
}
@Override
public synchronized void onException(JMSException arg0) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloWorld {
public static void main(String [] args) throws InterruptedException{
thread(new Producer(),fal);
Thread.sleep(2000);
thread(new Consumer(),fal);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.tDaemon(daemon);
brokerThread.start();
}
}
class Producer implements Runnable{
private static final int SEND_NUMBER = 5;
@Override
public void run() {
ConnectionFactory connFactory;
Connection conn = null;
Session ssion;
Destination destination;
MessageProducer msgProducer;
//构造ConnectionFactory实例对象
connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
try{
//从构造工厂获得连接对象
conn = ateConnection();
//启动连接
conn.start();
//获得连接ssion
ssion = ateSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//获得连接目的地
destination = ateQueue("villasy-queue");
//消息生产者
msgProducer = ateProducer(destination);
//设置不持久化
msgProducer.tDeliveryMode(DeliveryMode.NON_PERSISTENT);
//构造并发送消息
少年先锋队ndMsg(ssion,msgProducer);
蝉鸣黄叶汉宫秋it();
}catch(Exception ex){
ex.printStackTrace();
}finally{
if(null != conn)
try {
conn.clo();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public static void ndMsg(Session ssion,MessageProducer msgProducer) throws JMSException{
for(int i=1;i<SEND_NUMBER;i++){
TextMessage txtMsg = ateTextMessage("发送消息:"+i);
System.out.Text());
msgProducer.nd(txtMsg);
}
}
}
class Consumer implements Runnable,javax.jms.ExceptionListener{
@Override
public void run() {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session ssion;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = ateConnection();
// 启动
connection.start();
// 获取操作连接
ssion = ateSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取ssion0的queue
destination = ateQueue("villasy-queue");
consumer = ateConsumer(destination);
while (true) {
TextMessage message = (TextMessage) ive(1000);
if (null != message) {
土质分类五类土System.out.println("收到消息" + Text());
} el {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.clo();
} catch (Throwable ignore) {
}
}
}
@Override
public synchronized void onException(JMSException arg0) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
以上为代码,通过注释读者应该能够理解基本的流程,需要说明的是,在创建连接的时候我们是通过这句:
Java代码
connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
实现的,通过tcp协议和消息队列服务器进行交互,我们也可以使用一些其他的方式实现我们的消息队列:
Java代码
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
这种消息
队列是通过在本地文件类型的,同样可以实现通讯。
在项目中对该项技术的主要的使用之处在于提高处理速度和提高用户体验,我们可以通过消息队列的方式实现异步的方式,比如我们要处理10个订单的核对,我们可以将这10个订单发送到某服务器的消息队列上,而订单核对程序在从该服务器上的消息队列中取,只要取到就进行处理,处理完之后返回处理完毕消息,异步之于阻塞的优势是我们不必一个一个地去等待订单的处理完毕,而将所有的订单都一起发过去,如果处理机够多的话,只要有消息到达,处理机就可以处理,实现了并发处理,最终所有处理完毕时将结果告诉需要订单核对结果的程序,这样就减少了处理的时间提高了用户体验。
对于ActiveMQ的学习还是比较初步,比较简单,继续深入学习。