消息中间件

异步消息简介

远程调用机制是同步的,当客户端调用远程方法时,客户端必须等到远程方法完成后,才能继续执行,即使远程方法什么也不返回,客户端也必须要阻塞到服务完成.

而消息则是异步发送的,不需要等到服务处理完毕,客户端发送消息,然后继续执行,对于同步通信,异步通信有很多的优势.

发送消息

间接性是异步消息的一个关键,一个应用向另外一个应用发送消息时,两个应用之间没有直接的联系.相反的是,发送方会把消息发送给一个服务,然后由那个服务来将消息投递给接收方应用程序.

异步消息有两个主要的概念:消息代理(message broker)* 和目的地(destination) .当一个应用发送消息时,会将消息给消息代理,消息代理确保把消息投递给目的地,同时解放发送者.

不同的消息系统会提供不同的消息路由模式,但是有两个通用的目的地:队列(queue)主题(tipic) .每个类型都与特定的消息模型相关联,分别是点对点和发布/订阅模式..

点对点

每个消息都有一个发送者和一个接收者,当消息代理收到消息,他会把消息放到一个队列中,当接受者请求队列的下一条消息时,消息就会从队列中取出,并传递给接收者.然后消息传递后就会从队列中删除,这样确保了消息只能传递给一个接收者.

发布/订阅

对于一条消息,发布/订阅模式会让所有订阅该消息的接收者都收到该消息.

异步消息的优点

先说说同步:

  1. 同步意味着等待.
  2. 客户端通过服务接口与远程服务相耦合.
  3. 客户端与远程服务的位置耦合.
  4. 客户端与服务的可用性相耦合.

再说说异步:

  1. 无需等待.
  2. 面向消息和解耦.
  3. 位置独立
  4. 确保投递

使用JMS

ActiveMQ结合Spring

  1. 下载并打开ActiveMQ

    ActiveMQ官网

  2. 导入jar包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${activemq.version}</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
    </dependency>
  3. 创建连接工厂

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!--ActiveMQ 连接工厂,默认密码和用户名为admin-->
    <amq:connectionFactory
    id="amqConnectionFactory"
    brokerURL="tcp://localhost:61616"
    userName="admin"
    password="admin"/>
    <!--或者(二选一)-->
    <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
    </bean>
    </beans>
  4. 声明ActiveMQ的目的地(使用jms模板后可以不用在xml中设置目的地)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <amq:queue id="myQueue" physicalName="bos_sms_queue"/>
    <amq:topic id="myTopic" physicalName="bos_sms_topic"/>
    <!--或者(二选一)-->
    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="bos_sms_queue"/>
    </bean>

    <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="bos_sms_topic"/>
    </bean>
  5. 使用jms模板

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    <!--Spring Caching 连接工厂-->
    <!--让Spring的工厂来管理ampConnectionFactory-->
    <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
    </bean>
    <!--定义JMSTemplate的Queue类型-->
    <bean class="org.springframework.jms.core.JmsTemplate" id="jmsQueueTemplate">
    <constructor-arg ref="mqConnectionFactory"/>
    <!--非pub/sub模式(发布订阅模式),队列模式-->
    <property name="pubSubDomain" value="false"/>

    <!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
    <property name="defaultDestination" ref="myQueue"/>
    <!--或者-->
    <property name="defaultDestinationName" value="bos_sms_queue"/>

    </bean>

    <bean class="org.springframework.jms.core.JmsTemplate" id="jmsTopicTemplate">
    <constructor-arg ref="mqConnectionFactory"/>
    <!--pub/sub模式(发布订阅模式)-->
    <property name="pubSubDomain" value="true"/>

    <!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
    <property name="defaultDestination" ref="myQueue"/>
    <!--或者-->
    <property name="defaultDestinationName" value="bos_sms_queue"/>
    </bean>
  6. 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Qualifier("jmsQueueTemplate")
    @Autowired
    private JmsTemplate jmsTemplate;

    //发送到消息队列
    jmsTemplate.send("目的地名字", new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    MapMessage mapMessage = session.createMapMessage();
    mapMessage.setString("telephone", customer.getTelephone());
    mapMessage.setString("msg", randomCode);
    return mapMessage;
    }
    });
  7. 发送消息(二)

    还可以发送消息时对消息进行转换,而不用自行实现MessageCreator 接口.

    1
    jmsTemplate.convertAndSend(Object message);

    Spring在底层为我们提供了一个转换的方式,用来把Object类型的message转换为特定的message发送出去.

    Spring为我们提供了一个默认的转换方式:SimpleMessageConverter,该转换方式可以实现String到TestMessage,字节数组到BytesMessage,Map到MapMessage,以及Serializable对象到ObjectMessage之间的转换.

    Spring还为我们提供了其他的转换方式,例如:使用Jackson Json库来实现消息到Json格式间的转换

    • MappingJacksonMessageConverter
    • MapingJackson2MessageConverter

    转换器在xml中可以进行设置,还可以使用自定义的转换方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <bean id="messageConverter" class="org.springframework.messaging.converter.MappingJackson2MessageConverter"/>

    <bean class="org.springframework.jms.core.JmsTemplate" id="jmsTopicTemplate">
    <constructor-arg ref="mqConnectionFactory"/>
    <!--pub/sub模式(发布订阅模式)-->
    <property name="pubSubDomain" value="true"/>

    <!--可以设置默认的目的地,这样send()方法就可以不用指定目的地了-->
    <property name="defaultDestination" ref="myQueue"/>
    <!--或者-->
    <property name="defaultDestinationName" value="bos_sms_queue"/>

    <!--设置转换器-->
    <property name="messageConverter" ref="messageConverter"/>
    </bean>

    设置了转换器后,jms模板的convertAndSend 方法就可以自动转换你传入的Object,然后发送出去了.

  8. 接收消息

    JMS模板有个方法可以用来接收消息——receive() ,如果没有消息该方法会一直等待下去.如果收到了消息,就会返回一个Message对象.

    同发送消息,如果JMS模板设置了转换器,那么使用receiveAndConvert() 方法可以直接把Message对象进行反向转换.

  9. 创建监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms.xsd">

    <context:component-scan base-package="com.yuda.bos.mq"/>
    <!--ActiveMQ 连接工厂-->
    <amq:connectionFactory id="amqConnectionFactory"
    brokerURL="tcp://localhost:61616" userName="admin" password="admin"/>

    <!--Spring Caching 连接工厂-->
    <!--让Spring的工厂来管理ampConnectionFactory-->
    <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
    </bean>

    <!--监听器-->
    <jms:listener-container destination-type="queue" container-type="default"
    connection-factory="mqConnectionFactory" acknowledge="auto">
    <!--具体的监听器-->
    <jms:listener destination="bos_sms_queue" ref="smsConsumer" method="onMessage"/>
    </jms:listener-container>
    </beans>

监听器用来监听消息队列,当消息队列传来消息时,就会触发监听器指定类(实现了MessageListener接口)的onMessage()方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class SmsConsumer implements MessageListener{
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
String telephone = mapMessage.getString("telephone");
String msg = mapMessage.getString("msg");
System.out.println(telephone + "/" + msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}

基于消息的远程过程调用

略…