:Go 并发编程
Goroutine基础
什么是Goroutine
Goroutine是Go语言的轻量级线程,由Go运行时管理。它们比操作系统线程更轻量,创建成本极低。
创建Goroutine
package main
import (
"fmt"
"time"
)
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("Hello %s! (%d)\n", name, i+1)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// 普通函数调用(同步)
fmt.Println("=== 同步执行 ===")
sayHello("Alice")
sayHello("Bob")
fmt.Println("\n=== 异步执行 ===")
// 使用go关键字启动goroutine(异步)
go sayHello("Charlie")
go sayHello("David")
// 主goroutine等待一段时间
time.Sleep(500 * time.Millisecond)
fmt.Println("主程序结束")
}
匿名函数Goroutine
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// 启动多个匿名函数goroutine
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d 开始执行\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("Goroutine %d 执行完成\n", id)
}(i) // 传递参数避免闭包陷阱
}
// 等待所有goroutine完成
wg.Wait()
fmt.Println("所有goroutine执行完成")
}
Channel通信
Channel基础
package main
import (
"fmt"
"time"
)
func main() {
// 创建channel
ch := make(chan string)
// 启动goroutine发送数据
go func() {
time.Sleep(1 * time.Second)
ch <- "Hello from goroutine!"
}()
// 主goroutine接收数据
message := <-ch
fmt.Println("收到消息:", message)
}
带缓冲的Channel
package main
import (
"fmt"
"time"
)
func main() {
// 创建带缓冲的channel
ch := make(chan int, 3)
// 发送数据(不会阻塞,因为有缓冲)
ch <- 1
ch <- 2
ch <- 3
fmt.Printf("Channel长度: %d, 容量: %d\n", len(ch), cap(ch))
// 启动goroutine处理数据
go func() {
for i := 0; i < 3; i++ {
value := <-ch
fmt.Printf("处理数据: %d\n", value)
time.Sleep(500 * time.Millisecond)
}
}()
time.Sleep(2 * time.Second)
}
Channel方向
package main
import (
"fmt"
"time"
)
// 只能发送的channel
func sender(ch chan<- string) {
for i := 1; i <= 3; i++ {
message := fmt.Sprintf("消息 %d", i)
ch <- message
fmt.Printf("发送: %s\n", message)
time.Sleep(500 * time.Millisecond)
}
close(ch)
}
// 只能接收的channel
func receiver(ch <-chan string) {
for message := range ch {
fmt.Printf("接收: %s\n", message)
}
fmt.Println("接收完成")
}
func main() {
ch := make(chan string, 2)
go sender(ch)
go receiver(ch)
time.Sleep(3 * time.Second)
}
生产者-消费者模式
单生产者单消费者
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func producer(ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
defer close(ch)
for i := 1; i <= 5; i++ {
product := rand.Intn(100)
ch <- product
fmt.Printf("生产者生产: %d\n", product)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
fmt.Println("生产者完成")
}
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for product := range ch {
fmt.Printf("消费者%d消费: %d\n", id, product)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
fmt.Printf("消费者%d完成\n", id)
}
func main() {
rand.Seed(time.Now().UnixNano())
ch := make(chan int, 2)
var wg sync.WaitGroup
// 启动1个生产者
wg.Add(1)
go producer(ch, &wg)
// 启动2个消费者
wg.Add(2)
go consumer(1, ch, &wg)
go consumer(2, ch, &wg)
wg.Wait()
fmt.Println("所有工作完成")
}
多生产者多消费者
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Task struct {
ID int
Data string
}
func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 3; i++ {
task := Task{
ID: id*100 + i,
Data: fmt.Sprintf("Producer%d-Task%d", id, i),
}
tasks <- task
fmt.Printf("生产者%d生产任务: %+v\n", id, task)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
fmt.Printf("生产者%d完成\n", id)
}
func consumer(id int, tasks <-chan Task, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
// 模拟处理任务
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
result := fmt.Sprintf("消费者%d处理了任务%d: %s", id, task.ID, task.Data)
results <- result
fmt.Printf("消费者%d完成任务%d\n", id, task.ID)
}
fmt.Printf("消费者%d完成\n", id)
}
func main() {
rand.Seed(time.Now().UnixNano())
tasks := make(chan Task, 5)
results := make(chan string, 10)
var producerWg sync.WaitGroup
var consumerWg sync.WaitGroup
// 启动3个生产者
for i := 1; i <= 3; i++ {
producerWg.Add(1)
go producer(i, tasks, &producerWg)
}
// 启动2个消费者
for i := 1; i <= 2; i++ {
consumerWg.Add(1)
go consumer(i, tasks, results, &consumerWg)
}
// 等待所有生产者完成,然后关闭任务channel
go func() {
producerWg.Wait()
close(tasks)
}()
// 等待所有消费者完成,然后关闭结果channel
go func() {
consumerWg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println("结果:", result)
}
fmt.Println("所有工作完成")
}
Select语句
基本Select
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自channel 2"
}()
// select语句会等待多个channel操作
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
}
}
带超时的Select
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "延迟的消息"
}()
select {
case msg := <-ch:
fmt.Println("收到消息:", msg)
case <-time.After(2 * time.Second):
fmt.Println("超时了!")
}
}
非阻塞Select
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
// 非阻塞发送
select {
case ch <- "hello":
fmt.Println("发送成功")
default:
fmt.Println("channel已满,发送失败")
}
// 非阻塞接收
select {
case msg := <-ch:
fmt.Println("收到:", msg)
default:
fmt.Println("channel为空,接收失败")
}
// 启动goroutine发送数据
go func() {
time.Sleep(1 * time.Second)
ch <- "延迟消息"
}()
// 轮询检查
for i := 0; i < 5; i++ {
select {
case msg := <-ch:
fmt.Println("收到:", msg)
return
default:
fmt.Printf("第%d次检查,暂无消息\n", i+1)
time.Sleep(300 * time.Millisecond)
}
}
}
并发模式
Worker Pool模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
ID int
Data int
}
type Result struct {
JobID int
Result int
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 开始处理 Job %d\n", id, job.ID)
// 模拟工作
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
result := job.Data * 2
results <- Result{
JobID: job.ID,
Result: result,
}
fmt.Printf("Worker %d 完成 Job %d, 结果: %d\n", id, job.ID, result)
}
fmt.Printf("Worker %d 退出\n", id)
}
func main() {
rand.Seed(time.Now().UnixNano())
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送jobs
for i := 1; i <= numJobs; i++ {
jobs <- Job{
ID: i,
Data: rand.Intn(100),
}
}
close(jobs)
// 等待所有worker完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Job %d 的结果: %d\n", result.JobID, result.Result)
}
fmt.Println("所有工作完成")
}
Fan-in模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func producer(name string, out chan<- string) {
for i := 1; i <= 5; i++ {
message := fmt.Sprintf("%s-消息%d", name, i)
out <- message
fmt.Printf("%s 发送: %s\n", name, message)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
close(out)
}
func fanIn(inputs ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
// 为每个输入channel启动一个goroutine
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan string) {
defer wg.Done()
for msg := range ch {
out <- msg
}
}(input)
}
// 等待所有输入完成后关闭输出channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
rand.Seed(time.Now().UnixNano())
// 创建多个生产者
ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)
go producer("生产者1", ch1)
go producer("生产者2", ch2)
go producer("生产者3", ch3)
// 合并所有channel
merged := fanIn(ch1, ch2, ch3)
// 消费合并后的消息
for message := range merged {
fmt.Printf("消费者收到: %s\n", message)
}
fmt.Println("所有消息处理完成")
}
Fan-out模式
package main
import (
"fmt"
"sync"
"time"
)
func producer(out chan<- int) {
defer close(out)
for i := 1; i <= 10; i++ {
out <- i
fmt.Printf("生产者发送: %d\n", i)
time.Sleep(100 * time.Millisecond)
}
}
func consumer(id int, in <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range in {
fmt.Printf("消费者%d处理: %d\n", id, data)
time.Sleep(500 * time.Millisecond)
}
fmt.Printf("消费者%d完成\n", id)
}
func fanOut(in <-chan int, numConsumers int) {
var wg sync.WaitGroup
// 启动多个消费者
for i := 1; i <= numConsumers; i++ {
wg.Add(1)
go consumer(i, in, &wg)
}
wg.Wait()
}
func main() {
data := make(chan int)
// 启动生产者
go producer(data)
// 启动多个消费者处理数据
fanOut(data, 3)
fmt.Println("所有处理完成")
}
同步原语
Mutex互斥锁
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
// 启动多个goroutine并发增加计数
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
counter.Increment()
}
fmt.Printf("Goroutine %d 完成\n", id)
}(i)
}
wg.Wait()
fmt.Printf("最终计数: %d\n", counter.Value())
}
RWMutex读写锁
package main
import (
"fmt"
"sync"
"time"
)
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string]int),
}
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
fmt.Printf("设置 %s = %d\n", key, value)
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
value, ok := sm.data[key]
fmt.Printf("读取 %s = %d\n", key, value)
return value, ok
}
func main() {
safeMap := NewSafeMap()
var wg sync.WaitGroup
// 启动写入goroutine
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
key := fmt.Sprintf("key%d", id*5+j)
safeMap.Set(key, id*5+j)
time.Sleep(100 * time.Millisecond)
}
}(i)
}
// 启动读取goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
key := fmt.Sprintf("key%d", j)
safeMap.Get(key)
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Println("所有操作完成")
}
Once确保只执行一次
package main
import (
"fmt"
"sync"
)
var (
instance *Singleton
once sync.Once
)
type Singleton struct {
data string
}
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("创建单例实例")
instance = &Singleton{
data: "单例数据",
}
})
return instance
}
func main() {
var wg sync.WaitGroup
// 启动多个goroutine获取单例
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
singleton := GetInstance()
fmt.Printf("Goroutine %d 获取到实例: %p, 数据: %s\n",
id, singleton, singleton.data)
}(i)
}
wg.Wait()
}
总结
本课我们学习了Go语言的并发编程:
- Goroutine:轻量级线程,Go并发的基础
- Channel:goroutine间通信的管道
- Select:多路复用,处理多个channel操作
- 并发模式:生产者-消费者、Worker Pool、Fan-in/Fan-out
- 同步原语:Mutex、RWMutex、Once等
下一课预告
在下一课中,我们将学习Go语言的Web开发,包括:
- HTTP服务器开发
- 路由和中间件
- JSON处理
- 模板引擎
💡 小贴士:Go的并发模型基于CSP(Communicating Sequential Processes),提倡"不要通过共享内存来通信,而要通过通信来共享内存"。Channel是实现这一理念的核心工具。
📚 文章对你有帮助?请关注我的公众号,万分感谢!
获取更多优质技术文章,第一时间掌握最新技术动态

关注公众号
第一时间获取最新技术文章

添加微信
技术交流 · 问题答疑 · 学习指导
评论讨论
欢迎留下你的想法和建议