Go的消息队列NSQ源码分析
一、前言
实习期间,发现项目很多地方都用到了MQ,之前本来想通过RocketMQ的源码学习的,但无奈RMQ体量有点大,所以打算须循渐进,先从简单的开始,找了一圈Github,就发现了NSQ(主要是因为有我司)

二、NSQ简介
1. 为什么使用MQ
已知的:流程异步、代码解耦、数据的最终一致性
未知的:流量削峰、高吞吐量、高可用广播分发
2. NSQ的优势
-
NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异;
-
NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征;
-
适合小型项目使用,用来学习消息队列实现原理、学习 golang channel知识以及如何用 go 来写分布式
三、NSQ的使用
https://blog.csdn.net/weixin_37717557/article/details/103017512
四、启动流程
1. Start详解
Run()----->Init()----->Start()
// Start
func (p *program) Start() error {
......
// 初始化配置项
options.Resolve(opts, flagSet, cfg)
// 加载历史数据
err = p.nsqd.LoadMetadata()
// 持久化最新数据
err = p.nsqd.PersistMetadata()
signalChan := make(chan os.Signal, 1)
go func() {
// range over all term signals
// we don't want to un-register our sigterm handler which would
// cause default go behavior to apply
for range signalChan {
// once使函数只执行一次的实现
p.once.Do(func() {
p.nsqd.Exit()
})
}
}()
// 注册作用,即系统需要将syscall.SIGTERM通知到signalChan
signal.Notify(signalChan, syscall.SIGTERM)
go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()
return nil
}
其中涉及到了sync.Once,这里有一篇文章感觉讲得不错,Go sync.Once
这里做个简单总结:
- sync.Once提供给函数只执行一次的实现
- sync.Once和init的区别在于sync.Once可以在代码的任意位置初始化和调用,且线程安全,而init则是在package首次加载时执行
- sync.Once的实现原理通过标识变量和互斥锁实现,具体参考上面的文章
2. Main详解
func (n *NSQD) Main() error {
// 退出管道
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
// 仅做一次错误处理
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
// 初始化TCP服务
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
// 初始化HTTP服务
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
// 初始化HTTPS服务
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(n, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
// 循环监控队列信息
n.waitGroup.Wrap(n.queueScanLoop)
// 节点信息监控
n.waitGroup.Wrap(n.lookupLoop)
// 统计信息
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
err := <-exitCh
return err
}
代码总体看起来比较简单,但其实蕴含的东西值得学习
2.1. Once错误处理
使用sync.Once仅做一次错误处理
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
// 仅做一次错误处理
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
....
err := <-exitCh
return err
2.2. 装饰者包装异步操作
sync.WaitGroup用来等待一组goroutine的完成
你真的会用sync.WaitGroup吗
// 初始化HTTP服务
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
// waitGroup WaitGroupWrapper
n.waitGroup.Wrap(func() {
// 同时使用exitFunc判断服务是否报错
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
.....
n.waitGroup.Wait()
.....
// util
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
五、 TCP服务
1. TCP服务流程
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
.....
for {
// socket accept
clientConn, err := listener.Accept()
.....
}
wg.Add(1)
go func() {
// 执行连接后的处理
handler.Handle(clientConn)
wg.Done()
}()
}
// wait to return until all handler goroutines complete
wg.Wait()
return nil
}
func (p *tcpServer) Handle(conn net.Conn) {
// The client should initialize itself by sending a 4 byte sequence indicating
// the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to whatever...
buf := make([]byte, 4)
// 读取4byte的魔数
_, err := io.ReadFull(conn, buf)
.....
protocolMagic := string(buf)
var prot protocol.Protocol
switch protocolMagic {
// V2协议
case " V2":
prot = &protocolV2{nsqd: p.nsqd}
// 其他协议报错
default:
protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL"))
conn.Close()
p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
conn.RemoteAddr(), protocolMagic)
return
}
// ip - client
client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
err = prot.IOLoop(client)
......
p.conns.Delete(conn.RemoteAddr())
client.Close()
}
func (p *protocolV2) IOLoop(c protocol.Client) error {
......
for {
......
params := bytes.Split(line, separatorBytes)
var response []byte
// 接受命令
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
// 返回响应
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
close(client.ExitChan)
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
return err
}
2. 生产者生产消息
通过PUB指令完成
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
......
// 读取长度
bodyLen, err := readLen(client.Reader, client.lenSlice)
......
messageBody := make([]byte, bodyLen)
// 读取内容
_, err = io.ReadFull(client.Reader, messageBody)
......
// 检验身份
if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
return nil, err
}
// 获取topic结构体
topic := p.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
// 消息放到结构体中
err = topic.PutMessage(msg)
......
// 计数
client.PublishedMessage(topicName, 1)
return okBytes, nil
}
- 读取数据的长度
- 读取数据
- 检验身份
- 获取topic结构体
- 发布消息到对应的topic
- topic消息计数
2.1. 获取topic
func (n *NSQD) GetTopic(topicName string) *Topic {
// most likely we already have this topic, so try read lock first
n.RLock()
t, ok := n.topicMap[topicName]
n.RUnlock()
if ok {
return t
}
// 写锁
n.Lock()
// 此时可能另一个线程已经完成topic的放置,所以需要添加一个判断
t, ok = n.topicMap[topicName]
if ok {
n.Unlock()
return t
}
// 删除topic的回调
deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name)
}
t = NewTopic(topicName, n, deleteCallback)
n.topicMap[topicName] = t
n.Unlock()
// 加载元数据
// if this topic was created while loading metadata at startup don't do any further initialization
// (topic will be "started" after loading completes)
if atomic.LoadInt32(&n.isLoading) == 1 {
return t
}
/* 下面省略从nsqlookupd获取topic信息代码,因为这个nsqd实例可能是新加的机器,所以需要执行nsqlookupd查询 */
return t
}
可以学习到的点:
- 如果已经存在topic就使用读锁,可以并发读取
- 如果不存在,就意味着修改,要使用写锁
- 代码里第二个ok判断很精髓,可以仔细看看
2.2. 新建topic
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
// 内存队列
memoryMsgChan: nil,
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
nsqd: nsqd,
paused: 0,
pauseChan: make(chan int),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
// 磁盘队列
t.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
}
t.backend = diskqueue.New(
topicName,
nsqd.getOpts().DataPath,
nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength),
int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
nsqd.getOpts().SyncEvery,
nsqd.getOpts().SyncTimeout,
dqLogf,
)
}
// TODO分析
t.waitGroup.Wrap(t.messagePump)
t.nsqd.Notify(t, !t.ephemeral)
return t
}
2.3. topic事件循环
TODO
255、参考文献
- 走进 NSQ 源码细节
- NSQ源码剖析——简单高性能的MQ实现