ActiveMQ 入门实战(2)--Java 操作 ActiveMQ
本文主要介绍使用 JMS API 来操作 ActiveMQ(ActiveMQ "Classic" 使用 JMS 1.1,ActiveMQ Artemis 使用 JMS 2.0),文中所使用到的软件版本:Java 1.8.0_191、ActiveMQ "Classic" 5.16.2、ActiveMQ Artemis 2.17.0。
1、Java 操作ActiveMQ "Classic"
使用 JMS 1.1 的 API操作 ActiveMQ "Classic"。
1.1、引入依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.2</version>
</dependency>
1.2、发送消息
1.2.1、发送到 Queue
public static void sendToQueue() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); //连接池
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
Connection connection = pooledConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("testQueue");
MessageProducer producer = session.createProducer(destination); //消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("消息" + i);
producer.send(message);
System.out.println("已发送的消息:" + message.getText());
}
producer.close();
session.close();
connection.close();
pooledConnectionFactory.stop();
}
1.2.2、发送到 Queue(事务)
/**
 * Sends ten persistent text messages to the queue {@code testQueue} inside one transaction.
 *
 * The transaction is committed after the last message; if a {@link JMSException} occurs while
 * sending or committing, the transaction is rolled back and the stack trace is printed.
 *
 * @throws JMSException if connecting, creating the session, rolling back, or closing fails
 */
public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = null;
        try {
            Destination destination = session.createQueue("testQueue");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事务消息" + i);
                producer.send(message);
                System.out.println("已发送的消息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        } finally {
            // The producer is still null when createQueue/createProducer failed.
            if (producer != null) {
                producer.close();
            }
            session.close();
        }
    } finally {
        // Always release the connection, including when createSession itself fails.
        connection.close();
    }
}
1.2.3、发送到 Topic
/**
 * Publishes ten persistent text messages to the topic {@code testTopic}.
 *
 * The connection is closed in a {@code finally} block so it is released even when publishing
 * fails part-way.
 *
 * @throws JMSException if connecting, creating the JMS objects, or publishing fails
 */
public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("testTopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("消息" + i);
            producer.send(message);
            System.out.println("已发送的消息:" + message.getText());
        }
        producer.close();
        session.close();
    } finally {
        connection.close();
    }
}
1.2.4、发送到 Topic(事务)
/**
 * Publishes ten persistent text messages to the topic {@code testTopic} inside one transaction.
 *
 * The transaction is committed after the last message; if a {@link JMSException} occurs while
 * sending or committing, the transaction is rolled back and the stack trace is printed.
 *
 * @throws JMSException if connecting, creating the JMS objects, rolling back, or closing fails
 */
public static void sendToTopicTraction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("testTopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        try {
            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事务消息" + i);
                producer.send(message);
                System.out.println("已发送的消息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
        }
    } finally {
        // Also covers failures in createSession/createTopic/createProducer above.
        connection.close();
    }
}
完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.jms.pool.PooledConnectionFactory;import javax.jms.*;public class Producer { private static String brokerURL = "tcp://10.40.96.140:61616"; public static void main(String[] args) throws JMSException {
sendToQueue();// sendToQueueTransaction();// sendToTopic();// sendToTopicTraction(); } public static void sendToQueue() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); //连接池
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
Connection connection = pooledConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("testQueue");
MessageProducer producer = session.createProducer(destination); //消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("消息" + i);
producer.send(message);
System.out.println("已发送的消息:" + message.getText());
}
producer.close();
session.close();
connection.close();
pooledConnectionFactory.stop();
} /**
* 以事务方式发送消息
* @throws JMSException */
public static void sendToQueueTransaction() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = null; try {
Destination destination = session.createQueue("testQueue");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("事务消息" + i);
producer.send(message);
System.out.println("已发送的消息:" + message.getText());
}
session.commit();
} catch (JMSException e) {
session.rollback();
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
} public static void sendToTopic() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("testTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("消息" + i);
producer.send(message);
System.out.println("已发送的消息:" + message.getText());
}
producer.close();
session.close();
connection.close();
} public static void sendToTopicTraction() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("testTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); try { for (int i = 1; i <= 10; i++) {
TextMessage message = session.createTextMessage("事务消息" + i);
producer.send(message);
System.out.println("已发送的消息:" + message.getText());
}
session.commit();
} catch (JMSException e) {
session.rollback();
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
Producer.java
1.3、消费者
1.3.1、从 Queue 中消费消息
/**
 * Registers a listener on the queue {@code testQueue} that prints each text message and
 * acknowledges it ({@code CLIENT_ACKNOWLEDGE} mode), using a pooled connection.
 *
 * The method returns as soon as the listener is registered. The connection is deliberately
 * not closed on success, because closing it would end delivery to the listener. If setup
 * fails, the connection and the pool are released before the exception is rethrown.
 *
 * @throws JMSException if connecting or creating the consumer fails
 */
public static void recevieFromQueue() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // Connection pool wrapping the plain ActiveMQ factory.
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
    Connection connection = null;
    try {
        connection = pooledConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createQueue("testQueue");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            try {
                // Guard the cast: the queue may also carry non-text messages.
                if (message instanceof TextMessage) {
                    System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                } else {
                    System.err.println("Skipping non-text message: " + message);
                }
                // Acknowledge either way so an unexpected payload is not left pending.
                message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } catch (JMSException e) {
        // Setup failed: nothing will ever be delivered, so release what was opened.
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            pooledConnectionFactory.stop();
        }
        throw e;
    }
}
1.3.2、从 Queue 中消费消息(事务)
/**
 * Registers a listener on the queue {@code testQueue} that consumes messages in a transacted
 * session and commits after every tenth message.
 *
 * When handling or committing a message fails inside the listener, the transaction is rolled
 * back and the counter is reset, so the next batch again spans ten messages. The method
 * returns as soon as the listener is registered; the connection stays open on success and is
 * closed if setup fails.
 *
 * @throws JMSException if connecting, creating the consumer, or registering the listener fails
 */
public static void recevieFromQueueTransction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("testQueue");
        MessageConsumer consumer = session.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();
        consumer.setMessageListener(message -> {
            try {
                // Guard the cast: the queue may also carry non-text messages.
                if (message instanceof TextMessage) {
                    System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                } else {
                    System.err.println("Skipping non-text message: " + message);
                }
                // Commit once every 10 messages.
                if (index.incrementAndGet() % 10 == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                // Roll back the open transaction and restart the batch count.
                index.set(0);
                try {
                    session.rollback();
                } catch (JMSException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
            }
        });
    } catch (JMSException e) {
        // Setup failed: nothing will ever be delivered, so release the connection.
        connection.close();
        throw e;
    }
}
1.3.3、从 Topic 中消费消息
/**
 * Registers a listener on the topic {@code testTopic} that prints each text message
 * ({@code AUTO_ACKNOWLEDGE} mode).
 *
 * The method returns as soon as the listener is registered; the connection stays open on
 * success and is closed if setup fails.
 *
 * @throws JMSException if connecting or creating the consumer fails
 */
public static void recevieFromTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("testTopic");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            try {
                // Guard the cast: the topic may also carry non-text messages.
                if (message instanceof TextMessage) {
                    System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                } else {
                    System.err.println("Skipping non-text message: " + message);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } catch (JMSException e) {
        // Setup failed: nothing will ever be delivered, so release the connection.
        connection.close();
        throw e;
    }
}
1.3.4、从 Topic 中消费消息(持久化订阅+事务)
对于 Topic,使用MessageConsumer 消费消息,只能消费订阅时间之后的消息;JMS 允许订阅者创建一个可持久化的订阅(TopicSubscriber),这样,即使订阅者宕机恢复后,也能接收宕机时生产者发布的消息。
/**
 * Registers a durable subscriber named {@code test} (client id {@code 12345678}) on the topic
 * {@code testTopic}, consuming in a transacted session and committing after every tenth
 * message.
 *
 * When handling or committing a message fails inside the listener, the transaction is rolled
 * back and the counter is reset. The method returns as soon as the listener is registered;
 * the connection stays open on success and is closed if setup fails.
 *
 * @throws JMSException if connecting, setting the client id, or creating the subscriber fails
 */
public static void recevieFromTopicDurable() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.setClientID("12345678");
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("testTopic");
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");
        AtomicInteger index = new AtomicInteger();
        topicSubscriber.setMessageListener(message -> {
            try {
                // Guard the cast: the topic may also carry non-text messages.
                if (message instanceof TextMessage) {
                    System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                } else {
                    System.err.println("Skipping non-text message: " + message);
                }
                // Commit once every 10 messages.
                if (index.incrementAndGet() % 10 == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                // Roll back the open transaction and restart the batch count.
                index.set(0);
                try {
                    session.rollback();
                } catch (JMSException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
            }
        });
    } catch (JMSException e) {
        // Setup failed: nothing will ever be delivered, so release the connection.
        connection.close();
        throw e;
    }
}
完整代码:

package com.abc.demo.general.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo consumers for ActiveMQ "Classic" written against the JMS 1.1 API: plain and
 * transacted queue consumers, a topic consumer and a durable topic subscriber.
 *
 * Every demo registers a listener and returns; the connection is deliberately left open on
 * success because closing it would end delivery to the listener.
 */
public class Consumer {
    /** URL of the broker every demo connects to. */
    private static final String brokerURL = "tcp://10.40.96.140:61616";

    /**
     * Runs one of the demos; swap the call below to try another one.
     *
     * @param args unused
     * @throws JMSException if the selected demo fails during setup
     */
    public static void main(String[] args) throws JMSException {
        recevieFromQueue();
        // Alternative demos:
        // recevieFromQueueTransction();
        // recevieFromTopic();
        // recevieFromTopicDurable();
    }

    /**
     * Registers a listener on the queue {@code testQueue} that prints each text message and
     * acknowledges it ({@code CLIENT_ACKNOWLEDGE} mode), using a pooled connection.
     *
     * @throws JMSException if connecting or creating the consumer fails
     */
    public static void recevieFromQueue() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Connection pool wrapping the plain ActiveMQ factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
        Connection connection = null;
        try {
            connection = pooledConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue("testQueue");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    // Guard the cast: the queue may also carry non-text messages.
                    if (message instanceof TextMessage) {
                        System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                    } else {
                        System.err.println("Skipping non-text message: " + message);
                    }
                    // Acknowledge either way so an unexpected payload is not left pending.
                    message.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        } catch (JMSException e) {
            // Setup failed: nothing will ever be delivered, so release what was opened.
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                pooledConnectionFactory.stop();
            }
            throw e;
        }
    }

    /**
     * Registers a listener on the queue {@code testQueue} that consumes in a transacted
     * session and commits after every tenth message; a failure inside the listener rolls the
     * transaction back and resets the counter.
     *
     * @throws JMSException if connecting, creating the consumer, or registering the listener fails
     */
    public static void recevieFromQueueTransction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("testQueue");
            MessageConsumer consumer = session.createConsumer(destination);
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                    } else {
                        System.err.println("Skipping non-text message: " + message);
                    }
                    // Commit once every 10 messages.
                    if (index.incrementAndGet() % 10 == 0) {
                        session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    // Roll back the open transaction and restart the batch count.
                    index.set(0);
                    try {
                        session.rollback();
                    } catch (JMSException rollbackFailure) {
                        rollbackFailure.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            // Setup failed: nothing will ever be delivered, so release the connection.
            connection.close();
            throw e;
        }
    }

    /**
     * Registers a listener on the topic {@code testTopic} that prints each text message
     * ({@code AUTO_ACKNOWLEDGE} mode).
     *
     * @throws JMSException if connecting or creating the consumer fails
     */
    public static void recevieFromTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("testTopic");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                    } else {
                        System.err.println("Skipping non-text message: " + message);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        } catch (JMSException e) {
            // Setup failed: nothing will ever be delivered, so release the connection.
            connection.close();
            throw e;
        }
    }

    /**
     * Registers a durable subscriber named {@code test} (client id {@code 12345678}) on the
     * topic {@code testTopic}, consuming in a transacted session and committing after every
     * tenth message; a failure inside the listener rolls the transaction back.
     *
     * @throws JMSException if connecting, setting the client id, or creating the subscriber fails
     */
    public static void recevieFromTopicDurable() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.setClientID("12345678");
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("testTopic");
            TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");
            AtomicInteger index = new AtomicInteger();
            topicSubscriber.setMessageListener(message -> {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("接受到的消息:" + ((TextMessage) message).getText());
                    } else {
                        System.err.println("Skipping non-text message: " + message);
                    }
                    // Commit once every 10 messages.
                    if (index.incrementAndGet() % 10 == 0) {
                        session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    // Roll back the open transaction and restart the batch count.
                    index.set(0);
                    try {
                        session.rollback();
                    } catch (JMSException rollbackFailure) {
                        rollbackFailure.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            // Setup failed: nothing will ever be delivered, so release the connection.
            connection.close();
            throw e;
        }
    }
}
Consumer.java
2、Java 操作ActiveMQ Artemis
使用 JMS 2.0 的 API操作 ActiveMQ Artemis。
2.1、引入依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>artemis-jms-client-all</artifactId>
    <version>2.17.0</version>
</dependency>
2.2、发送消息
2.2.1、发送到 Queue
/**
 * Asynchronously sends five persistent text messages to the queue {@code testQueue} with a
 * delivery delay of five seconds, using the JMS 2.0 simplified API.
 *
 * The context is opened in try-with-resources so it is closed even when a send fails.
 *
 * @throws Exception if creating the context or sending a message fails
 */
public static void sendToQueue() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    try (JMSContext context = activeMQConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createQueue("testQueue");
        // Use persistent delivery for the messages sent below.
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // Delay delivery by five seconds.
        producer.setDeliveryDelay(1000 * 5);
        // Send asynchronously; the listener is notified once each send has finished.
        producer.setAsync(new CompletionListener() {
            @Override
            public void onCompletion(Message message) {
                System.out.println("消息发送完成");
            }

            @Override
            public void onException(Message message, Exception exception) {
                exception.printStackTrace();
            }
        });
        for (int i = 1; i <= 5; i++) {
            String text = "消息" + i;
            TextMessage message = context.createTextMessage(text);
            producer.send(destination, message);
            // Print the local text: after an asynchronous send the message object must not
            // be read until its completion callback has run.
            System.out.println("已发送的消息:" + text);
        }
    }
}
2.2.2、发送到 Queue(事务)
/**
 * Sends ten persistent text messages to the queue {@code testQueue} inside one transaction,
 * using the JMS 2.0 simplified API.
 *
 * The transaction is committed after the last message. On any JMS failure, checked or
 * unchecked, it is rolled back and the stack trace is printed. The context is always closed.
 *
 * @throws JMSException declared for compatibility; JMS failures inside the transaction are
 *     handled here by rolling back
 */
public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    try {
        Destination destination = context.createQueue("testQueue");
        JMSProducer producer = context.createProducer();
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            TextMessage message = context.createTextMessage("事务消息" + i);
            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.commit();
    } catch (JMSException | JMSRuntimeException e) {
        // JMSContext/JMSProducer report failures as unchecked JMSRuntimeException; catching
        // only JMSException would skip the rollback for send and commit errors.
        context.rollback();
        e.printStackTrace();
    } finally {
        context.close();
    }
}
2.2.3、发送到 Topic
/**
 * Publishes ten persistent text messages to the topic {@code testTopic}, using the JMS 2.0
 * simplified API.
 *
 * The context is opened in try-with-resources so it is closed even when publishing fails.
 *
 * @throws JMSException if reading back the text of a sent message fails
 */
public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    try (JMSContext context = activeMQConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            TextMessage message = context.createTextMessage("消息" + i);
            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
    }
}
2.2.4、发送到 Topic(事务)
/**
 * Publishes five persistent text messages to the topic {@code testTopic} inside one
 * transaction, using the JMS 2.0 simplified API.
 *
 * The transaction is committed after the last message. On any JMS failure, checked or
 * unchecked, it is rolled back and the stack trace is printed. The context is always closed.
 */
public static void sendToTopicTraction() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    try {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 5; i++) {
            TextMessage message = context.createTextMessage("事务消息" + i);
            producer.send(destination, message);
            System.out.println("已发送的消息:" + message.getText());
        }
        context.commit();
    } catch (JMSException | JMSRuntimeException e) {
        // JMSContext/JMSProducer report failures as unchecked JMSRuntimeException; catching
        // only JMSException would skip the rollback for send and commit errors.
        context.rollback();
        e.printStackTrace();
    } finally {
        context.close();
    }
}
完整代码:

package com.abc.demo.general.activemq;import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;import javax.jms.*;public class ProducerJms20 { private static String brokerURL = "tcp://10.40.96.11:61616"; public static void main(String[] args) throws Exception {
sendToQueue();// sendToQueueTransaction();// sendToTopic();// sendToTopicTraction(); } public static void sendToQueue() throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
JMSContext context = activeMQConnectionFactory.createContext();
JMSProducer producer = context.createProducer();
Destination destination = context.createQueue("testQueue"); //消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //延迟投递
producer.setDeliveryDelay(1000 * 5); //异步发送
producer.setAsync(new CompletionListener() {
@Override public void onCompletion(Message message) {
System.out.println("消息发送完成");
}
@Override public void onException(Message message, Exception exception) {
exception.printStackTrace();
}
}); for (int i = 1; i <= 5; i++) {
TextMessage message = context.createTextMessage("消息" + i);
producer.send(destination, message);
System.out.println("已发送的消息:" + message.getText());
}
context.close();
} /**
* 以事务方式发送消息
* @throws JMSException */
public static void sendToQueueTransaction() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); try {
Destination destination = context.createQueue("testQueue");
JMSProducer producer = context.createProducer();
producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) {
TextMessage message = context.createTextMessage("事务消息" + i);
producer.send(destination, message);
System.out.println("已发送的消息:" + message.getText());
}
context.commit();
} catch (JMSException e) {
context.rollback();
e.printStackTrace();
} finally {
context.close();
}
} public static void sendToTopic() throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
JMSContext context = activeMQConnectionFactory.createContext();
JMSProducer producer = context.createProducer();
Destination destination = context.createTopic("testTopic");
producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) {
TextMessage message = context.createTextMessage("消息" + i);
producer.send(destination, message);
System.out.println("已发送的消息:" + message.getText());
}
context.close();
} public static void sendToTopicTraction() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); try {
JMSProducer producer = context.createProducer();
Destination destination = context.createTopic("testTopic");
producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) {
TextMessage message = context.createTextMessage("事务消息" + i);
producer.send(destination, message);
System.out.println("已发送的消息:" + message.getText());
}
context.commit();
} catch (JMSException e) {
context.rollback();
e.printStackTrace();
} finally {
context.close();
}
}
}
View Code
2.3、消费者
2.3.1、从 Queue 中消费消息
/**
 * Consumes text messages from the queue {@code testQueue} with a listener
 * ({@code AUTO_ACKNOWLEDGE} mode) until a byte is read from standard input.
 *
 * @throws Exception if creating the context or consumer fails, or reading standard input fails
 */
public static void recevieFromQueue() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // try-with-resources closes the context once the method is done or setup fails.
    try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
        Destination destination = context.createQueue("testQueue");
        JMSConsumer consumer = context.createConsumer(destination);
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        // Registering a MessageListener does not block the calling thread, so wait on
        // standard input to keep the consumer running.
        System.in.read();
    }
}
2.3.2、从 Queue 中消费消息(事务)
/**
 * Consumes text messages from the queue {@code testQueue} in a transacted context, committing
 * after every tenth message, until a byte is read from standard input.
 *
 * When handling or committing a message fails inside the listener, the transaction is rolled
 * back and the counter is reset, so the next batch again spans ten messages.
 *
 * @throws Exception if creating the context or consumer fails, or reading standard input fails
 */
public static void recevieFromQueueTransction() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // try-with-resources closes the context once the method is done or setup fails.
    try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
        Destination destination = context.createQueue("testQueue");
        JMSConsumer consumer = context.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
                // Commit once every 10 messages.
                if (index.incrementAndGet() % 10 == 0) {
                    context.commit();
                }
            } catch (JMSException | JMSRuntimeException e) {
                e.printStackTrace();
                // Roll back the open transaction and restart the batch count.
                index.set(0);
                context.rollback();
            }
        });
        // Keep the consumer running until something is typed on standard input.
        System.in.read();
    }
}
2.3.3、从 Topic 中消费消息
/**
 * Consumes text messages from the topic {@code testTopic} with a listener
 * ({@code AUTO_ACKNOWLEDGE} mode) until a byte is read from standard input.
 *
 * @throws Exception if creating the context or consumer fails, or reading standard input fails
 */
public static void recevieFromTopic() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // try-with-resources closes the context once the method is done or setup fails.
    try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createConsumer(topic);
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        // Keep the consumer running until something is typed on standard input.
        System.in.read();
    }
}
2.3.4、从 Topic 中消费消息(持久化订阅+事务)
/**
 * Consumes text messages from the topic {@code testTopic} through a durable subscription
 * named {@code test} (client id {@code 12345678}) in a transacted context, committing after
 * every fifth message, until a byte is read from standard input.
 *
 * When handling or committing a message fails inside the listener, the transaction is rolled
 * back and the counter is reset.
 *
 * @throws Exception if creating the context or consumer fails, or reading standard input fails
 */
public static void recevieFromTopicDurable() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // try-with-resources closes the context once the method is done or setup fails.
    try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
        context.setClientID("12345678");
        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createDurableConsumer(topic, "test");
        AtomicInteger index = new AtomicInteger();
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
                // Commit once every 5 messages.
                if (index.incrementAndGet() % 5 == 0) {
                    context.commit();
                }
            } catch (JMSException | JMSRuntimeException e) {
                e.printStackTrace();
                // Roll back the open transaction and restart the batch count.
                index.set(0);
                context.rollback();
            }
        });
        // Keep the consumer running until something is typed on standard input.
        System.in.read();
    }
}
2.3.5、从 Topic 中消费消息(共享订阅)
/**
 * Simulates three consumers sharing the non-durable subscription {@code testShare} on the
 * topic {@code testTopic}, each on its own context, until a byte is read from standard input.
 *
 * The consumers are set up one after another on the calling thread. Each listener prints the
 * thread it is invoked on, which is how the output tells the consumers apart. All contexts
 * are closed when the method ends.
 *
 * @throws Exception if creating a context or consumer fails, or reading standard input fails
 */
public static void recevieFromTopicShare() throws Exception {
    // Simulate three consumers.
    JMSContext[] contexts = new JMSContext[3];
    try {
        for (int i = 0; i < contexts.length; i++) {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
            // Remember the context right away so it is closed even if the setup below fails.
            contexts[i] = context;
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
        // Keep the consumers running until something is typed on standard input.
        System.in.read();
    } finally {
        for (JMSContext context : contexts) {
            if (context != null) {
                context.close();
            }
        }
    }
}
2.3.6、从 Topic 中消费消息(共享持久订阅+事务)
/**
 * Simulates three consumers sharing the durable subscription {@code testShare2} on the topic
 * {@code testTopic}, each on its own transacted context, until a byte is read from standard
 * input.
 *
 * Every listener commits after each message and rolls back when handling or committing
 * fails. The consumers are set up one after another on the calling thread, and all contexts
 * are closed when the method ends.
 *
 * @throws Exception if creating a context or consumer fails, or reading standard input fails
 */
public static void recevieFromTopicShareDurable() throws Exception {
    // Simulate three consumers.
    JMSContext[] contexts = new JMSContext[3];
    try {
        for (int i = 0; i < contexts.length; i++) {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
            // Remember the context right away so it is closed even if the setup below fails.
            contexts[i] = context;
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                    // Commit as soon as one message has been handled.
                    context.commit();
                } catch (JMSException | JMSRuntimeException e) {
                    e.printStackTrace();
                    context.rollback();
                }
            });
        }
        // Keep the consumers running until something is typed on standard input.
        System.in.read();
    } finally {
        for (JMSContext context : contexts) {
            if (context != null) {
                context.close();
            }
        }
    }
}
完整代码:

package com.abc.demo.general.activemq;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo consumers for ActiveMQ Artemis written against the JMS 2.0 simplified API
 * ({@link JMSContext} / {@link JMSConsumer}): queue, topic, durable, shared and
 * shared-durable subscriptions, with and without transactions.
 *
 * Every demo registers a listener and then blocks on standard input; its contexts are
 * closed when the demo method ends.
 */
public class ConsumerJms20 {
    /** URL of the broker every demo connects to. */
    private static final String brokerURL = "tcp://10.40.96.11:61616";

    /**
     * Runs one of the demos; swap the call below to try another one.
     *
     * @param args unused
     * @throws Exception if the selected demo fails
     */
    public static void main(String[] args) throws Exception {
        recevieFromQueue();
        // Alternative demos:
        // recevieFromQueueTransction();
        // recevieFromTopic();
        // recevieFromTopicDurable();
        // recevieFromTopicShare();
        // recevieFromTopicShareDurable();
    }

    /**
     * Consumes text messages from the queue {@code testQueue} ({@code AUTO_ACKNOWLEDGE} mode).
     *
     * @throws Exception if creating the context or consumer fails, or reading stdin fails
     */
    public static void recevieFromQueue() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            // Registering a MessageListener does not block the calling thread, so wait on
            // standard input to keep the consumer running.
            System.in.read();
        }
    }

    /**
     * Consumes text messages from the queue {@code testQueue} in a transacted context,
     * committing after every tenth message; a failure inside the listener rolls the
     * transaction back and resets the counter.
     *
     * @throws Exception if creating the context or consumer fails, or reading stdin fails
     */
    public static void recevieFromQueueTransction() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                    // Commit once every 10 messages.
                    if (index.incrementAndGet() % 10 == 0) {
                        context.commit();
                    }
                } catch (JMSException | JMSRuntimeException e) {
                    e.printStackTrace();
                    // Roll back the open transaction and restart the batch count.
                    index.set(0);
                    context.rollback();
                }
            });
            System.in.read();
        }
    }

    /**
     * Consumes text messages from the topic {@code testTopic} ({@code AUTO_ACKNOWLEDGE} mode).
     *
     * @throws Exception if creating the context or consumer fails, or reading stdin fails
     */
    public static void recevieFromTopic() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createConsumer(topic);
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
        }
    }

    /**
     * Durable subscription plus transaction: consumes from the topic {@code testTopic}
     * through the durable subscription {@code test} (client id {@code 12345678}), committing
     * after every fifth message and rolling back on a failure inside the listener.
     *
     * @throws Exception if creating the context or consumer fails, or reading stdin fails
     */
    public static void recevieFromTopicDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
            context.setClientID("12345678");
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createDurableConsumer(topic, "test");
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                    // Commit once every 5 messages.
                    if (index.incrementAndGet() % 5 == 0) {
                        context.commit();
                    }
                } catch (JMSException | JMSRuntimeException e) {
                    e.printStackTrace();
                    // Roll back the open transaction and restart the batch count.
                    index.set(0);
                    context.rollback();
                }
            });
            System.in.read();
        }
    }

    /**
     * Shared subscription: simulates three consumers sharing the non-durable subscription
     * {@code testShare} on the topic {@code testTopic}, each on its own context.
     *
     * @throws Exception if creating a context or consumer fails, or reading stdin fails
     */
    public static void recevieFromTopicShare() throws Exception {
        // Simulate three consumers.
        JMSContext[] contexts = new JMSContext[3];
        try {
            for (int i = 0; i < contexts.length; i++) {
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
                // Remember the context right away so it is closed even if setup below fails.
                contexts[i] = context;
                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            }
            System.in.read();
        } finally {
            for (JMSContext context : contexts) {
                if (context != null) {
                    context.close();
                }
            }
        }
    }

    /**
     * Shared durable subscription plus transaction: simulates three consumers sharing the
     * durable subscription {@code testShare2} on the topic {@code testTopic}; each listener
     * commits after every message and rolls back on failure.
     *
     * @throws Exception if creating a context or consumer fails, or reading stdin fails
     */
    public static void recevieFromTopicShareDurable() throws Exception {
        // Simulate three consumers.
        JMSContext[] contexts = new JMSContext[3];
        try {
            for (int i = 0; i < contexts.length; i++) {
                ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                // Remember the context right away so it is closed even if setup below fails.
                contexts[i] = context;
                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                        // Commit as soon as one message has been handled.
                        context.commit();
                    } catch (JMSException | JMSRuntimeException e) {
                        e.printStackTrace();
                        context.rollback();
                    }
                });
            }
            System.in.read();
        } finally {
            for (JMSContext context : contexts) {
                if (context != null) {
                    context.close();
                }
            }
        }
    }
}
赞 (0)
