Spring Reactive Web Webflux 整合 RabbitMQ

张开发
2026/4/18 7:12:14 15 分钟阅读

分享文章

Spring Reactive Web Webflux 整合 RabbitMQ
Spring Reactive Web Webflux 整合 rabbitMQ引言例子(广播消息)pom.xmlConfiguration写一个监听容器工厂在Controller中使用思考引言在使用spring-web 的 websocket 时我们可以在RabbitListener或CloudStream StreamListener中直接使用messagingTemplate.convertAndSend或SendTo 广播消息。只需要一个消费者监听就可以不管有没有客户端连接。在webflux中如何使用mq进行消息推送呢首先需要知道的是当用户请求发生时我们才创建队列因为这和websocket不同我们是被拉取方我们无法主动发送消息SSE协议本身就是如此。因此我们需要针对每个用户创建一个单独的队列否则就成轮询或公平分发了。下面不写废话不写原理不复制粘贴源码美其名曰分析直接上例子。例子(广播消息)pom.xmldependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-webflux/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependencyConfiguration创建扇形交换机和RabbitAdmin这里不创建绑定因为用户请求时才会创建队列和绑定然后监听消息。CachingConnectionFactory 是amqp的autoconfiguration自动配置的写接口名也可以因为代码中没有import具体包名怕你们导入错所以使用的CachingConnectionFactory创建扇形交换机的原因就是为了广播fanout不关心路由键和此交换机绑定的所有队列都可以收到消息有些人喜欢topic用topic也行。ConfigurationpublicclassRabbitMqConfig{BeanExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange(fanoutExchange).durable(true).build();}BeanpublicRabbitAdminrabbitAdmin(CachingConnectionFactoryconnectionFactory){returnnewRabbitAdmin(connectionFactory);}}写一个监听容器工厂功能就是当用户请求时使用工厂创建队列和进行绑定并返回监听容器。写了2个create一个是可以指定队列名一个是随机队列名如果需要根据用户名进行队列创建可以使用第一个。容器中的队列可以多个有需要可以自己改一改注意创建容器时我只放入了队列并没有设置Listener设置Listener是在用户请求创建时才会设置。因为需要在Listner代码块中使用Flux的sink去推送消息所以无法提前创建。importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;ComponentpublicclassMessageListenerContainerFactory{AutowiredprivateCachingConnectionFactoryconnectionFactory;AutowiredprivateRabbitAdminrabbitAdmin;AutowiredprivateExchangefanoutExchange;publicSimpleMessageListenerContainercreate(StringqueueName){QueuequeueQueueBuilder.nonDurable(queueName).maxLength(10000).autoDelete().exclusive().build();rabbitAdmin.declareQueue(queue);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with().noargs());SimpleMessageListenerContainersimpleMessageListenerContainernewSimpleMessageListenerContainer();//在容器中放入刚创建好的队列simpleMessageListenerContainer.setQueueNames(queue.getName());simpleMessageListenerContainer.setConnectionFactory(connectionFactory);/* //设置当前的消费者数量 simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setMaxConcurrentConsumers(1); //设置消息是否重回队列 simpleMessageListenerContainer.setDefaultRequeueRejected(false); //设置自动确认消息simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); //设置暴露监听器通道 simpleMessageListenerContainer.setExposeListenerChannel(true); */returnsimpleMessageListenerContainer;}publicSimpleMessageListenerContainercreate(){QueuequeueQueueBuilder.nonDurable().maxLength(10000).autoDelete().exclusive().build();rabbitAdmin.declareQueue(queue);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange).with().noargs());SimpleMessageListenerContainersimpleMessageListenerContainernewSimpleMessageListenerContainer();//在容器中放入刚创建好的队列simpleMessageListenerContainer.setQueueNames(queue.getName());simpleMessageListenerContainer.setConnectionFactory(connectionFactory);returnsimpleMessageListenerContainer;}}在Controller中使用//我们自己写的工厂AutowiredMessageListenerContainerFactorycontainerFactory;RequestMapping(value/test,producesMediaType.TEXT_EVENT_STREAM_VALUE)ResponseBodypublicFluxStringtest(){//用自己写的工厂创建一个监听容器SimpleMessageListenerContainercontainercontainerFactory.create();returnFlux.create(sink-{//容器中设置监听器用于接收到消息后使用sink发送给客户端container.setupMessageListener((ChannelAwareMessageListener)(Messagemessage,Channelchannel)-{if(sink.isCancelled()){container.stop();return;}StringmsgnewString(message.getBody());sink.next(msg);});//启动容器和停止容器sink.onRequest(r-container.start());sink.onDispose(container::stop);});}使用rabbitmq和redis的区别是队列名无法固定必须一个请求对应一个队列否则消息会成轮询或公平分发手动ack因此在监听容器中队列名要么根据用户名创建要么不写队列名会随机生成。思考如果基于SSE协议难道只能每个请求创建一个队列吗10w客户端创建10w个队列有没有办法优化呢首先上面提到过如果相同队列会产生轮询或公平分发但是针对每个请求创建一个队列太过浪费事实上我们可以这么做每个服务创建一个队列这样达到广播的目的在这个队列监听者中我们进行消息消费将消息转发到我们自己写被观察者类中。客户端请求我们添加为观察者如果被观察者有变化推送消息。这样我们就可以做到每个服务一个队列所有消费我们自己通知。

更多文章