略
异步消息简介
远程调用机制是同步的,当客户端调用远程方法时,客户端必须等到远程方法完成后,才能继续执行,即使远程方法什么也不返回,客户端也必须要阻塞到服务完成.
而消息则是异步发送的,不需要等到服务处理完毕,客户端发送消息,然后继续执行,对于同步通信,异步通信有很多的优势.
发送消息
间接性是异步消息的一个关键,一个应用向另外一个应用发送消息时,两个应用之间没有直接的联系.相反的是,发送方会把消息发送给一个服务,然后由那个服务来将消息投递给接收方应用程序.
异步消息有两个主要的概念:消息代理(message broker)* 和目的地(destination) .当一个应用发送消息时,会将消息给消息代理,消息代理确保把消息投递给目的地,同时解放发送者.
不同的消息系统会提供不同的消息路由模式,但是有两个通用的目的地:队列(queue) 和主题(tipic) .每个类型都与特定的消息模型相关联,分别是点对点和发布/订阅模式..
点对点
每个消息都有一个发送者和一个接收者,当消息代理收到消息,他会把消息放到一个队列中,当接受者请求队列的下一条消息时,消息就会从队列中取出,并传递给接收者.然后消息传递后就会从队列中删除,这样确保了消息只能传递给一个接收者.
发布/订阅
对于一条消息,发布/订阅模式会让所有订阅该消息的接收者都收到该消息.
异步消息的优点
先说说同步:
- 同步意味着等待.
- 客户端通过服务接口与远程服务相耦合.
- 客户端与远程服务的位置耦合.
- 客户端与服务的可用性相耦合.
再说说异步:
- 无需等待.
- 面向消息和解耦.
- 位置独立
- 确保投递
使用JMS
ActiveMQ结合Spring
下载并打开ActiveMQ
导入jar包
1
2
3
4
5
6
7
8
9
10<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>创建连接工厂
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--ActiveMQ 连接工厂,默认密码和用户名为admin-->
<amq:connectionFactory
id="amqConnectionFactory"
brokerURL="tcp://localhost:61616"
userName="admin"
password="admin"/>
<!--或者(二选一)-->
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
</beans>声明ActiveMQ的目的地(使用jms模板后可以不用在xml中设置目的地)
1
2
3
4
5
6
7
8
9
10<amq:queue id="myQueue" physicalName="bos_sms_queue"/>
<amq:topic id="myTopic" physicalName="bos_sms_topic"/>
<!--或者(二选一)-->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="bos_sms_queue"/>
</bean>
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="bos_sms_topic"/>
</bean>使用jms模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29<!--Spring Caching 连接工厂-->
<!--让Spring的工厂来管理ampConnectionFactory-->
<bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!--定义JMSTemplate的Queue类型-->
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsQueueTemplate">
<constructor-arg ref="mqConnectionFactory"/>
<!--非pub/sub模式(发布订阅模式),队列模式-->
<property name="pubSubDomain" value="false"/>
<!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
<property name="defaultDestination" ref="myQueue"/>
<!--或者-->
<property name="defaultDestinationName" value="bos_sms_queue"/>
</bean>
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTopicTemplate">
<constructor-arg ref="mqConnectionFactory"/>
<!--pub/sub模式(发布订阅模式)-->
<property name="pubSubDomain" value="true"/>
<!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
<property name="defaultDestination" ref="myQueue"/>
<!--或者-->
<property name="defaultDestinationName" value="bos_sms_queue"/>
</bean>发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private JmsTemplate jmsTemplate;
//发送到消息队列
jmsTemplate.send("目的地名字", new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("telephone", customer.getTelephone());
mapMessage.setString("msg", randomCode);
return mapMessage;
}
});发送消息(二)
还可以发送消息时对消息进行转换,而不用自行实现
MessageCreator
接口.1
jmsTemplate.convertAndSend(Object message);
Spring在底层为我们提供了一个转换的方式,用来把Object类型的message转换为特定的message发送出去.
Spring为我们提供了一个默认的转换方式:SimpleMessageConverter,该转换方式可以实现String到TestMessage,字节数组到BytesMessage,Map到MapMessage,以及Serializable对象到ObjectMessage之间的转换.
Spring还为我们提供了其他的转换方式,例如:使用Jackson Json库来实现消息到Json格式间的转换
- MappingJacksonMessageConverter
- MapingJackson2MessageConverter
转换器在xml中可以进行设置,还可以使用自定义的转换方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<bean id="messageConverter" class="org.springframework.messaging.converter.MappingJackson2MessageConverter"/>
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTopicTemplate">
<constructor-arg ref="mqConnectionFactory"/>
<!--pub/sub模式(发布订阅模式)-->
<property name="pubSubDomain" value="true"/>
<!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
<property name="defaultDestination" ref="myQueue"/>
<!--或者-->
<property name="defaultDestinationName" value="bos_sms_queue"/>
<!--设置转换器-->
<property name="messageConverter" ref="messageConverter"/>
</bean>设置了转换器后,jms模板的
convertAndSend
方法就可以自动转换你传入的Object,然后发送出去了.接收消息
JMS模板有个方法可以用来接收消息——
receive()
,如果没有消息该方法会一直等待下去.如果收到了消息,就会返回一个Message对象.同发送消息,如果JMS模板设置了转换器,那么使用
receiveAndConvert()
方法可以直接把Message对象进行反向转换.创建监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">
<context:component-scan base-package="com.yuda.bos.mq"/>
<!--ActiveMQ 连接工厂-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>
<!--Spring Caching 连接工厂-->
<!--让Spring的工厂来管理ampConnectionFactory-->
<bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!--监听器-->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="mqConnectionFactory" acknowledge="auto">
<!--具体的监听器-->
<jms:listener destination="bos_sms_queue" ref="smsConsumer" method="onMessage"/>
</jms:listener-container>
</beans>
监听器用来监听消息队列,当消息队列传来消息时,就会触发监听器指定类(实现了MessageListener接口)的onMessage()方法.
1 |
|
基于消息的远程过程调用
略…