【AMQ】 二:点对点模式Dome

【AMQ】 二:点对点模式Dome

AMQ通讯分为两种,一种是点对点模式,另一种是发布订阅模式,本文主要介绍点对点模式和简单实现。

什么是点对点模式?

点对点模式是AMQ的一种通过队列方式通讯的模式, 即生产者会把生产的消息放在某个队列中,消费者从队列中取得消息进行通讯的方式。

基本实现:

生产者:

package www.amp.com;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* Created by wangpengzhi1 on 2018/10/11.

*/

public class QueueProducer {

/** 默认用户名 */

public static final String USERNAME = "admin";

/** 默认密码 */

public static final String PASSWORD = "admin";

/** 默认连接地址(格式如:tcp://IP:61616) */

public static final String BROKER_URL = "tcp://192.168.198.138:61616";

/** 队列名称 */

public static final String QUEUE_NAME = "hello amq";

// 连接工厂(在AMQ中由ActiveMQConnectionFactory实现)

private ConnectionFactory connectionFactory;

// 连接对象

private Connection connection;

// 会话对象

private Session session;

// 消息目的地(对于点对点模型,是Queue对象;对于发布订阅模型,是Topic对象;它们都继承或实现了该接口)

private Destination destination;

// 消息发送(生产)者

private MessageProducer messageProducer;

public static void main(String[] args) {

QueueProducer producer = new QueueProducer();

producer.doSend();

}

public void doSend() {

try {

/**

* 1.创建连接工厂

* 构造函数有多个重载,默认连接本地MQ服务器,也可以手动设置用户名、密码、连接地址信息

* new ActiveMQConnectionFactory(userName, password, brokerURL)

*/

connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);

/**

* 2.创建连接

*/

connection = connectionFactory.createConnection();

/**

* 3.启动连接

*/

connection.start();

/**

* 4.创建会话

* param1:是否支持事务,若为true,则会忽略第二个参数,默认为SESSION_TRANSACTED

* param2:确认消息模式,若第一个参数为false时,该参数有以下几种状态

* -Session.AUTO_ACKNOWLEDGE:自动确认。客户端发送和接收消息不需要做额外的工作,即使接收端发生异常,

* 也会被当作正常发送成功

* -Session.CLIENT_ACKNOWLEDGE:客户端确认。客户端接收到消息后,必须调用message.

* acknowledge() 方法给予收到反馈,JMS服务器才会把该消息当做发送成功,并删除

* -Session.DUPS_OK_ACKNOWLEDGE:副本确认。一旦接收端应用程序的方法调用从处理消息处返回,

* 会话对象就会确认消息的接收,而且允许重复确认。

*/

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

/**

* 5.创建(发送)消息目的地,即队列,参数为队列名称

*/

destination = session.createQueue(QUEUE_NAME);

/**

* 6.创建一个消息生产者,并指定目的地

*/

messageProducer = session.createProducer(destination);

/**

* 其他操作: 设置生产者的生产模式,默认为持久化

* 参数有以下两种状态:

* -DeliveryMode.NON_PERSISTENT:消息不持久化,消息被消费之后或者超时之后将从队列中删除

* -DeliveryMode.PERSISTENT:消息会持久化,即使接收端消费消息之后仍然会保存

*/

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

/**

* 其他操作:设置消息的存活时间(单位:毫秒)

*/

messageProducer.setTimeToLive(60000);

for (int i = 0; i < 5; i++) {

/**

* 7.创建文本消息

* 此外,还有多种类型的消息如对象,字节……都可以通过session.createXXXMessage()方法创建

*/

TextMessage message = session.createTextMessage("send content:"

+ i);

/**

* 8. 发送

*/

messageProducer.send(message);

}

System.out.println("消息发送完成!");

/**

* 如果有事务操作也可以提交事务

*/

session.commit();

/**

* 9.关闭生产者对象(即使关闭了程序也在运行)

*/

messageProducer.close();

} catch (Exception e) {

e.printStackTrace();

} finally {

if (connection != null) {

try {

/**

* 10.关闭连接(将会关闭程序)

*/

connection.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

}

消费者端:

package www.amp.com;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueConsumer {

private ConnectionFactory connectionFactory;

private Connection connection;

private Session session;

private Destination destination;

// 注意这里是消息接收(消费)者

private MessageConsumer messageConsumer;

/** 默认用户名 */

public static final String USERNAME = "admin";

/** 默认密码 */

public static final String PASSWORD = "admin";

/** 默认连接地址(格式如:tcp://IP:61616) */

public static final String BROKER_URL = "tcp://192.168.198.138:61616";

public static void main(String[] args) {

QueueConsumer consumer = new QueueConsumer();

consumer.doReceive();

}

public void doReceive() {

try {

connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

connection = connectionFactory.createConnection();

connection.start();

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue(QueueProducer.QUEUE_NAME);

/**

* 注意:这里要创建一个消息消费,并指定目的地(即消息源队列)

*/

messageConsumer = session.createConsumer(destination);

// 方式一:监听接收

receiveByListener();

// 方式二:阻塞接收

// receiveByManual();

/**

* 注意:这里不能再关闭对象了

*/

// messageConsumer.close();

} catch (Exception e) {

e.printStackTrace();

} finally {

/**

* 注意:这里不能再关闭Connection了

*/

// connection.close();

}

}

/**

* 通过注册监听器的方式接收消息,属于被动监听

*/

private void receiveByListener() {

try {

messageConsumer.setMessageListener(new MessageListener() {

public void onMessage(Message message) {

if (message instanceof TextMessage) {

try {

TextMessage msg = (TextMessage) message;

System.out.println("Received:“" + msg.getText()+ "”");

// 可以通过此方法反馈消息已收到

msg.acknowledge();

} catch (Exception e) {

e.printStackTrace();

}

}

}

});

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 通过手动去接收消息的方式,属于主动获取

*/

private void receiveByManual() {

while (true) {

try {

/**

* 通过receive()方法阻塞接收消息,参数为超时时间(单位:毫秒)

*/

TextMessage message = (TextMessage) messageConsumer.receive(60000);

if (message != null) {

System.out.println("Received:“" + message.getText() + "”");

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}