V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
dt2vba
V2EX  ›  程序员

求助 go-nsq 消费者的问题

  •  
  •   dt2vba · 2019-06-04 17:14:41 +08:00 · 2064 次点击
    这是一个创建于 1759 天前的主题,其中的信息可能已经有所发展或是发生改变。
    大佬,下午好,我使用 go-nsq 消息队列处理请求时遇到一个问题。

    假设有以下三个文件
    main.go publishMessage.go consumeMessage.go

    启动服务
    go run main.go publishMessage.go
    发送请求
    curl -d "localhost:1323" -d "MESSAGE"

    现在的问题是,我可以发布消息,但是只能启动新的进程去消费消息,go run consumeMessage.go

    请问如何在一个进程中处理请求呢?
    8 条回复    2019-06-12 15:23:28 +08:00
    lockerhyz
        1
    lockerhyz  
       2019-06-04 17:27:25 +08:00   ❤️ 1
    go run main.go publishMessage.go consumeMessage.go
    dt2vba
        2
    dt2vba  
    OP
       2019-06-04 18:18:42 +08:00 via Android
    @lockerhyz 谢谢关注。我试了一下,不能正常工作。
    dt2vba
        3
    dt2vba  
    OP
       2019-06-04 18:55:51 +08:00
    @lockerhyz

    你好,如果你有时间的话,可以帮我看一下吗?以下是工作流程,

    消息是 HTTP 请求表单数据

    publishMessage.go 发布消息

    consumeMessage.go 消费消息


    FILE 1 main.go

    package main

    import (
    "github.com/labstack/echo"
    )

    type User struct {Name string}

    func main() {
    e:=echo.New()
    r:=e.Group("/api/v1/applicant/register/user")
    r.POST("",createUserInNSQ)
    e.Start(":1323")
    }


    FILE 2 publishMessage.go

    package main

    import (
    ...
    "github.com/nsqio/go-nsq"
    ...
    )

    var tcpNsqdAddr="127.0.0.1:4150"

    func publishMessage(topic string,command []uint8) error {
    config:=nsq.NewConfig()
    producer,_:=nsq.NewProducer(tcpNsqdAddr,config)
    producer.Publish(topic,command)
    return nil
    }

    func createUserInNSQ(c echo.Context) (err error) {
    u:=new(User)

    topic:="InsertUser"
    command,err:=json.Marshal(u)
    publishMessage(topic,command)

    return c.String( http.StatusOK,"OK")
    }


    FILE 3 consumeMessage.go

    package main

    import (
    ...
    "github.com/nsqio/go-nsq"
    ...
    )

    var tcpNsqdAddr="127.0.0.1:4150"

    type NsqHandler struct {
    }

    func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    //insert message into MySQL
    ...

    return nil
    }

    func main() {
    config:=nsq.NewConfig()
    com,_:=nsq.NewConsumer("InsertUser","channel1",config)
    com.AddHandler(&NsqHandler{NsqHandlerId:"one"})
    com.ConnectToNSQD(tcpNsqdAddr)

    var wg=&sync.WaitGroup{}
    wg.Add(1)
    wg.Wait()
    }
    keepeye
        4
    keepeye  
       2019-06-04 18:58:31 +08:00   ❤️ 1
    学会使用 goroutine
    dt2vba
        5
    dt2vba  
    OP
       2019-06-04 19:03:28 +08:00
    @keepeye 你好,感谢提供帮助!
    lockerhyz
        6
    lockerhyz  
       2019-06-11 16:08:13 +08:00   ❤️ 1
    @dt2vba
    1. 把 consumeMessage.go 的 main 函数修改成其他函数名,例如 consumer()
    2. 修改 main.go 中的 main 函数

    ```
    func main() {
    go consumer()
    e:=echo.New()
    r:=e.Group("/api/v1/applicant/register/user")
    r.POST("",createUserInNSQ)
    e.Start(":1323")
    }
    ```
    lockerhyz
        7
    lockerhyz  
       2019-06-11 16:09:39 +08:00
    @lockerhyz
    3. go run main.go publishMessage.go consumeMessage.go
    dt2vba
        8
    dt2vba  
    OP
       2019-06-12 15:23:28 +08:00 via Android
    @lockerhyz 非常感谢你的帮助。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3146 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 12:35 · PVG 20:35 · LAX 05:35 · JFK 08:35
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.