RabbitMQ快速入门

张开发
2026/4/18 15:02:24 15 分钟阅读

分享文章

RabbitMQ快速入门
1.单机部署虚拟机中使用Docker来安装Rocky9,CentOS7)下载并拉取镜像docker pull rabbitmq:3-management2.安装MQ执行下面的命令来运行MQ容器docker run \ -e RABBITMQ_DEFAULT_USER#你的账号# \ -e RABBITMQ_DEFAULT_PASS#你的密码# \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management(可选)获取虚拟机地址ip addr3.AMQP 依赖引入!--AMQP依赖包含RabbitMQ-- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependencyconsumer的yaml配置spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码pulisher的yaml配置spring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码4.访问mq控制台输入账号密码https://#你的虚拟机地址#:15672登录页面overview这里展示了节点详细信息节点个数等什么是Connections消息发布者生产者和接收者消费者与mq建立的连接什么是Channels连接后需要建立的通道可以理解为消息发送和接受的具体对象什么是Exchange交换机消息接收的方式直连交换机Direct Exchange直连交换机通过精确匹配路由键Routing Key将消息路由到队列。消息的路由键必须与队列绑定的路由键完全一致才会被转发。适用于需要精确路由的场景例如日志系统中根据日志级别分发消息。指定路由的routing_key red,yellow只接收到属性为red或yellow的队列消息路由规则忽略某一路由消息发送到所有绑定队列。典型用例任务分发、日志级别过滤。RabbitListener(bindings QueueBinding( valueQueue(namedirect.queue1), exchange Exchange(name666.direct,type ExchangeTypes.DIRECT), key {red,blue} )) public void listenDireQueue1(String message)throws Exception{ System.out.println(消费者接受到direct.queue1消息:【message】); } RabbitListener(bindings QueueBinding( valueQueue(namedirect.queue2), exchange Exchange(name666.direct,type ExchangeTypes.DIRECT), key {red,yellow} )) public void listenDireQueue2(String message)throws Exception{ System.out.println(消费者接受到direct.queue2消息:【message】);广播交换机Fanout Exchange扇形交换机会将消息广播到所有绑定的队列忽略路由键。适用于需要一对多广播的场景例如新闻推送或事件通知。路由规则通过绑定某一队列消息发送到所有绑定队列。绑定队列通过config或注解Queuebinding绑定某一队列典型用例广播通知、事件发布。configBean public FanoutExchange fanoutExchange(){ return new FanoutExchange( itcast.fanout ); } //f.queue1 Bean public Queue fanoutQueue1(){ return new Queue( fanout.queue1 ); } //绑定队列到交换机 Bean public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //f.queue2 Bean public Queue fanoutQueue2(){ return new Queue( fanout.queue2 ); } Bean public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); }listenerRabbitListener(queues fanout.queue1 ) public void listenFanoutQueue1(String message)throws Exception{ System.out.println(消费者接受到fanout.queue1消息:【message】); } RabbitListener(queues fanout.queue2 ) public void listenFanoutQueue2(String message)throws Exception{ System.out.println(消费者接受到fanout.queue2消息:【message】); }主题交换机Topic Exchange主题交换机通过路由键的模式匹配将消息路由到队列。路由键支持通配符*匹配一个单词#匹配零或多个单词灵活性高。适用于需要基于模式的路由场景例如多维度消息分类。路由规则通过在routing_key中指定通配符赖匹配队列# 匹配零个或一个或多个 * 仅匹配一个china.# 可以匹配 china.news和china.news.nowchina.* 匹配 china.news和china.user典型用例动态路由、多条件过滤。RabbitListener(bindings QueueBinding( valueQueue(nametopic.queue1), exchange Exchange(name666.topic,type ExchangeTypes.TOPIC), key {china.#} )) public void listenTopicQueue1(String message)throws Exception{ System.out.println(消费者接受到topic.queue1消息:【message】); } RabbitListener(bindings QueueBinding( valueQueue(nametopic.queue2), exchange Exchange(name666.topic,type ExchangeTypes.TOPIC), key {#.news} )) public void listenTopicQueue2(String message)throws Exception{ System.out.println(消费者接受到topic.queue2消息:【message】);什么是Queues队列消息存储的位置Classic Queue预取消息且顺序分配消息Work Queue持久化队列 消息 手动 ACK 公平分发如何实现一个简单的Work Queue持久化队列 消息 手动 ACK 公平分发配置文件如何公平分发,设置预取消息为prefetch : 1如何手动ack设置 acknowledge-mode: manualspring: rabbitmq: host:#你的虚拟机地址 port: 5672 virtual-host: / username: #你的账号 password: #你的密码 virtual-host: / #虚拟主机 listener: simple: acknowledge-mode: manual # 手动确认消息生产环境必须用 prefetch: 1 # 每次只预取1条消息 → 公平分发核心 concurrency: 1 # 每个消费者实例的并发数声明工作队列创建配置类声明持久化队列服务重启消息不丢失import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Work Queue 队列配置 * 无需声明交换机使用默认交换机 */ Configuration public class WorkQueueConfig { // 队列名称 public static final String WORK_QUEUE work.queue; /** * 声明队列持久化、非排他、非自动删除 */ Bean public Queue workQueue() { // 参数队列名、是否持久化、是否排他、是否自动删除 return new Queue(WORK_QUEUE, true, false, false); } }消息生产者发送任务消息到工作队列import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; Service public class WorkQueueProducer { Resource private RabbitTemplate rabbitTemplate; /** * 发送任务消息 * param message 任务内容 */ public void sendTask(String message) { // 参数路由键(队列名)、消息内容 rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message); System.out.println(生产者发送消息 message); } }多个消息消费者创建两个消费者监听同一个工作队列模拟并行处理任务import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.io.IOException; Service public class WorkQueueConsumer1 { /** * 监听工作队列 * param message 消息内容 * param channel 信道 * param msg 原始消息对象 */ RabbitListener(queues WorkQueueConfig.WORK_QUEUE) public void consume(String message, Channel channel, Message msg) throws IOException { long deliveryTag msg.getMessageProperties().getDeliveryTag(); try { // 模拟业务处理耗时任务 System.out.println(消费者1 处理消息 message); // 模拟耗时1秒 Thread.sleep(1000); // 【手动确认】消息处理成功 // 参数消息标识、是否批量确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { System.err.println(消费者1 处理消息失败 message); // 【手动拒绝】消息处理失败重回队列 // 参数消息标识、是否批量、是否重回队列 channel.basicNack(deliveryTag, false, true); } } }什么是Admin用户权限管理这里采用多租户架构不同用户间相互隔离

更多文章