RabbitMQ工作模式与实现方式

Work queues

  • 不需要设置交换机,只需指定唯一的消息队列即可进行消息传递
  • 可以有多个消费者,多个消费者通过轮询从队列中取消息
  • 消息被接受后,队列将消息移除
  • 消费在可以在没有处理完消息的情况下继续获取消息
  • 通过设置 spring.rabbitmq.listener.simple.prefetch: 1 来设置每次处理完消息后才能获取下一条

consumer

@Component
public class SpringRabbitListener {
// workqueue

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue01(String msg) throws InterruptedException {
System.out.println("消费者01 接收到simple.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue02(String msg) throws InterruptedException {
System.err.println("消费者02 接收到simple.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(200);
}
}

监听队列,自动绑定消息。通过设置休眠来模拟不同的消费能力

publisher

//    workqueue
@Test
public void testSendMessage02() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello,spring amqp_";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}

Publish/Subscribe

  • 需要设置交换机,并将队列绑定到交换机
  • 常见的交换机类型有 fanout、direct、topic
  • 可以通过基于配置和基于注解的方式来声明交换机、声明队列、绑定队列到交换机

主要是 consumer 的不同,publish 只需指定队列发送消息

基于配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
* 这是基于配置类的方式来注册交换机、队列,并绑定。
*/
@Configuration
public class FanoutConfig {
// 声明交换机 heroxin.fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("heroxin.fanout");
}

// 声明队列一 heroxin.queue01
@Bean
public Queue fanoutQueue01() {
return new Queue("fanout.queue01");
}

// 声明队列二 heroxin.queue02
@Bean
public Queue fanoutQueue02() {
return new Queue("fanout.queue02");
}

// 绑定队列一到交换机
@Bean
public Binding fanoutBinding01(Queue fanoutQueue01, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue01).to(fanoutExchange);
}

// 绑定队列二到交换机
@Bean
public Binding fanoutBinding02(Queue fanoutQueue02, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue02).to(fanoutExchange);
}
}

配置好后在 service 中直接监听对应队列即可

//    fanoutExchange
// 监听队列一
@RabbitListener(queues = "fanout.queue01")
public void listenFanoutQueue01(String msg) throws InterruptedException {
System.out.println("消费者01 接收到fanout.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(200);
}

// 监听队列二
@RabbitListener(queues = "fanout.queue02")
public void listenFanoutQueue02(String msg) throws InterruptedException {
System.out.println("消费者02 接收到fanout.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(200);
}

基于注解

//    directExchange

// 监听队列一
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "direct.queue01"),
exchange = @Exchange(name = "heroxin.direct"),
key = {"red", "blue"}
)
)
public void listenDirectqueue01(String msg) throws InterruptedException {
System.out.println("消费者01 接收到direct.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(200);
}


// 监听队列二
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "direct.queue02"),
exchange = @Exchange(name = "heroxin.direct"),
key = {"red", "yellow"}
)
)
public void listenDirectqueue02(String msg) throws InterruptedException {
System.out.println("消费者02 接收到direct.queue的消息为:" + msg + "," + LocalTime.now());
Thread.sleep(200);
}