手撕并发队列(Go版)

手撕并发队列(Go版)

0. 简介

【Java高级】项目亮点实现细节重现!核心逻辑编写思路!手把手教你写!

实现思路主要源于👆的视频,我自己用Go实现了一版,感觉和Java的差异还是挺大的,欢迎大伙批评指正~(我是菜狗

今天下午本来打算去健身房的,但不小心打开了这个视频👀,感觉挺有意思的,就想着自己能不能用Go实现一下,关于这种缓存区思想很早之前就在各大计算机基础学科(计网、操作系统、数据库…)里看到过,也没自己动手玩过…

1. 实践出真知

1.1. v1版本

1.1.1. 实现思路

  1. 创建一个缓存区,即阻塞队列,Go语言的话这部分可以使用channel来完成;
  2. 定时任务取出阻塞队列的任务,校验本批任务购买的总和与库存的大小关系再执行相应的流程;
  3. 返回对应的结果给到用户。

1.1.2. 实现代码

package memory_queue

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

// UserReq 请求
type UserReq struct {
	OrderId string
	UserId  string
	Count   int
}

// Result 响应
type Result struct {
	IsSucceed bool
	Msg       string
}

// ReqPromise 在阻塞队列中的一次完整的请求
type ReqPromise struct {
	Req  *UserReq
	Resp *Result

	// 本次请求是否已经被处理完
	done chan struct{}
}

// queueMaxSize 队列长度
const queueMaxSize = 10

// 10 足够; 6 不足
var stock = 6

// queue channel阻塞队列
var queue = make(chan *ReqPromise, queueMaxSize)

func main() {

	// 启动异步合并线程
	mergeJobs()

	ans := make([]*Result, 0, queueMaxSize)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	wg.Add(queueMaxSize)
	for i := 0; i < queueMaxSize; i++ {
		userId := strconv.Itoa(i)
		orderId := strconv.Itoa(i + 100)
		go func() {
			defer func() {
				wg.Done()
				lock.Unlock()
			}()

			// append会有并发问题 所以上锁
			lock.Lock()
			ans = append(ans, operation(&UserReq{orderId, userId, 1}))
		}()
	}

	// 同步等待goroutine跑完
	wg.Wait()

	// 打印结果
	for i := 0; i < len(ans); i++ {
		fmt.Println(ans[i].IsSucceed, ans[i].Msg)
	}
}

// operation 用户库存扣减
func operation(userReq *UserReq) *Result {
	reqPromise := &ReqPromise{Req: userReq, done: make(chan struct{})}

	// 调度请求进入队列
	select {
	case queue <- reqPromise:
		fmt.Printf("%v 成功进入队列\n", reqPromise.Req.UserId)
		// 超时处理
	case <-time.After(100 * time.Millisecond):
		return &Result{false, "系统繁忙,请稍后重试"}
	}

	// 调度请求执行是否完成
	select {
	case <-reqPromise.done:
		fmt.Printf("%v 任务执行完成\n", reqPromise.Req.UserId)
		// 超时处理
	case <-time.After(200 * time.Millisecond):
		return &Result{false, "超时等待,请稍后重试"}
	}

	return reqPromise.Resp
}

// mergeJobs 轮询取出队列中的任务执行扣除存库操作
func mergeJobs() {
	// 异步线程
	go func() {
		list := make([]*ReqPromise, 0)
		for {
			if len(queue) == 0 {
				time.Sleep(10 * time.Millisecond)
				continue
			}

			// 后续还会进行append操作,所以使用该方式保留原cap,减少扩容操作
			list = list[0:0]
			for len(queue) != 0 {
				list = append(list, <-queue)
			}

			// 求和
			sum := 0
			for i := 0; i < len(list); i++ {
				sum += list[i].Req.Count
			}

			// 库存足够
			if sum <= stock {
				stock -= sum

				// 唤醒执行完的线程
				for i := 0; i < len(list); i++ {
					list[i].Resp = &Result{true, fmt.Sprintf("userId:%v orderId:%v 购买成功", list[i].Req.UserId, list[i].Req.OrderId)}

					// 处理完成
					list[i].done <- struct{}{}
				}

				continue
			}

			// 库存不足
			for i := 0; i < len(list); i++ {
				if stock >= list[i].Req.Count {
					stock -= list[i].Req.Count
					list[i].Resp = &Result{true, fmt.Sprintf("userId:%v orderId:%v 购买成功", list[i].Req.UserId, list[i].Req.OrderId)}
				} else {
					list[i].Resp = &Result{false, fmt.Sprintf("userId:%v orderId:%v 库存不足,静待补货", list[i].Req.UserId, list[i].Req.OrderId)}
				}

				// 处理完成
				list[i].done <- struct{}{}
			}
		}
	}()
}

1.1.3. 测试代码

package memory_queue

import (
	"fmt"
	"testing"
	"time"
)

// TestChannel 测试队列阻塞
func TestChannel(t *testing.T) {
	ch := make(chan int, 1)
	ch <- 1
	select {
	case ch <- 1:
		fmt.Println("成功")
	case <-time.After(2 * time.Second):
		fmt.Println("超时")
	}
}

// TestSlice 测试切片清空
func TestSlice(t *testing.T) {
	ints := make([]int, 0, 10)
	ints = append(ints, 123)
	fmt.Println(len(ints), " ", cap(ints))
	for i := 0; i < len(ints); i++ {
		fmt.Println(ints[i])
	}
	ints = ints[0:0]
	fmt.Println(len(ints), " ", cap(ints))
	for i := 0; i < len(ints); i++ {
		fmt.Println(ints[i])
	}
	ints = []int{}
	fmt.Println(len(ints), " ", cap(ints))
}

// 测试并发队列
func Test_main(t *testing.T) {
	main()
}

1.2. v2版本

感觉会有更新,mark!

2. 总结

2.1. v1版本

2.1.1. 超时处理

select {
  case ch <- 1:
  fmt.Println("成功")
  case <-time.After(2 * time.Second):
  fmt.Println("超时")
}

2.1.2. 清空切片

如果清空切片后还需要继续append操作,建议使用如下方式:

// 该方式可以避免数组从0开始扩容,即会保留原cap大小
ints = ints[0:0]

2.1.3. Go的wait&notify

Go没有Java每个对象都自带的wait&notify,但还是可以用channel来搞

我想到的就是给一个done标记,完成了就会通知到goroutine不需要等待了,继续执行下去吧,具体的可以看一下上面完整的代码实现👀

// ReqPromise 在阻塞队列中的一次完整的请求
type ReqPromise struct {
	Req  *UserReq
	Resp *Result

	// 本次请求是否已经被处理完
	done chan struct{}
}

2.2. v2版

感觉会有更新,mark!

3. 参考文献

  1. 【Java高级】项目亮点实现细节重现!核心逻辑编写思路!手把手教你写!
  2. Go并发编程 — sync.Cond
  3. 【Golang】怎样优雅的清空切片

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×