V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
loveaeen
V2EX  ›  Java

Spring-kafka 指定分区消费者,发现消费者不活动

  •  
  •   loveaeen · 2023-01-06 15:35:09 +08:00 · 1126 次点击
    这是一个创建于 447 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题

    问题最近想试试 kafka 的相关操作,在 spring-kafka 3.0.1 版本下。发现当我指定消费者去消费指定分区时,kafka 无法检测到该消费者(消费组内无消费者),但是却可以正常消费。 如果不指定,而是由 kafka 自动平衡消费者的话,就可以检测到

    示例代码

    
    // 该代码正常
    @KafkaListener(id = "consumer-all", clientIdPrefix = "consumer-all", topics = "topic1", groupId = "mygroup", concurrency = "1")
    public void listenAllOne(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
    	System.out.println("这是第一个消费者-----"+consumerRecord.toString());
        ack.acknowledge();
    }
    
    
    // 该消费者可以正常消费,但是无法被 kafka 检测到
    @KafkaListener(id="consumer-1", clientIdPrefix = "consumer-1", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0,1", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
    public void listenTop1(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        System.out.println("这是第一个分区消费者-----"+consumerRecord.toString());
        ack.acknowledge();
    }
    
    @KafkaListener(id="consumer-2", clientIdPrefix = "consumer-2", topicPartitions = {@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "2,3", initialOffset = "0"))}, groupId = "mygroup1", concurrency = "1")
    public void listenTop2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        System.out.println("这是第二个分区消费者-----"+consumerRecord.toString());
        ack.acknowledge();
    }
    
    
    第 1 条附言  ·  2023-01-06 16:41:17 +08:00

    CLI查询的方式也是一样的结果,如图

    6 条回复    2023-01-06 17:00:13 +08:00
    wangxin3
        1
    wangxin3  
       2023-01-06 16:13:31 +08:00
    web 监控界面是第三方的吧,可能没有适配?
    wangxin3
        2
    wangxin3  
       2023-01-06 16:22:01 +08:00
    可以试试 kafka 的官方命令行来查询
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 你的 groupId
    loveaeen
        3
    loveaeen  
    OP
       2023-01-06 16:40:15 +08:00
    这个我也试了。也是没有 active members
    ![]( https://hexo-1302895213.cos.ap-shanghai.myqcloud.com/blog/202301061639200.png)
    loveaeen
        4
    loveaeen  
    OP
       2023-01-06 16:42:17 +08:00
    @wangxin3 感觉只能想到是 spring-kafka 的版本问题了。
    novolunt
        5
    novolunt  
       2023-01-06 16:44:03 +08:00
    https://www.kafkatool.com/ GUI 客户端看看
    wangxin3
        6
    wangxin3  
       2023-01-06 17:00:13 +08:00
    @loveaeen #4 原文:“@wangxin3 感觉只能想到是 spring-kafka 的版本问题了。”
    ======
    回复:降版本试试吧
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1246 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 17:59 · PVG 01:59 · LAX 10:59 · JFK 13:59
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.