消息队列 nsq 简介
消息队列
编程时我们常常会应用到“队列”这一数据结构。在单一程序里应用队列很容易,但当有多个进程,甚至是多个不同机器上的多个进程需要共享一个队列里的数据时,情况就比较复杂。这时不同机器上的不同进程,有的作为生产者推送消息到队列中,有的作为消费者订阅并获取队列中的数据。这时我们就需要一个作为中间件的分布式消息队列。
NSQ是一个Golang实现的开源的分布式消息队列。优点……Go写的东西优点就是轻量易部署,学习门槛相对低点,语言层面支持并发什么的,没用过其它的MQ也不太好对比。不过官方介绍上说,相对于Kafka,NSQ的Feature会少一些,当然学习难度也低点。
项目结构
NSQ的核心组件有三个。从目录结构就能看出来,分别是nsqadmin,nsqd,nsqlookupd。nsqadmin是一套Web端的可视化工具(可有可无),最重要的还是用来提供消息处理和转发的nsqd,和提供服务发现的nsqlookupd。nsqd和nsqlookupd都提供了http和tcp的接口调用。
消息处理逻辑
- nsqd
首先要引入topic和channel的概念。每条消息都有一个topic,这条消息会被推送给订阅了这个topic的消费者。假设我有若干台机器都订阅了某个topic,并且在被推送消息后做的操作相同,那么这几台机器就构成了一个channel。个人觉得可以理解成给消费者做了一次负载均衡。
关于单个nsqd的处理消息的逻辑,官网给了个动图:
nsqd中,信息被复制给对应topics下的所有channel。channel把这个消息分配给隶属于channel的某一个消费者。
nsqd具体实现还要更复杂一些,还要考虑流量控制等等问题,留到下回慢慢研究。
- nsqlookupd
nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。
那么,nsq怎么知道哪些消费者订阅了topic为“order_created”的消息呢?我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nslookupd所起的作用。
nsqlookupd本质上提供了一个key-value存储的服务,记录了topic,channel,consumer之间的对应关系。
nsq很重要的一点是要支持分布式,官网推荐我们一个生产者对应部署一个nsqd就行了。
应用
官方提供了一个go-nsq直接把通过tcp/http调用的接口封装起来了。通过这个包就能开始在代码里使用nsq了。
试着写了一下Demo。
客户端接受消息:
package main
import (
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
type ConsumerT struct{}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("from: ", msg.NSQDAddress, " msg: ", string(msg.Body))
return nil
}
func RunConsumer() {
c, err := nsq.NewConsumer("test", "hi", nsq.NewConfig()) // topic为test channel为hi
if err != nil {
log.Fatal(err)
}
c.AddHandler(&ConsumerT{})
if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
log.Fatal(err)
}
}
func main() {
RunConsumer()
for {
time.Sleep(3 * time.Second)
}
}
客户端读取终端中的输入,按行发送消息:
package main
import (
"bufio"
"log"
"fmt"
"os"
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func InitProducer(str string) {
var err error
producer, err = nsq.NewProducer(str, nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
}
func SendMsg(topic string, msg string) error {
if producer != nil {
if msg == "" {
return nil
}
err := producer.Publish(topic, []byte(msg))
return err
}
return fmt.Errorf("Producer is nil")
}
func main() {
strHost := "127.0.0.1:4150"
InitProducer(strHost)
reader := bufio.NewReader(os.Stdin)
for true {
data, _, _ := reader.ReadLine()
command := string(data)
if command == "stop" {
break
}
err := SendMsg("test", command)
if err != nil {
log.Fatal(err)
}
}
producer.Stop()
}