Go 并发模型
Goroutine 调度器:GMP 模型
Go 的并发建立在 Goroutine 之上——一种用户态轻量级线程,初始栈仅 2KB(可动态增长至 1GB)。调度器采用 GMP 模型管理 Goroutine 的调度:
graph TD
subgraph "GMP 调度模型"
G1["G: Goroutine"] --> P1["P: Processor<br/>本地队列 (256)"]
G2["G"] --> P1
G3["G"] --> P1
P1 --> M1["M: Machine<br/>操作系统线程"]
P2["P: Processor<br/>本地队列"] --> M2["M: Machine"]
M1 --> RUN["运行 G"]
GR["全局队列"] -.->|work stealing| P1
GR -.->|work stealing| P2
end
三个核心组件
| 组件 | 职责 | 数量 |
|---|---|---|
| G(Goroutine) | 用户代码的执行体,包含栈、指令指针等 | 按需创建 |
| M(Machine) | 操作系统线程,真正执行代码 | 按需创建,默认上限 10000 |
| P(Processor) | 调度上下文,持有本地运行队列 | GOMAXPROCS(默认=CPU 核心数) |
调度策略
1. Work Stealing(工作窃取)
当一个 P 的本地队列为空时,它会从其他 P 的本地队列窃取一半的 G:
// P1 本地队列空了
// → 尝试从全局队列取
// → 尝试从其他 P 窃取
// → 都没有则 M 与 P 解绑,M 休眠
2. Hand Off(交接)
当正在运行的 G 进行系统调用阻塞 M 时,P 会与 M 解绑,寻找或创建新的 M 继续运行本地队列中的其他 G:
sequenceDiagram
participant P as Processor
participant M1 as 线程M1
participant G1 as Goroutine G1
participant M2 as 线程M2
M1->>G1: 执行系统调用(阻塞)
Note over P: G1 阻塞了 M1
P->>M1: 解绑
P->>M2: 绑定新线程,继续调度
Note over M1: 系统调用完成
M1->>P: 尝试重新绑定
alt P 有空闲
M1->>P: 绑定,继续执行
else P 全忙
M1->>G1: 放入全局队列
M1->>M1: 休眠
end
3. 抢占式调度
Go 1.14 引入基于信号的异步抢占,解决了早期版本中 Goroutine 长时间占用线程不释放的问题(如死循环)。
Channel 通信
Go 的并发哲学:不要通过共享内存来通信,而要通过通信来共享内存。Channel 是这一哲学的核心实现。
Channel 基础
// 无缓冲 channel:同步通信
ch := make(chan int) // 发送和接收必须同时就绪
// 有缓冲 channel:异步通信
ch := make(chan int, 100) // 缓冲区满时发送阻塞,缓冲区空时接收阻塞
// 发送与接收
ch <- 42 // 发送
value := <-ch // 接收
// 关闭 channel
close(ch) // 关闭后不能再发送,但可以继续接收直到缓冲区为空
Channel 方向
func producer(out chan<- int) { // 只写 channel
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
func consumer(in <-chan int) { // 只读 channel
for v := range in { // range 自动在 channel 关闭时退出
fmt.Println(v)
}
}
select 多路复用
select {
case msg := <-ch1:
fmt.Println("received from ch1:", msg)
case msg := <-ch2:
fmt.Println("received from ch2:", msg)
case ch3 <- 42:
fmt.Println("sent to ch3")
case <-time.After(5 * time.Second):
fmt.Println("timeout")
default:
fmt.Println("no channel ready") // 非阻塞模式
}
sync 包
Mutex 与 RWMutex
var (
mu sync.RWMutex
cache map[string]string
)
func Read(key string) string {
mu.RLock() // 读锁,多个读可并行
defer mu.RUnlock()
return cache[key]
}
func Write(key, value string) {
mu.Lock() // 写锁,独占
defer mu.Unlock()
cache[key] = value
}
WaitGroup
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
doWork(id)
}(i)
}
wg.Wait() // 等待所有 goroutine 完成
sync.Once
var once sync.Once
var instance *Database
func GetDB() *Database {
once.Do(func() {
instance = &Database{}
instance.Connect()
})
return instance
}
sync.Pool
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func handler(w http.ResponseWriter, r *http.Request) {
buf := bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufPool.Put(buf) // 用完归还
}()
// 使用 buf...
}
Context 取消传播
Context 是 Go 中控制 Goroutine 生命周期的标准机制,实现取消传播和超时控制:
flowchart TD
A[context.Background] --> B[WithCancel]
A --> C[WithTimeout 5s]
B --> D[WithCancel<br/>子任务1]
B --> E[子任务2]
C --> F[WithDeadline<br/>子任务3]
B -->|cancel()| G[通知 D, E 取消]
C -->|超时| H[通知 F 取消]
func fetchData(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err // ctx 取消时,Do 返回 context.Canceled
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
func main() {
// 5 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
data, err := fetchData(ctx, "https://api.example.com/data")
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
log.Println("请求超时")
}
return
}
fmt.Println(data)
}
并发模式实战
Fan-out / Fan-in
func fanOutFanIn(ctx context.Context, input <-chan int, workerCount int) <-chan int {
// Fan-out: 启动多个 worker
workers := make([]<-chan int, workerCount)
for i := 0; i < workerCount; i++ {
workers[i] = worker(ctx, input)
}
// Fan-in: 合并所有 worker 的输出
return merge(ctx, workers...)
}
func worker(ctx context.Context, input <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range input {
select {
case out <- process(v):
case <-ctx.Done():
return
}
}
}()
return out
}
Pipeline 模式
// 生成 → 过滤 → 聚合
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func filter(ctx context.Context, in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
}()
return out
}
限流器
// 基于令牌桶的限流
func rateLimiter(ctx context.Context, requests <-chan Request, rate time.Duration) <-chan Response {
limiter := time.NewTicker(rate)
defer limiter.Stop()
out := make(chan Response)
go func() {
defer close(out)
for req := range requests {
select {
case <-limiter.C: // 令牌
out <- handle(req)
case <-ctx.Done():
return
}
}
}()
return out
}
Go 并发的精髓在于:用 Channel 连接 Goroutine,用 Context 控制生命周期,用 select 处理多路事件。掌握这三个原语,就能构建出各种并发模式。
评论