avatar

SpringBoot消息队列

概述

  • 大多数应用中,可以通过消息服务中间件来提高系统异步通信、扩展解耦能力

  • 消息服务中两个重要的概念:消息代理(message broker)和目的地(destination)

    • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
  • 消息队列主要由两种形式的目的地

    • 队列(queue):点对点消息通信(point-to-point)
    • 主题(topic):发布(publish)/订阅(subscribe)消息通信
  • 应用场景:

    • 异步的处理

    • 应用解耦
      在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    • 流量削峰
      用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面秒杀业务根据消息队列中的请求信息,再做后续处理

  • 点对点式:

    • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列
    • 消息只有唯一的发送者和接收者,但并不是说只能有一个接收者,有一个抢到了消息,消息就会删除
  • 发布订阅式:

    • 发布者发送消息到主题,多个接收者监听订阅主题,那么就会在消息到达同时受到消息
  • JMS(Java Message Service)java消息服务

    • 基于JVM消息代理规范.ActiveMQ,HornetMQ时JMS实现的
  • AMQP(Advanced Message Queuing Protocol)

    • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
    • RabbitMQ是AMQP的实现
  • JMS与AMQP的区别

JMS AMQP
定义 java api 网络栈级协议
跨语言
跨语言
model 提供两种消息模型:
(1)、Peer-2-Peer
(2)、Pub/sub
提供五种消息模型:
(1)、direct exchange
(2)、fanout exchange
(3)、potic change
(4)、headers exchange
(5)、system exchange
本质来讲,后面四种和JMS的pub/sub模型没有太大查别,仅仅是在路由机制做了梗详细的划分
支持消息类型 多种消息类型:
TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message(只有消息头和属性)
bytes[] 当实际应用时,有复杂的消息,可以讲消息序列化后发送。
综合评价 JMS定义了java api层面的标准,在java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持比较差 AMQP定义了wrie-level层的协议标准;天然具有跨平台、跨语言特性
  • Spring支持
    • spring-jms提供对JMS的支持
    • spring-rabbit提供了对AMQP的支持
    • 需要ConnectionFactory的实现来连接消息代理
    • 提供JmsTemplate、RabbitTemplate来发送消息
    • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
    • @Enablejms、@EnableRabbit开启支持
  • SpringBoot自动配置
    • jmsAutoConfiguration
    • RabbitAutoConfiguration

RabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP的开源实现。

核心概念

  • Message:消息,消息是不具名的。它由消息头和消息体组成。消息体是不透明的,而消息头是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等

  • Publisher:消息的生产者,也是一个由交换器发布消息的客户端应用程序

  • Exchange:交换器,用于接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认)、fanout,topic和headers不同类型的Exchange转发消息的策略有所区别。

  • Queue:消息队列,用于保存消息知道发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或者多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Binding:绑定,用于消息队列和交换机的一种关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

    • Exchange和Queue的绑定可以是多对多的关系
  • Connection:网络连接,例如一个TCP连接

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成,因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

  • Consumer:消费得消费者,表示一个从消息队列中取得消息的客户端应用程序

  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。

  • Broker:表示消息队列服务器实体

RabbitMQ运行机制

AMQP中的消息路由

  • AMQP中的消息的路由过程和java开发中的JMS存在一些差别,AMQP中增加了Exchange和Bingding的角色,生成者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Banding决定交换器的消息应该发送到哪个队列
  • Exchange类型分发消息时,根据类型的不同分发策略有区别,目前四种类型:directfanouttopicheaders。headers匹配AMQP消息的header而不是路由键,headers交换器于direct交换器完全一致。但是性能差很多,目前几乎用不到了,所以直接看另外三种类型:

    • Direct Exchange:消息中的路由键(routing key)如果和Binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换器要求路由键为“dog”,则只转发royting key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

    • Fanout Exchange:每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上,很像子网广播,每台子网内的主机都获得一份复制的消息。fanout类型转发是最快的。

    • Topic Exchange: topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样会识别两个通配符:”#”和”*”。#匹配0个或多个单词,*匹配一个单词。

springboot整合RabbitMQ

我们首先进入RabbitMQ的管理界面,创建一下我们的交换器方便进行测试

按照这个进行创建,三个交换器和四个队列
1、创建交换器:

2、创建消息队列

3、交换器与队列进行绑定

topic交换器有点特殊

我们可以在RabbitMQ的界面进行测试,看消息是否能发到队列中,测试没有问题的话,就在springboot中进行集成

RabbitMQ整合

引入spring-boot-starter-amqp

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

SpringBoot的自动配置类给我们配置了一些什么,
1)首先需要看一下RabbitMQ的自动配置类RabbitAutoConfiguration
2)在自动配置类中配置了连接工厂CachingConnectionFactory
3)RabbitProperties封装了所有的RabbitMQ的配置我们可以对其进行配置
4)RabbitTemplate:给Rabbitmq发送和接收消息的
5) AmqpAdmin : RabbitMQ系统管理组件,可以创建队列,创建交换器等

SpringBoot配置RabbitMQ

相关配置如下相关配置

#简单配置如下
spring.rabbitmq.host=192.168.1.8
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#全部配置
# base
spring.rabbitmq.host: 服务Host
spring.rabbitmq.port: 服务端口
spring.rabbitmq.username: 登陆用户名
spring.rabbitmq.password: 登陆密码
spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
spring.rabbitmq.publisher-returns: 是否启用【发布返回】
spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.parsed-addresses:


# ssl
spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1


# cache
spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION


# listener
spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒

spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态


# template
spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled: 发送重试是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重试次数
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔

SpringBoot给RabbitMQ发消息

@SpringBootTest
public class SpringbootAmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
/*
1、单播(点对点)
*/
@Test
public void contextLoads() {
//Message需要自己构造一个,定义消息体内容和消息头
//rabbitTemplate.send(exchange,routeKey,message);
//object默认当作消息体,只需要传入发送的对象,自动序列化发送rabbiotmq
//rabbitTemplate.convertAndSend(exchange,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg","这是第一个消息");
//对象被默认序列化后发送出去,这里使用的是默认的序列化器,那么就会出现在rabbitmq管理界面出现看不懂的一串东西。
rabbitTemplate.convertAndSend("exchange.direct","zenhsin.news",map);
}
//接收数据
@Test
public void receive()
{
Object o = rabbitTemplate.receiveAndConvert("zenhsin.news");
System.out.println(o.getClass());
System.out.println(o);
}
}

实现自己的序列化器,配置一个自己的Config类

@Configuration
public class MyRabbitMQConfig {
//这样可以实现序列化json,自定义对象也是可以的。
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();//使用json进行序列化
}
}

上面介绍了单播,下面是广播

/*
广播
*/
@Test
public void sendMsg()
{
Map<String,Object> map = new HashMap<>();
map.put("msg","这是第二个消息");
rabbitTemplate.convertAndSend("exchange.fanout","",map);
}

这样设置广播以后,所有的队列都会收到一个消息。

SpringBoot设置监听

1、在SpringBoot入口加入@EnableRabbit

@EnableRabbit//开启基于注解的RabbitMQ模式
@SpringBootApplication
public class SpringbootAmqpApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootAmqpApplication.class, args);
}

}

2、使用RabbitListener注解

@Service
public class TestService {

@RabbitListener(queues = "zenhsin.news")
public void receive(Map<String,Object> map)
{
System.out.println(map);
}
@RabbitListener(queues = "zenshin")
public void receive01(Message message)
{
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}//这样主要有消息就会开启消费。

AmqpAdmin管理组件的使用

使用AmqpAdmin类创建交换器和队列,删除使用deleteXX即可。

@SpringBootTest
public class RabbitMQ {
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange()
{
DirectExchange directExchange = new DirectExchange("amqpadmin.exchange");
amqpAdmin.declareExchange(directExchange);
System.out.println("创建完成");
}
@Test
public void createQueue()
{
Queue queue = new Queue("amqpadmin.queue");
amqpAdmin.declareQueue(queue);
System.out.println("创建完成");
}
@Test
public void createBinding()
{
Binding binding = new Binding("amqpadmin.queue",
Binding.DestinationType.QUEUE,
"amqpadmin.exchange",
"amqp.haha",
null);
amqpAdmin.declareBinding(binding);
System.out.println("创建完成");
}
}
文章作者: zenshin
文章链接: https://zlh.giserhub.com/2020/04/25/springboot/mq/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 zenshin's blog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论