问题大概描述是:
邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息
我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。
但不可能天天守着看然后重启一下吧
于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
</policyEntry>
我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果
我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息
package cn.msb.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
selectorExpression = "first",
selectorType = SelectorType.TAG,
consumeThreadMax = 1, consumeTimeout = 1000)
public class MyConsumer1 implements RocketMQListener<String> {
public void onMessage(String message) {
if(message.contains("1")) {
try {
System.out.println("1 阻塞中。。。");
Thread.sleep(1000*60*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("received message: {}", message);
}
}
我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理
1
superrichman 2019-10-30 16:36:25 +08:00 via iPhone
consumeTimeout 的单位是不是分钟
|
2
dunhanson OP @superrichman 毫秒,默认 30000
|
3
lucifer1108 2019-10-30 16:42:57 +08:00
让我想到了一个面试题,怎么限制一个方法的执行时间.
可以用 callable+executors 实现. 贴个 demo 代码 ```java Callable<String> call = new Callable<String>() { public String call() throws Exception { // 开始执行耗时操作 // Thread.sleep(1000 * 5); // return "线程执行完成."; // 响应时间较长的方法或接口调用,返回 String 类型 return getRecCourses(params); } }; try { ExecutorService exec = Executors.newFixedThreadPool(1); Future<String> future = exec.submit(call); // csvData 为 call 方法里的返回值,也就是我们方法的返回值 csvData = future.get(1000 * 1, TimeUnit.MILLISECONDS); // 任务处理超时时间设为 1 秒 } catch (TimeoutException ex) { // 捕获超时异常,超时处理,可以通过 ex 抛出异常,如果不抛出,则控制台不输出异常。 csvData = null; LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据超时,使用原来推荐系统"ex); } catch (Exception e) { csvData = null; LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据失败,使用原来推荐系统"); } ``` |
4
lucifer1108 2019-10-30 16:43:36 +08:00
@lucifer1108 什么鬼,是我用 md 的姿势不正确么
|
5
softtwilight 2019-10-30 16:44:53 +08:00
consumeThreadMax = 1, 单线程消费是业务需求吗? 改成多线程不会影响阻塞不会影响别的消费,但是阻塞的问题还是要解决
|
6
dunhanson OP @lucifer1108 这个有点繁杂
|
7
dunhanson OP @softtwilight 不是单线程消费,只是用单线程好模拟和控制
|
8
dunhanson OP @lucifer1108 按道理 MQ 都应该有这个具体的配置的
|
9
x537196 2019-10-30 17:35:19 +08:00
为什么阻塞呢?其实我没怎么看懂问题,把消息取下来,放入线程池中执行响应业务不可以吗?
|
10
justfly 2019-10-30 17:35:48 +08:00
从根上解决问题,找到阻塞的原因。
根据我的经验,如果消费者突然拿不到消息,而队列又有消息堆积的话,从客户端和服务端两侧都看下 tcp 连接还在不在。 在某些低吞吐量的场景,tcp 连接长时间空闲,某些网络中间件会断掉连接而客户端没感知,就会 block 住了,再有大吞吐量后也不会恢复。 如果连接已经断了,设置 rabbitmq 的心跳,而且心跳时间要比 tcp 自身的 keep alive 间隔短一些,保证连接活跃。 |
13
Dabaicong 2019-10-31 09:30:38 +08:00
#9 楼说的对,拉下消息,放线程池中异步执行,执行成功回调。再加上守护线程,监视任务执行,超时的话,守护线程就干掉 。
|
14
jyounn 2019-10-31 13:51:41 +08:00
......
consumeThreadMax = 1, consumeTimeout = 1000) ...... Thread.sleep(1000*60*60); 消费线程数最大为 1,然后又让消费者线程 sleep 3600 秒?线程 sleep 是不会结束的,这个时候不会创建新的消费线程,导致无法创建新线程消费.消费者消费建议使用线程池,可以复用且好管理.另外你说的阻塞具体是什么现象呢? |