hanqunfeng's blog

Spring Boot 2 学习笔记--ActiveMQ

摘要

看完本文你将掌握如下知识点:

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

<!-- Spring Boot auto-configuration starter for ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- Connection pool dependency -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>

<!-- With Spring Boot 2.x.x, enabling the connection pool (spring.activemq.pool.enabled=true) requires this
dependency; otherwise startup fails with an error saying that JmsMessagingTemplate cannot be found.
Spring Boot 1.5.x does not need it,
because Spring Boot 1.5.x uses org.apache.activemq.pool.PooledConnectionFactory,
whereas Spring Boot 2.x.x uses org.messaginghub.pooled.jms.JmsPoolConnectionFactory.
This can be checked in the source code:
org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration : initializes the ConnectionFactory
org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration : initializes the JmsMessagingTemplate
NOTE(review): the move to pooled-jms appears to date from Spring Boot 2.1 rather than 2.0; confirm against the
Spring Boot version in use, and whether its dependency management already pins the pooled-jms version. -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.3</version>
</dependency>

配置文件

1
2
3
4
5
6
7
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# Enable the connection pool
spring.activemq.pool.enabled=true
# Maximum number of pooled connections
spring.activemq.pool.max-connections=100

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * JMS configuration for the ActiveMQ examples: registers a listener container
 * factory for topic destinations and the destination beans that are injected
 * by name elsewhere in the application.
 */
@Configuration
@EnableJms // enable JMS support
public class ActiveMqConfig {

    /**
     * Listener container factory for topic (publish/subscribe) destinations.
     * This bean is required when topic messages are consumed; listeners select
     * it through {@code containerFactory = "jmsTopicListenerContainerFactory"}.
     *
     * @param connectionFactory the JMS connection factory to listen on
     * @return a container factory configured for the pub/sub domain
     */
    @Bean("jmsTopicListenerContainerFactory")
    public JmsListenerContainerFactory<?> jmsTopicListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Must be true for topics; false means the factory serves queues.
        factory.setPubSubDomain(true);
        return factory;
    }

    /** Queue destination named {@code springboot.queue}. */
    @Bean("springboot.queue")
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue");
    }

    /**
     * Queue destination named {@code springboot.queue.a}. Producer injects a
     * bean under this name, so it has to be registered here.
     */
    @Bean("springboot.queue.a")
    public Queue queueA() {
        return new ActiveMQQueue("springboot.queue.a");
    }

    /**
     * Queue destination named {@code springboot.queue.b}. Producer injects a
     * bean under this name, so it has to be registered here.
     */
    @Bean("springboot.queue.b")
    public Queue queueB() {
        return new ActiveMQQueue("springboot.queue.b");
    }

    /** Topic destination named {@code springboot.topic}. */
    @Bean("springboot.topic")
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic");
    }

    /** Queue destination named {@code springboot.queuereply}. */
    @Bean("springboot.queuereply")
    public Queue queuereply() {
        return new ActiveMQQueue("springboot.queuereply");
    }

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Message consumer: each listener prints the text payload it receives,
 * prefixed with this class's name.
 */
@Component
public class Consumer {

    /** Label printed between the class name and the received payload. */
    private static final String RECEIVED = "收到的报文为:";

    /** Queue listener bound to the destination {@code springboot.queue.a}. */
    @JmsListener(destination = "springboot.queue.a")
    public void receiveQueueA(String text) {
        report(RECEIVED, text);
    }

    /** Queue listener bound to the wildcard destination {@code springboot.*.b}. */
    @JmsListener(destination = "springboot.*.b")
    public void receiveQueueB(String text) {
        report(RECEIVED, text);
    }

    /** Queue listener bound to the wildcard destination {@code springboot.>}. */
    @JmsListener(destination = "springboot.>")
    public void receiveQueueAll(String text) {
        report(RECEIVED, text);
    }

    /**
     * Topic listener; for topics the container factory must be set to
     * {@code jmsTopicListenerContainerFactory}.
     */
    @JmsListener(
            destination = "springboot.topic",
            containerFactory = "jmsTopicListenerContainerFactory")
    public void receiveTopic(String text) {
        // This listener's label carries a leading space, unlike the others.
        report(" " + RECEIVED, text);
    }

    /**
     * Queue listener whose return value is sent to {@code out.replyTo.queue},
     * notifying the producer once the message has been consumed.
     *
     * @param text the received payload
     * @return the reply text
     */
    @JmsListener(destination = "springboot.queuereply")
    @SendTo("out.replyTo.queue")
    public String receiveQueueReply(String text) {
        report(RECEIVED, text);
        return "out.replyTo.queue receiveQueueReply";
    }

    /** Prints the class name, the given label and the payload on one line. */
    private void report(String label, String text) {
        String line = this.getClass().getName() + label + text;
        System.out.println(line);
    }

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Message producer: sends text messages to the injected queue and topic
 * destinations and listens for replies on {@code out.replyTo.queue}.
 */
@Component
public class Producer {

    // @Resource has no "value" element, so the bean name must be passed as name = "...".
    // Each name must match a destination bean registered in the application context.
    @Resource(name = "springboot.queue.a")
    private Queue queuea;

    @Resource(name = "springboot.queue.b")
    private Queue queueb;

    @Resource(name = "springboot.topic")
    private Topic topic;

    @Resource(name = "springboot.queuereply")
    private Queue queuereply;

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * Sends a message to a destination.
     *
     * @param destination the queue or topic to send to
     * @param message     the message to send
     */
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /** Sends {@code message} to the queue injected as {@code springboot.queue.a}. */
    public void sendQueueAMessage(final String message) {
        sendMessage(queuea, message);
    }

    /** Sends {@code message} to the queue injected as {@code springboot.queue.b}. */
    public void sendQueueBMessage(final String message) {
        sendMessage(queueb, message);
    }

    /** Sends {@code message} to the topic injected as {@code springboot.topic}. */
    public void sendTopicMessage(final String message) {
        sendMessage(topic, message);
    }

    /** Sends {@code message} to the queue injected as {@code springboot.queuereply}. */
    public void sendQueueMessageReply(final String message) {
        sendMessage(queuereply, message);
    }

    /**
     * Producer-side listener for the consumer's reply; prints the text
     * received from {@code out.replyTo.queue}.
     *
     * @param text the reply payload
     */
    @JmsListener(destination = "out.replyTo.queue")
    public void consumerMessage(String text) {
        System.out.println("从out.replyTo.queue收到报文" + text);
    }
}

热评文章