- Add the dependency:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.7.RELEASE</version>
    </dependency>
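- Add the configuration (on both the publisher and the subscriber side):
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TopicQueueConfig {

        // Publisher side: serialize outgoing message bodies as JSON.
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }

        // Subscriber side: deserialize incoming JSON bodies for @RabbitListener methods.
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    }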
- Add the following to the yml file:
    spring:
      rabbitmq:
        username: system
        password: system
        addresses: 192.168.3.210:5672 # RabbitMQ host IP and default port
        virtual-host: my_vhost
        cache:
          connection:
            mode: channel
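- Install RabbitMQ with Docker:
    docker run -d --name rabbitmq \
      -p 5672:5672 -p 15672:15672 \
      -v `pwd`/data:/var/lib/rabbitmq \
      --hostname myRabbit \
      -e RABBITMQ_DEFAULT_VHOST=my_vhost \
      -e RABBITMQ_DEFAULT_USER=system \
      -e RABBITMQ_DEFAULT_PASS=system \
      rabbitmq:3.10.6
If http://192.168.3.210:15672 cannot be reached after the container starts, the management plugin is not enabled. Enter the container (for example with docker exec -it rabbitmq bash) and run:
    rabbitmq-plugins enable rabbitmq_management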
- Create the queues:
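The queue can be created in the management UI, but it can also be declared from code. The following is a minimal sketch, assuming Spring Boot's auto-configured AmqpAdmin is present so that Queue, Exchange and Binding beans are declared on the broker when the first connection is opened. The class name TaskQueueDeclarations is hypothetical; the queue, exchange and routing-key names are the ones used in the publishing and receiving steps of this article.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; not part of the original article.
@Configuration
public class TaskQueueDeclarations {

    // Durable queue that the listener consumes from.
    @Bean
    public Queue taskQueue() {
        return new Queue("taskQueue", true);
    }

    // Direct exchange: a message is routed only to queues whose
    // binding key equals the message's routing key.
    @Bean
    public DirectExchange taskExchange() {
        return new DirectExchange("taskExchange");
    }

    // Bind the queue to the exchange with the routing key used when publishing.
    @Bean
    public Binding taskBinding(Queue taskQueue, DirectExchange taskExchange) {
        return BindingBuilder.bind(taskQueue).to(taskExchange).with("rebate.direct");
    }
}
```

Declaring the same queue, exchange and binding more than once is harmless as long as the arguments (durability, exchange type) match on every declaration.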
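- Publish a message with the convertAndSend method of RabbitTemplate (the arguments are the exchange name, the routing key, and the message object):
    rabbitTemplate.convertAndSend("taskExchange", "rebate.direct", TaskMessage.buildRebateMessage());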
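The TaskMessage class itself is not shown in this article. The following is a minimal sketch of what such a class could look like so that Jackson2JsonMessageConverter can serialize and deserialize it: a no-argument constructor plus getters and setters. The Category enum, the payload field and their values are assumptions for illustration; only the names TaskMessage, buildRebateMessage() and getCategory() come from the snippets in this article. By default the JSON converter records the class name in a __TypeId__ header, so this sketch also assumes the same class, in the same package, is available to both publisher and subscriber.

```java
// Hypothetical message class; fields and enum values are assumptions.
// In a real project, declare it public in its own file TaskMessage.java.
class TaskMessage {

    // Assumed set of message categories.
    enum Category { REBATE }

    private Category category;
    private String payload;

    // Jackson needs a no-argument constructor to deserialize the JSON body.
    public TaskMessage() {
    }

    // Factory method matching the call used in the publishing step.
    public static TaskMessage buildRebateMessage() {
        TaskMessage message = new TaskMessage();
        message.setCategory(Category.REBATE);
        message.setPayload("rebate task");
        return message;
    }

    public Category getCategory() {
        return category;
    }

    public void setCategory(Category category) {
        this.category = category;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }

    @Override
    public String toString() {
        return "TaskMessage{category=" + category + ", payload='" + payload + "'}";
    }
}
```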
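- Message receiver:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;

    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("taskQueue"),
            exchange = @Exchange(value = "taskExchange", type = ExchangeTypes.DIRECT),
            key = "rebate.direct"
    ))
    public class TaskReceiveListener {

        // Explicit SLF4J logger; the original snippet used "log" without declaring it.
        private static final Logger log = LoggerFactory.getLogger(TaskReceiveListener.class);

        // Called for each message whose body converts to a TaskMessage.
        @RabbitHandler
        public void onMessage(@Payload TaskMessage msg) {
            log.info("mq receive msg = {}", msg);
            log.info(msg.getCategory().toString());
        }
    }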
Note: this article belongs to the author and may not be reproduced without the author's permission.