第6课:Go 并发编程

【腾讯云】语音识别准确率高,支持多语种,多场景,限时特惠,最低14.9元起

推广

【腾讯云】语音识别准确率高,支持多语种,多场景,限时特惠,最低14.9元起

:Go 并发编程

Goroutine基础

什么是Goroutine

Goroutine是Go语言的轻量级线程,由Go运行时管理。它们比操作系统线程更轻量,创建成本极低。

创建Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("Hello %s! (%d)\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 普通函数调用(同步)
    fmt.Println("=== 同步执行 ===")
    sayHello("Alice")
    sayHello("Bob")
    
    fmt.Println("\n=== 异步执行 ===")
    // 使用go关键字启动goroutine(异步)
    go sayHello("Charlie")
    go sayHello("David")
    
    // 主goroutine等待一段时间
    time.Sleep(500 * time.Millisecond)
    fmt.Println("主程序结束")
}

匿名函数Goroutine

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    
    // 启动多个匿名函数goroutine
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 开始执行\n", id)
            time.Sleep(time.Duration(id*100) * time.Millisecond)
            fmt.Printf("Goroutine %d 执行完成\n", id)
        }(i) // 传递参数避免闭包陷阱
    }
    
    // 等待所有goroutine完成
    wg.Wait()
    fmt.Println("所有goroutine执行完成")
}

Channel通信

Channel基础

package main

import (
    "fmt"
    "time"
)

func main() {
    // 创建channel
    ch := make(chan string)
    
    // 启动goroutine发送数据
    go func() {
        time.Sleep(1 * time.Second)
        ch <- "Hello from goroutine!"
    }()
    
    // 主goroutine接收数据
    message := <-ch
    fmt.Println("收到消息:", message)
}

带缓冲的Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    // 创建带缓冲的channel
    ch := make(chan int, 3)
    
    // 发送数据(不会阻塞,因为有缓冲)
    ch <- 1
    ch <- 2
    ch <- 3
    
    fmt.Printf("Channel长度: %d, 容量: %d\n", len(ch), cap(ch))
    
    // 启动goroutine处理数据
    go func() {
        for i := 0; i < 3; i++ {
            value := <-ch
            fmt.Printf("处理数据: %d\n", value)
            time.Sleep(500 * time.Millisecond)
        }
    }()
    
    time.Sleep(2 * time.Second)
}

Channel方向

package main

import (
    "fmt"
    "time"
)

// 只能发送的channel
func sender(ch chan<- string) {
    for i := 1; i <= 3; i++ {
        message := fmt.Sprintf("消息 %d", i)
        ch <- message
        fmt.Printf("发送: %s\n", message)
        time.Sleep(500 * time.Millisecond)
    }
    close(ch)
}

// 只能接收的channel
func receiver(ch <-chan string) {
    for message := range ch {
        fmt.Printf("接收: %s\n", message)
    }
    fmt.Println("接收完成")
}

func main() {
    ch := make(chan string, 2)
    
    go sender(ch)
    go receiver(ch)
    
    time.Sleep(3 * time.Second)
}

生产者-消费者模式

单生产者单消费者

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func producer(ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(ch)
    
    for i := 1; i <= 5; i++ {
        product := rand.Intn(100)
        ch <- product
        fmt.Printf("生产者生产: %d\n", product)
        time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
    }
    fmt.Println("生产者完成")
}

func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for product := range ch {
        fmt.Printf("消费者%d消费: %d\n", id, product)
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }
    fmt.Printf("消费者%d完成\n", id)
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    ch := make(chan int, 2)
    var wg sync.WaitGroup
    
    // 启动1个生产者
    wg.Add(1)
    go producer(ch, &wg)
    
    // 启动2个消费者
    wg.Add(2)
    go consumer(1, ch, &wg)
    go consumer(2, ch, &wg)
    
    wg.Wait()
    fmt.Println("所有工作完成")
}

多生产者多消费者

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Task struct {
    ID   int
    Data string
}

func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for i := 1; i <= 3; i++ {
        task := Task{
            ID:   id*100 + i,
            Data: fmt.Sprintf("Producer%d-Task%d", id, i),
        }
        tasks <- task
        fmt.Printf("生产者%d生产任务: %+v\n", id, task)
        time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
    }
    fmt.Printf("生产者%d完成\n", id)
}

func consumer(id int, tasks <-chan Task, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for task := range tasks {
        // 模拟处理任务
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        result := fmt.Sprintf("消费者%d处理了任务%d: %s", id, task.ID, task.Data)
        results <- result
        fmt.Printf("消费者%d完成任务%d\n", id, task.ID)
    }
    fmt.Printf("消费者%d完成\n", id)
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    tasks := make(chan Task, 5)
    results := make(chan string, 10)
    
    var producerWg sync.WaitGroup
    var consumerWg sync.WaitGroup
    
    // 启动3个生产者
    for i := 1; i <= 3; i++ {
        producerWg.Add(1)
        go producer(i, tasks, &producerWg)
    }
    
    // 启动2个消费者
    for i := 1; i <= 2; i++ {
        consumerWg.Add(1)
        go consumer(i, tasks, results, &consumerWg)
    }
    
    // 等待所有生产者完成,然后关闭任务channel
    go func() {
        producerWg.Wait()
        close(tasks)
    }()
    
    // 等待所有消费者完成,然后关闭结果channel
    go func() {
        consumerWg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Println("结果:", result)
    }
    
    fmt.Println("所有工作完成")
}

Select语句

基本Select

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自channel 1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自channel 2"
    }()
    
    // select语句会等待多个channel操作
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("收到:", msg1)
        case msg2 := <-ch2:
            fmt.Println("收到:", msg2)
        }
    }
}

带超时的Select

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(3 * time.Second)
        ch <- "延迟的消息"
    }()
    
    select {
    case msg := <-ch:
        fmt.Println("收到消息:", msg)
    case <-time.After(2 * time.Second):
        fmt.Println("超时了!")
    }
}

非阻塞Select

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    // 非阻塞发送
    select {
    case ch <- "hello":
        fmt.Println("发送成功")
    default:
        fmt.Println("channel已满,发送失败")
    }
    
    // 非阻塞接收
    select {
    case msg := <-ch:
        fmt.Println("收到:", msg)
    default:
        fmt.Println("channel为空,接收失败")
    }
    
    // 启动goroutine发送数据
    go func() {
        time.Sleep(1 * time.Second)
        ch <- "延迟消息"
    }()
    
    // 轮询检查
    for i := 0; i < 5; i++ {
        select {
        case msg := <-ch:
            fmt.Println("收到:", msg)
            return
        default:
            fmt.Printf("第%d次检查,暂无消息\n", i+1)
            time.Sleep(300 * time.Millisecond)
        }
    }
}

并发模式

Worker Pool模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data int
}

type Result struct {
    JobID  int
    Result int
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d 开始处理 Job %d\n", id, job.ID)
        
        // 模拟工作
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        result := job.Data * 2
        
        results <- Result{
            JobID:  job.ID,
            Result: result,
        }
        
        fmt.Printf("Worker %d 完成 Job %d, 结果: %d\n", id, job.ID, result)
    }
    
    fmt.Printf("Worker %d 退出\n", id)
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // 发送jobs
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{
            ID:   i,
            Data: rand.Intn(100),
        }
    }
    close(jobs)
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("Job %d 的结果: %d\n", result.JobID, result.Result)
    }
    
    fmt.Println("所有工作完成")
}

Fan-in模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func producer(name string, out chan<- string) {
    for i := 1; i <= 5; i++ {
        message := fmt.Sprintf("%s-消息%d", name, i)
        out <- message
        fmt.Printf("%s 发送: %s\n", name, message)
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }
    close(out)
}

func fanIn(inputs ...<-chan string) <-chan string {
    out := make(chan string)
    var wg sync.WaitGroup
    
    // 为每个输入channel启动一个goroutine
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan string) {
            defer wg.Done()
            for msg := range ch {
                out <- msg
            }
        }(input)
    }
    
    // 等待所有输入完成后关闭输出channel
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    // 创建多个生产者
    ch1 := make(chan string)
    ch2 := make(chan string)
    ch3 := make(chan string)
    
    go producer("生产者1", ch1)
    go producer("生产者2", ch2)
    go producer("生产者3", ch3)
    
    // 合并所有channel
    merged := fanIn(ch1, ch2, ch3)
    
    // 消费合并后的消息
    for message := range merged {
        fmt.Printf("消费者收到: %s\n", message)
    }
    
    fmt.Println("所有消息处理完成")
}

Fan-out模式

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(out chan<- int) {
    defer close(out)
    
    for i := 1; i <= 10; i++ {
        out <- i
        fmt.Printf("生产者发送: %d\n", i)
        time.Sleep(100 * time.Millisecond)
    }
}

func consumer(id int, in <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for data := range in {
        fmt.Printf("消费者%d处理: %d\n", id, data)
        time.Sleep(500 * time.Millisecond)
    }
    
    fmt.Printf("消费者%d完成\n", id)
}

func fanOut(in <-chan int, numConsumers int) {
    var wg sync.WaitGroup
    
    // 启动多个消费者
    for i := 1; i <= numConsumers; i++ {
        wg.Add(1)
        go consumer(i, in, &wg)
    }
    
    wg.Wait()
}

func main() {
    data := make(chan int)
    
    // 启动生产者
    go producer(data)
    
    // 启动多个消费者处理数据
    fanOut(data, 3)
    
    fmt.Println("所有处理完成")
}

同步原语

Mutex互斥锁

package main

import (
    "fmt"
    "sync"
    "time"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    // 启动多个goroutine并发增加计数
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Increment()
            }
            fmt.Printf("Goroutine %d 完成\n", id)
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("最终计数: %d\n", counter.Value())
}

RWMutex读写锁

package main

import (
    "fmt"
    "sync"
    "time"
)

type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]int),
    }
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
    fmt.Printf("设置 %s = %d\n", key, value)
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    value, ok := sm.data[key]
    fmt.Printf("读取 %s = %d\n", key, value)
    return value, ok
}

func main() {
    safeMap := NewSafeMap()
    var wg sync.WaitGroup
    
    // 启动写入goroutine
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                key := fmt.Sprintf("key%d", id*5+j)
                safeMap.Set(key, id*5+j)
                time.Sleep(100 * time.Millisecond)
            }
        }(i)
    }
    
    // 启动读取goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                key := fmt.Sprintf("key%d", j)
                safeMap.Get(key)
                time.Sleep(50 * time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("所有操作完成")
}

Once确保只执行一次

package main

import (
    "fmt"
    "sync"
)

var (
    instance *Singleton
    once     sync.Once
)

type Singleton struct {
    data string
}

func GetInstance() *Singleton {
    once.Do(func() {
        fmt.Println("创建单例实例")
        instance = &Singleton{
            data: "单例数据",
        }
    })
    return instance
}

func main() {
    var wg sync.WaitGroup
    
    // 启动多个goroutine获取单例
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            singleton := GetInstance()
            fmt.Printf("Goroutine %d 获取到实例: %p, 数据: %s\n", 
                id, singleton, singleton.data)
        }(i)
    }
    
    wg.Wait()
}

总结

本课我们学习了Go语言的并发编程:

  1. Goroutine:轻量级线程,Go并发的基础
  2. Channel:goroutine间通信的管道
  3. Select:多路复用,处理多个channel操作
  4. 并发模式:生产者-消费者、Worker Pool、Fan-in/Fan-out
  5. 同步原语:Mutex、RWMutex、Once等

下一课预告

在下一课中,我们将学习Go语言的Web开发,包括:

  • HTTP服务器开发
  • 路由和中间件
  • JSON处理
  • 模板引擎

💡 小贴士:Go的并发模型基于CSP(Communicating Sequential Processes),提倡"不要通过共享内存来通信,而要通过通信来共享内存"。Channel是实现这一理念的核心工具。

Vue3 + TypeScript 企业级项目实战

课程推荐

Vue3 + TypeScript 企业级项目实战
Python 全栈开发工程师培训

热门课程

Python 全栈开发工程师培训

📚 文章对你有帮助?请关注我的公众号,万分感谢!

获取更多优质技术文章,第一时间掌握最新技术动态

关注公众号

关注公众号

第一时间获取最新技术文章

添加微信

添加微信

技术交流 · 问题答疑 · 学习指导

评论讨论

欢迎留下你的想法和建议