手撕并发队列(Go版)
0. 简介
【Java高级】项目亮点实现细节重现!核心逻辑编写思路!手把手教你写!
实现思路主要源于👆的视频,我自己用Go实现了一版,感觉和Java的差异还是挺大的,欢迎大伙批评指正~(我是菜狗
今天下午本来打算去健身房的,但不小心打开了这个视频👀,感觉挺有意思的,就想着自己能不能用Go实现一下,关于这种缓存区思想很早之前就在各大计算机基础学科(计网、操作系统、数据库…)里看到过,也没自己动手玩过…
1. 实践出真知
1.1. v1版本
1.1.1. 实现思路
- 创建一个缓存区,即阻塞队列,Go语言的话这部分可以使用channel来完成;
- 定时任务取出阻塞队列的任务,校验本批任务购买的总和与库存的大小关系再执行相应的流程;
- 返回对应的结果给到用户。
1.1.2. 实现代码
package memory_queue
import (
"fmt"
"strconv"
"sync"
"time"
)
// UserReq 请求
type UserReq struct {
OrderId string
UserId string
Count int
}
// Result 响应
type Result struct {
IsSucceed bool
Msg string
}
// ReqPromise 在阻塞队列中的一次完整的请求
type ReqPromise struct {
Req *UserReq
Resp *Result
// 本次请求是否已经被处理完
done chan struct{}
}
// queueMaxSize 队列长度
const queueMaxSize = 10
// 10 足够; 6 不足
var stock = 6
// queue channel阻塞队列
var queue = make(chan *ReqPromise, queueMaxSize)
func main() {
// 启动异步合并线程
mergeJobs()
ans := make([]*Result, 0, queueMaxSize)
wg := &sync.WaitGroup{}
lock := &sync.Mutex{}
wg.Add(queueMaxSize)
for i := 0; i < queueMaxSize; i++ {
userId := strconv.Itoa(i)
orderId := strconv.Itoa(i + 100)
go func() {
defer func() {
wg.Done()
lock.Unlock()
}()
// append会有并发问题 所以上锁
lock.Lock()
ans = append(ans, operation(&UserReq{orderId, userId, 1}))
}()
}
// 同步等待goroutine跑完
wg.Wait()
// 打印结果
for i := 0; i < len(ans); i++ {
fmt.Println(ans[i].IsSucceed, ans[i].Msg)
}
}
// operation 用户库存扣减
func operation(userReq *UserReq) *Result {
reqPromise := &ReqPromise{Req: userReq, done: make(chan struct{})}
// 调度请求进入队列
select {
case queue <- reqPromise:
fmt.Printf("%v 成功进入队列\n", reqPromise.Req.UserId)
// 超时处理
case <-time.After(100 * time.Millisecond):
return &Result{false, "系统繁忙,请稍后重试"}
}
// 调度请求执行是否完成
select {
case <-reqPromise.done:
fmt.Printf("%v 任务执行完成\n", reqPromise.Req.UserId)
// 超时处理
case <-time.After(200 * time.Millisecond):
return &Result{false, "超时等待,请稍后重试"}
}
return reqPromise.Resp
}
// mergeJobs 轮询取出队列中的任务执行扣除存库操作
func mergeJobs() {
// 异步线程
go func() {
list := make([]*ReqPromise, 0)
for {
if len(queue) == 0 {
time.Sleep(10 * time.Millisecond)
continue
}
// 后续还会进行append操作,所以使用该方式保留原cap,减少扩容操作
list = list[0:0]
for len(queue) != 0 {
list = append(list, <-queue)
}
// 求和
sum := 0
for i := 0; i < len(list); i++ {
sum += list[i].Req.Count
}
// 库存足够
if sum <= stock {
stock -= sum
// 唤醒执行完的线程
for i := 0; i < len(list); i++ {
list[i].Resp = &Result{true, fmt.Sprintf("userId:%v orderId:%v 购买成功", list[i].Req.UserId, list[i].Req.OrderId)}
// 处理完成
list[i].done <- struct{}{}
}
continue
}
// 库存不足
for i := 0; i < len(list); i++ {
if stock >= list[i].Req.Count {
stock -= list[i].Req.Count
list[i].Resp = &Result{true, fmt.Sprintf("userId:%v orderId:%v 购买成功", list[i].Req.UserId, list[i].Req.OrderId)}
} else {
list[i].Resp = &Result{false, fmt.Sprintf("userId:%v orderId:%v 库存不足,静待补货", list[i].Req.UserId, list[i].Req.OrderId)}
}
// 处理完成
list[i].done <- struct{}{}
}
}
}()
}
1.1.3. 测试代码
package memory_queue
import (
"fmt"
"testing"
"time"
)
// TestChannel 测试队列阻塞
func TestChannel(t *testing.T) {
ch := make(chan int, 1)
ch <- 1
select {
case ch <- 1:
fmt.Println("成功")
case <-time.After(2 * time.Second):
fmt.Println("超时")
}
}
// TestSlice 测试切片清空
func TestSlice(t *testing.T) {
ints := make([]int, 0, 10)
ints = append(ints, 123)
fmt.Println(len(ints), " ", cap(ints))
for i := 0; i < len(ints); i++ {
fmt.Println(ints[i])
}
ints = ints[0:0]
fmt.Println(len(ints), " ", cap(ints))
for i := 0; i < len(ints); i++ {
fmt.Println(ints[i])
}
ints = []int{}
fmt.Println(len(ints), " ", cap(ints))
}
// 测试并发队列
func Test_main(t *testing.T) {
main()
}
1.2. v2版本
感觉会有更新,mark!
2. 总结
2.1. v1版本
2.1.1. 超时处理
select {
case ch <- 1:
fmt.Println("成功")
case <-time.After(2 * time.Second):
fmt.Println("超时")
}
2.1.2. 清空切片
如果清空切片后还需要继续append
操作,建议使用如下方式:
// 该方式可以避免数组从0开始扩容,即会保留原cap大小
ints = ints[0:0]
2.1.3. Go的wait¬ify
Go没有Java每个对象都自带的wait¬ify,但还是可以用channel来搞
我想到的就是给一个done标记,完成了就会通知到goroutine不需要等待了,继续执行下去吧,具体的可以看一下上面完整的代码实现👀
// ReqPromise 在阻塞队列中的一次完整的请求
type ReqPromise struct {
Req *UserReq
Resp *Result
// 本次请求是否已经被处理完
done chan struct{}
}
2.2. v2版
感觉会有更新,mark!
3. 参考文献
- 【Java高级】项目亮点实现细节重现!核心逻辑编写思路!手把手教你写!
- Go并发编程 — sync.Cond
- 【Golang】怎样优雅的清空切片