万籁俱寂,万字将成。
刘耀文
Stay hungry. Stay foolish.
© 2024-2026
Powered by Mix Space&
余白 / Yohaku
.
正在被0人看爆
关于
关于本站关于我
更多
时间线友链
联系
写留言发邮件 ↗
刘耀文
Stay hungry. Stay foolish.
链接
关于本站·关于我·时间线·友链·写留言·发邮件
© 2024-2026 Powered by Mix Space&
余白 / Yohaku
.
正在被0人看爆
赣ICP备2024031666号
RSS 订阅·站点地图·
··|
RSS 订阅·站点地图·|··|赣ICP备2024031666号
稍候片刻,月出文自明。

RabbitMQ快速使用

/
20
AI·GEN

关键洞察

RabbitMQ快速使用

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • 安装

    docker run --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    --network hm-net \
    --hostname my-rabbit \
    -e RABBITMQ_DEFAULT_USER=user \
    -e RABBITMQ_DEFAULT_PASS=password \
    -v mq_plugins:/plugins \
    -d rabbitmq:3.8-management
    

    Spring Boot 集成

    example:springboot-middlewave-example/springboot-rabbitmq at master · liyown/springboot-middlewave-example (github.com)

    发送者

    1. 发送者超时重连(异步)配置
    YAML
    spring:  
      application:  
        name: publisher  
      rabbitmq:  
        host: 192.168.208.128  
        port: 5672  
        username: user  
        password: password  
        virtual-host: /hmall  
        # 发送confirm机制
        publisher-confirm-type: correlated  
        publisher-returns: true  
        # 重试机制
        template:  
          retry:  
            enabled: true  
            max-attempts: 3  
            initial-interval: 1000  
            max-interval: 10000  
            multiplier: 2  
    server:  
      port: 7081
    
    1. 使用配置:
    • 配置消息转化服务,默认是JDK序列化,不易读,占用空间大。兼容性不高
    JAVA
    @Bean  
    public MessageConverter messageConverter() {  
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();  
        jackson2JsonMessageConverter.setCreateMessageIds(true);  
        return  jackson2JsonMessageConverter;  
    }
    
    • 设置消息发送回调 回调有两种情况,一是到了交换机就会返回confirm信息,如果没有到达队列,将会回调ReturnsCallback,此时一般是运维层面的问题
      image.png|600

      image.png|600
    CodeBlock Loading...

    JAVA API使用

    1. 带回调的使用:
    CodeBlock Loading...
    1. 延迟队列使用
    CodeBlock Loading...

    消费者

    spring配置

    CodeBlock Loading...

    创建交换机、队列、绑定关系

    CodeBlock Loading...

    创建消息监听

    CodeBlock Loading...

    死信队列的初始化 当到达重试次数后,有三种策略,分别为

    image.png|600

    image.png|600

    JAVA
    @Bean  
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){  
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");  
    }
    
    

    延迟队列的使用

    安装插件(github) rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com) 复制插件到插件文件夹 安装插件

    CodeBlock Loading...

    创建交换机、队列、绑定关系

    CodeBlock Loading...

    发送消息

    CodeBlock Loading...

    消费延迟消息:和普通队列一致

    JAVA
    public void init() {  
        rabbitTemplate.setReturnsCallback(  
                (ReturnedMessage returned) -> {  
                    System.out.println("消息丢失: " + returned.getMessage());  
                }  
        );  
        rabbitTemplate.setBeforePublishPostProcessors((message) -> {  
            MessageProperties messageProperties = message.getMessageProperties();  
            String messageId = messageProperties.getMessageId();  
            System.out.println("messageId: " + messageId);  
            return message;  
        });  
      
    }
    
    JAVA
    public void testRabiitStart() throws InterruptedException {  
      
        CorrelationData  correlationData = new CorrelationData("1");  
        // 设置回调  
        rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {  
            System.out.println("cause: " + cause);  
            System.out.println("correlationData: " + correlationData1);  
            if (ack) {  
                System.out.println("消息发送成功");  
            } else {  
                System.out.println("消息发送失败");  
            }  
        });  
        // 队列名  
        String queueName = "queue.lyw";  
      
      
        // 发送消息, 如果没有队列会自动创建  
        rabbitTemplate.convertAndSend("direct.lyw", queueName, "hello rabbitmq", correlationData);  
      
    }
    
    JAVA
    public void testRabiitMqDelayMessage() {  
        CorrelationData  correlationData = new CorrelationData("1");  
        rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                      (Message message) -> {  
                                          message.getMessageProperties().setDelayLong(10000L);  
                                          return message;  
                                      }  
                                      , correlationData);  
    }
    
    YAML
    spring:  
      application:  
        name: publisher  
      rabbitmq:  
        host: 192.168.208.128  
        port: 5672  
        username: user  
        password: password  
        virtual-host: /hmall  
        listener:  
          simple:  
            # 表示自动回应,当发送异常,将重试,重试不成功发送到死信队列(三种策略)
            acknowledge-mode: auto  
            retry:  
              enabled: true  
              max-attempts: 3  
              initial-interval: 1000  
              max-interval: 10000  
              multiplier: 2  
              stateless: true  
    server:  
      port: 7082
    
    JAVA
    @Bean  
    public DirectExchange directExchange() {  
        return new DirectExchange("direct.lyw");  
    }  
          
    @Bean  
    public Queue queue() {  
        return new Queue("queue.lyw");  
    }  
          
    @Bean  
    public Binding binding(Queue queue, FanoutExchange fanoutExchange) {  
        return BindingBuilder.bind(queue).to(fanoutExchange);  
    }
    
    JAVA
    @RabbitListener(queues = "work.queue1")  
    public void onMessageWorkerQueue2(String message) throws InterruptedException {  
        Thread.sleep(200);  
        Thread thread = Thread.currentThread();  
        log.info("工作队列2接收到消息: {},时间:{},线程{}", message, LocalDateTime.now(), thread.getName());  
    }
    
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    JAVA
    @Bean  
    public Queue delayQueue() {  
        return new Queue("delay.queue");  
    }  
          
    @Bean  
    public DirectExchange delayExchange() {  
        return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();  
    }  
          
    @Bean Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {  
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.queue");  
    }
    
    JAVA
    @Test  
    public void testRabiitMqDelayMessage() {  
        CorrelationData  correlationData = new CorrelationData("1");  
        rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "hello rabbitmq delay",  
                                      (Message message) -> {  
                                          message.getMessageProperties().setDelayLong(10000L);  
                                          return message;  
                                      }  
                                      , correlationData);  
    }  
          
    @Test  
    public void testRabiitMqDelayMessage2() {  
        rabbitTemplate.convertAndSend("delay.direct", "delay.queue", "1812793267355439105", message -> {  
            message.getMessageProperties().setDelayLong(10000L);  
            return message;  
        });  
    }