消息队列 nsq 简介

消息队列

编程时我们常常会应用到“队列”这一数据结构。在单一程序里应用队列很容易,但当有多个进程,甚至是多个不同机器上的多个进程需要共享一个队列里的数据时,情况就比较复杂。这时不同机器上的不同进程,有的作为生产者推送消息到队列中,有的作为消费者订阅并获取队列中的数据。这时我们就需要一个作为中间件的分布式消息队列。

NSQ是一个Golang实现的开源的分布式消息队列。优点……Go写的东西优点就是轻量易部署,学习门槛相对低点,语言层面支持并发什么的,没用过其它的MQ也不太好对比。不过官方介绍上说,相对于Kafka,NSQ的Feature会少一些,当然学习难度也低点。

项目结构

NSQ的核心组件有三个。从目录结构就能看出来,分别是nsqadmin,nsqd,nsqlookupd。nsqadmin是一套Web端的可视化工具(可有可无),最重要的还是用来提供消息处理和转发的nsqd,和提供服务发现的nsqlookupd。nsqd和nsqlookupd都提供了http和tcp的接口调用。

消息处理逻辑

  • nsqd

首先要引入topic和channel的概念。每条消息都有一个topic,这条消息会被推送给订阅了这个topic的消费者。假设我有若干台机器都订阅了某个topic,并且在被推送消息后做的操作相同,那么这几台机器就构成了一个channel。个人觉得可以理解成给消费者做了一次负载均衡。

关于单个nsqd的处理消息的逻辑,官网给了个动图:

nsqd

nsqd中,信息被复制给对应topics下的所有channel。channel把这个消息分配给隶属于channel的某一个消费者。

nsqd具体实现还要更复杂一些,还要考虑流量控制等等问题,留到下回慢慢研究。

  • nsqlookupd

nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。

那么,nsq怎么知道哪些消费者订阅了topic为“order_created”的消息呢?我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nslookupd所起的作用。

nsqlookupd本质上提供了一个key-value存储的服务,记录了topic,channel,consumer之间的对应关系。

nsq很重要的一点是要支持分布式,官网推荐我们一个生产者对应部署一个nsqd就行了。

应用

官方提供了一个go-nsq直接把通过tcp/http调用的接口封装起来了。通过这个包就能开始在代码里使用nsq了。

试着写了一下Demo。

客户端接受消息:

package main
import (
	"fmt"
	"log"
	"time"
	"github.com/nsqio/go-nsq"
)
type ConsumerT struct{}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("from: ", msg.NSQDAddress, " msg: ", string(msg.Body))
	return nil
}
func RunConsumer() {
	c, err := nsq.NewConsumer("test", "hi", nsq.NewConfig()) // topic为test channel为hi
	if err != nil {
		log.Fatal(err)
	}
	c.AddHandler(&ConsumerT{})
	if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}
}
func main() {
	RunConsumer()
	for {
		time.Sleep(3 * time.Second)
	}
}

客户端读取终端中的输入,按行发送消息:

package main
import (
    "bufio"
    "log"
	"fmt"
	"os"
	"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func InitProducer(str string) {
	var err error
	producer, err = nsq.NewProducer(str, nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
}
func SendMsg(topic string, msg string) error {
	if producer != nil {
		if msg == "" {
			return nil
		}
		err := producer.Publish(topic, []byte(msg))
		return err
	}
	return fmt.Errorf("Producer is nil")
}
func main() {
	strHost := "127.0.0.1:4150"
	InitProducer(strHost)
	reader := bufio.NewReader(os.Stdin)
	for true {
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			break
		}
		err := SendMsg("test", command)
		if err != nil {
			log.Fatal(err)
		}
	}
	producer.Stop()
}

标签:

更新时间: