RabbitMQ工作模式与实现方式
Work queues
- 不需要设置交换机,只需指定唯一的消息队列即可进行消息传递
- 可以有多个消费者,多个消费者通过轮询从队列中取消息
- 消息被接受后,队列将消息移除
- 消费在可以在没有处理完消息的情况下继续获取消息
- 通过设置 spring.rabbitmq.listener.simple.prefetch: 1 来设置每次处理完消息后才能获取下一条

consumer
@Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue01(String msg) throws InterruptedException { System.out.println("消费者01 接收到simple.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(20); }
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue02(String msg) throws InterruptedException { System.err.println("消费者02 接收到simple.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(200); } }
|
监听队列,自动绑定消息。通过设置休眠来模拟不同的消费能力
publisher
@Test public void testSendMessage02() throws InterruptedException { String queueName = "simple.queue"; String message = "hello,spring amqp_"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
|
Publish/Subscribe
- 需要设置交换机,并将队列绑定到交换机
- 常见的交换机类型有 fanout、direct、topic
- 可以通过基于配置和基于注解的方式来声明交换机、声明队列、绑定队列到交换机

主要是 consumer 的不同,publish 只需指定队列发送消息
基于配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("heroxin.fanout"); }
@Bean public Queue fanoutQueue01() { return new Queue("fanout.queue01"); }
@Bean public Queue fanoutQueue02() { return new Queue("fanout.queue02"); }
@Bean public Binding fanoutBinding01(Queue fanoutQueue01, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue01).to(fanoutExchange); }
@Bean public Binding fanoutBinding02(Queue fanoutQueue02, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue02).to(fanoutExchange); } }
|
配置好后在 service 中直接监听对应队列即可
@RabbitListener(queues = "fanout.queue01") public void listenFanoutQueue01(String msg) throws InterruptedException { System.out.println("消费者01 接收到fanout.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(200); }
@RabbitListener(queues = "fanout.queue02") public void listenFanoutQueue02(String msg) throws InterruptedException { System.out.println("消费者02 接收到fanout.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(200); }
|
基于注解
@RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue01"), exchange = @Exchange(name = "heroxin.direct"), key = {"red", "blue"} ) ) public void listenDirectqueue01(String msg) throws InterruptedException { System.out.println("消费者01 接收到direct.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(200); }
@RabbitListener( bindings = @QueueBinding( value = @Queue(name = "direct.queue02"), exchange = @Exchange(name = "heroxin.direct"), key = {"red", "yellow"} ) ) public void listenDirectqueue02(String msg) throws InterruptedException { System.out.println("消费者02 接收到direct.queue的消息为:" + msg + "," + LocalTime.now()); Thread.sleep(200); }
|