V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
dunhanson
V2EX  ›  程序员

MQ 消费者阻塞如何处理?(ActiveMQ、RocketMQ)

  •  
  •   dunhanson · 2019-10-30 16:23:57 +08:00 · 7040 次点击
    这是一个创建于 1835 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题大概描述是:

    邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息

    我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。

    但不可能天天守着看然后重启一下吧

    于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果

    <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">               
        <slowConsumerStrategy>
            <abortSlowConsumerStrategy abortConnection="false"/>
        </slowConsumerStrategy>  
    </policyEntry>
    

    我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果

    我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息

    package cn.msb.rocketmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1",
            consumerGroup = "my-consumer_test-topic-1",
            selectorExpression = "first",
            selectorType = SelectorType.TAG,
            consumeThreadMax = 1, consumeTimeout = 1000)
    public class MyConsumer1 implements RocketMQListener<String> {
        public void onMessage(String message) {
            if(message.contains("1")) {
                try {
                    System.out.println("1 阻塞中。。。");
                    Thread.sleep(1000*60*60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.info("received message: {}", message);
        }
    }
    

    我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理

    16 条回复    2019-11-01 10:29:59 +08:00
    superrichman
        1
    superrichman  
       2019-10-30 16:36:25 +08:00 via iPhone
    consumeTimeout 的单位是不是分钟
    dunhanson
        2
    dunhanson  
    OP
       2019-10-30 16:39:33 +08:00
    @superrichman 毫秒,默认 30000
    lucifer1108
        3
    lucifer1108  
       2019-10-30 16:42:57 +08:00
    让我想到了一个面试题,怎么限制一个方法的执行时间.
    可以用 callable+executors 实现.
    贴个 demo 代码
    ```java
    Callable<String> call = new Callable<String>() {
    public String call() throws Exception {
    // 开始执行耗时操作
    // Thread.sleep(1000 * 5);
    // return "线程执行完成.";
    // 响应时间较长的方法或接口调用,返回 String 类型
    return getRecCourses(params);
    }
    };
    try {
    ExecutorService exec = Executors.newFixedThreadPool(1);
    Future<String> future = exec.submit(call);
    // csvData 为 call 方法里的返回值,也就是我们方法的返回值
    csvData = future.get(1000 * 1, TimeUnit.MILLISECONDS); // 任务处理超时时间设为 1 秒
    } catch (TimeoutException ex) {
    // 捕获超时异常,超时处理,可以通过 ex 抛出异常,如果不抛出,则控制台不输出异常。
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据超时,使用原来推荐系统"ex);
    } catch (Exception e) {
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据失败,使用原来推荐系统");
    }
    ```
    lucifer1108
        4
    lucifer1108  
       2019-10-30 16:43:36 +08:00
    @lucifer1108 什么鬼,是我用 md 的姿势不正确么
    softtwilight
        5
    softtwilight  
       2019-10-30 16:44:53 +08:00
    consumeThreadMax = 1, 单线程消费是业务需求吗? 改成多线程不会影响阻塞不会影响别的消费,但是阻塞的问题还是要解决
    dunhanson
        6
    dunhanson  
    OP
       2019-10-30 17:24:00 +08:00
    @lucifer1108 这个有点繁杂
    dunhanson
        7
    dunhanson  
    OP
       2019-10-30 17:24:35 +08:00
    @softtwilight 不是单线程消费,只是用单线程好模拟和控制
    dunhanson
        8
    dunhanson  
    OP
       2019-10-30 17:27:27 +08:00
    @lucifer1108 按道理 MQ 都应该有这个具体的配置的
    x537196
        9
    x537196  
       2019-10-30 17:35:19 +08:00
    为什么阻塞呢?其实我没怎么看懂问题,把消息取下来,放入线程池中执行响应业务不可以吗?
    justfly
        10
    justfly  
       2019-10-30 17:35:48 +08:00
    从根上解决问题,找到阻塞的原因。

    根据我的经验,如果消费者突然拿不到消息,而队列又有消息堆积的话,从客户端和服务端两侧都看下 tcp 连接还在不在。

    在某些低吞吐量的场景,tcp 连接长时间空闲,某些网络中间件会断掉连接而客户端没感知,就会 block 住了,再有大吞吐量后也不会恢复。

    如果连接已经断了,设置 rabbitmq 的心跳,而且心跳时间要比 tcp 自身的 keep alive 间隔短一些,保证连接活跃。
    dunhanson
        11
    dunhanson  
    OP
       2019-10-31 09:11:46 +08:00
    @x537196 还没找
    dunhanson
        12
    dunhanson  
    OP
       2019-10-31 09:12:02 +08:00
    @justfly 问题确实要找的
    Dabaicong
        13
    Dabaicong  
       2019-10-31 09:30:38 +08:00
    #9 楼说的对,拉下消息,放线程池中异步执行,执行成功回调。再加上守护线程,监视任务执行,超时的话,守护线程就干掉 。
    jyounn
        14
    jyounn  
       2019-10-31 13:51:41 +08:00
    ......
    consumeThreadMax = 1, consumeTimeout = 1000)
    ......
    Thread.sleep(1000*60*60);

    消费线程数最大为 1,然后又让消费者线程 sleep 3600 秒?线程 sleep 是不会结束的,这个时候不会创建新的消费线程,导致无法创建新线程消费.消费者消费建议使用线程池,可以复用且好管理.另外你说的阻塞具体是什么现象呢?
    dunhanson
        15
    dunhanson  
    OP
       2019-10-31 16:29:04 +08:00
    @jyounn 我这个是模拟线上环境的阻塞状态
    线上肯定不止一个消费者线程数的
    阻塞情况是指,线上配置 5 个消费者线程池,然后刚好 5 个都在执行的过程中卡住了(问题还不清楚,你就理解为都因为某种原因 sleep 了)
    jyounn
        16
    jyounn  
       2019-11-01 10:29:59 +08:00
    @dunhanson 根据你的描述,看下消费的逻辑中是否有导致无限等待的情况?可以搭一个 RocketMQ 控制台看下生产者消费者的状态,通过 debug 看看.
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5687 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 03:04 · PVG 11:04 · LAX 19:04 · JFK 22:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.