V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
heavyrainn
V2EX  ›  Go 编程语言

请教一个 NATS-STRAMING 的问题,不知道是不是个 bug…如果先 publish 消息,然后再启动 QueueSubscriber 的话,相同队列中只有一个 Subscriber 能够消费消息?

  •  
  •   heavyrainn · 2019-08-29 13:17:23 +08:00 · 2932 次点击
    这是一个创建于 1674 天前的主题,其中的信息可能已经有所发展或是发生改变。

    PUBLISHER 代码:

    func main() {
    	nc,err := stan.Connect("test-cluster","idc",stan.NatsURL("nats://127.0.0.1:4222"))
    	if err != nil{
    		panic(err)
    	}
    	fmt.Println("connect succ")
    	for i:=0;i<10;i++{
    		fmt.Println("publishing:",i)
    		err := nc.Publish("tp1",[]byte(strconv.Itoa(i)))
    		if err != nil{
    			panic(err)
    		}
    	}
    	nc.Close()
    }
    

    QueueSubscriber 代码:

    func main() {
    	nc,err := stan.Connect("test-cluster","subscriber",stan.NatsURL("nats://localhost:4222"))
    	if err != nil{
    		panic(err)
    	}
    	defer nc.Close()
    	subs := make([]stan.Subscription,3)
    	for i:=0;i<3;i++{
    		workername := "worker"+strconv.Itoa(i)
    		fmt.Println(fmt.Sprintf("QueueSubscribe %s start",workername))
    		sub,err := nc.QueueSubscribe("tp1","ch1", func(msg *stan.Msg) {
    			fmt.Println(workername,"get msg:",string(msg.Data),"start doing something")
    			time.Sleep(1*time.Second)
    		},stan.DurableName("subscriber"),stan.AckWait(time.Hour*24))
    		if err != nil{
    			panic(err)
    		}
    		subs[i] = sub
    	}
    	c := make(chan os.Signal, 1)
    	signal.Notify(c, os.Interrupt, os.Kill)
    	select{
    	case <- c:
    		fmt.Println("Subscriber CLOSE")
    		for i,_ := range subs{
    			subs[i].Close()
    		}
    		nc.Close()
    		fmt.Println("quit")
    	}
    }
    

    Publisher 输出:

    connect succ
    publishing: 0
    publishing: 1
    publishing: 2
    publishing: 3
    publishing: 4
    publishing: 5
    publishing: 6
    publishing: 7
    publishing: 8
    publishing: 9
    

    QueueSubscriber 输出:

    QueueSubscribe worker0 start
    QueueSubscribe worker1 start
    worker0 get msg: 0 start doing something
    QueueSubscribe worker2 start
    worker0 get msg: 1 start doing something
    worker0 get msg: 2 start doing something
    worker0 get msg: 3 start doing something
    worker0 get msg: 4 start doing something
    worker0 get msg: 5 start doing something
    worker0 get msg: 6 start doing something
    worker0 get msg: 7 start doing something
    worker0 get msg: 8 start doing something
    worker0 get msg: 9 start doing something
    

    请问朋友们是否有遇到过一样的问题呢?谢谢大家

    4 条回复    2019-09-11 17:28:02 +08:00
    freestyle
        1
    freestyle  
       2019-08-30 12:44:52 +08:00 via iPhone
    这是特性,queue 模式. 如果想每个订阅者都收到,设置不同的 queue 名字或普通方式订阅就行. 可以看下我的博客 https://imhanjm.com/2018/02/17/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3nats%20&%20nats%20streaming/
    heavyrainn
        2
    heavyrainn  
    OP
       2019-08-30 15:04:59 +08:00
    @freestyle 额…不是,我的意思是,为啥其他的 worker 不工作,只有 worker0 在工作
    freestyle
        3
    freestyle  
       2019-08-31 07:16:06 +08:00 via iPhone
    @heavyrainn 你这是同一个连接啊 一般 queueSub 是不同的进程即不同的连接 你试试给每个 worker 创建一个连接.
    heavyrainn
        4
    heavyrainn  
    OP
       2019-09-11 17:28:02 +08:00
    @freestyle 我搞清楚了…是因为没有设置 MaxInflight 值的问题,派出的任务都最先启动的 worker 给接收了。设置 MaxInflight 值为 1 实现了正常分 worker 执行。谢谢啦
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   5342 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 08:24 · PVG 16:24 · LAX 01:24 · JFK 04:24
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.