【NSQ源码分析】(一)NSQ的启动 & TCP服务

Go的消息队列NSQ源码分析

一、前言

实习期间,发现项目很多地方都用到了MQ,之前本来想通过RocketMQ的源码学习的,但无奈RMQ体量有点大,所以打算须循渐进,先从简单的开始,找了一圈Github,就发现了NSQ(主要是因为有我司)

image-20210731144607890

二、NSQ简介

1. 为什么使用MQ

已知的:流程异步、代码解耦、数据的最终一致性

未知的:流量削峰、高吞吐量、高可用广播分发

2. NSQ的优势

  1. NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异;

  2. NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征;

  3. 适合小型项目使用,用来学习消息队列实现原理、学习 golang channel知识以及如何用 go 来写分布式

三、NSQ的使用

https://blog.csdn.net/weixin_37717557/article/details/103017512

四、启动流程

1. Start详解

Run()----->Init()----->Start()

// Start
func (p *program) Start() error {
	......

	// 初始化配置项
	options.Resolve(opts, flagSet, cfg)

	// 加载历史数据
	err = p.nsqd.LoadMetadata()

	// 持久化最新数据
	err = p.nsqd.PersistMetadata()

	signalChan := make(chan os.Signal, 1)
	go func() {
		// range over all term signals
		// we don't want to un-register our sigterm handler which would
		// cause default go behavior to apply
		for range signalChan {
			// once使函数只执行一次的实现
			p.once.Do(func() {
				p.nsqd.Exit()
			})
		}
	}()
	// 注册作用,即系统需要将syscall.SIGTERM通知到signalChan
	signal.Notify(signalChan, syscall.SIGTERM)

	go func() {
		err := p.nsqd.Main()
		if err != nil {
			p.Stop()
			os.Exit(1)
		}
	}()

	return nil
}

其中涉及到了sync.Once,这里有一篇文章感觉讲得不错,Go sync.Once

这里做个简单总结:

  1. sync.Once提供给函数只执行一次的实现
  2. sync.Once和init的区别在于sync.Once可以在代码的任意位置初始化和调用,且线程安全,而init则是在package首次加载时执行
  3. sync.Once的实现原理通过标识变量和互斥锁实现,具体参考上面的文章

2. Main详解

func (n *NSQD) Main() error {
	// 退出管道
	exitCh := make(chan error)
	var once sync.Once
	exitFunc := func(err error) {
		// 仅做一次错误处理
		once.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

	// 初始化TCP服务
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})

	// 初始化HTTP服务
	httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
	n.waitGroup.Wrap(func() {
		exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
	})

	// 初始化HTTPS服务
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(n, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}

	// 循环监控队列信息
	n.waitGroup.Wrap(n.queueScanLoop)

	// 节点信息监控
	n.waitGroup.Wrap(n.lookupLoop)

	// 统计信息
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

代码总体看起来比较简单,但其实蕴含的东西值得学习

2.1. Once错误处理

使用sync.Once仅做一次错误处理

exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
    // 仅做一次错误处理
    once.Do(func() {
      if err != nil {
        n.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
}
....
err := <-exitCh
return err

2.2. 装饰者包装异步操作

sync.WaitGroup用来等待一组goroutine的完成

你真的会用sync.WaitGroup吗

// 初始化HTTP服务
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
// waitGroup WaitGroupWrapper
n.waitGroup.Wrap(func() {
  	// 同时使用exitFunc判断服务是否报错
  	exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
.....

n.waitGroup.Wait()

.....

// util
type WaitGroupWrapper struct {
		sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

五、 TCP服务

1. TCP服务流程

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  	.....
  
    for {
      // socket accept
      clientConn, err := listener.Accept()
      .....
    }

      wg.Add(1)
      go func() {
        // 执行连接后的处理
        handler.Handle(clientConn)
        wg.Done()
      }()
    }

    // wait to return until all handler goroutines complete
    wg.Wait()

    return nil
}

func (p *tcpServer) Handle(conn net.Conn) {
    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    buf := make([]byte, 4)
    // 读取4byte的魔数
    _, err := io.ReadFull(conn, buf)
  
   	.....
  
    protocolMagic := string(buf)

    var prot protocol.Protocol
    switch protocolMagic {
    // V2协议
    case "  V2":
      prot = &protocolV2{nsqd: p.nsqd}
    // 其他协议报错
    default:
      protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL"))
      conn.Close()
      p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
        conn.RemoteAddr(), protocolMagic)
      return
    }

    // ip - client
    client := prot.NewClient(conn)
    p.conns.Store(conn.RemoteAddr(), client)

    err = prot.IOLoop(client)
  	......

    p.conns.Delete(conn.RemoteAddr())
    client.Close()
}

func (p *protocolV2) IOLoop(c protocol.Client) error {
  
  ......
	for {
		......
		params := bytes.Split(line, separatorBytes)

		var response []byte
		// 接受命令
		response, err = p.Exec(client, params)
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

			// 返回响应
			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}
img

2. 生产者生产消息

通过PUB指令完成

func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    ......
  	
  	// 读取长度
    bodyLen, err := readLen(client.Reader, client.lenSlice)
    ......

    messageBody := make([]byte, bodyLen)
  	// 读取内容
    _, err = io.ReadFull(client.Reader, messageBody)
    ......

  	// 检验身份
    if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
      return nil, err
    }

  	// 获取topic结构体
    topic := p.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
  
  	// 消息放到结构体中
    err = topic.PutMessage(msg)
   	......

  	// 计数
    client.PublishedMessage(topicName, 1)

    return okBytes, nil
}
  1. 读取数据的长度
  2. 读取数据
  3. 检验身份
  4. 获取topic结构体
  5. 发布消息到对应的topic
  6. topic消息计数

2.1. 获取topic

func (n *NSQD) GetTopic(topicName string) *Topic {
	// most likely we already have this topic, so try read lock first
	n.RLock()
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}

	// 写锁
	n.Lock()

	// 此时可能另一个线程已经完成topic的放置,所以需要添加一个判断
	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}

	// 删除topic的回调
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	t = NewTopic(topicName, n, deleteCallback)
	n.topicMap[topicName] = t

	n.Unlock()

  // 加载元数据
	// if this topic was created while loading metadata at startup don't do any further initialization
	// (topic will be "started" after loading completes)
	if atomic.LoadInt32(&n.isLoading) == 1 {
		return t
	}

	/* 下面省略从nsqlookupd获取topic信息代码,因为这个nsqd实例可能是新加的机器,所以需要执行nsqlookupd查询 */
  
	return t
}

可以学习到的点:

  1. 如果已经存在topic就使用读锁,可以并发读取
  2. 如果不存在,就意味着修改,要使用写锁
  3. 代码里第二个ok判断很精髓,可以仔细看看

2.2. 新建topic

func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
    t := &Topic{
      name:              topicName,
      channelMap:        make(map[string]*Channel),
      // 内存队列
      memoryMsgChan:     nil,
      startChan:         make(chan int, 1),
      exitChan:          make(chan int),
      channelUpdateChan: make(chan int),
      nsqd:              nsqd,
      paused:            0,
      pauseChan:         make(chan int),
      deleteCallback:    deleteCallback,
      idFactory:         NewGUIDFactory(nsqd.getOpts().ID),
    }
    // create mem-queue only if size > 0 (do not use unbuffered chan)
    if nsqd.getOpts().MemQueueSize > 0 {
      t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
    }
    if strings.HasSuffix(topicName, "#ephemeral") {
      t.ephemeral = true
      // 磁盘队列
      t.backend = newDummyBackendQueue()
    } else {
      dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
        opts := nsqd.getOpts()
        lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
      }
      t.backend = diskqueue.New(
        topicName,
        nsqd.getOpts().DataPath,
        nsqd.getOpts().MaxBytesPerFile,
        int32(minValidMsgLength),
        int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
        nsqd.getOpts().SyncEvery,
        nsqd.getOpts().SyncTimeout,
        dqLogf,
      )
    }

  	// TODO分析
    t.waitGroup.Wrap(t.messagePump)

    t.nsqd.Notify(t, !t.ephemeral)

    return t
}

2.3. topic事件循环

TODO

255、参考文献

  1. 走进 NSQ 源码细节
  2. NSQ源码剖析——简单高性能的MQ实现

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×