Goroutine编程入门
基于CSP并发模式
CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型
Goroutine
Goroutine可以看做一个轻量的线程,它解决了系统线程启动代价,切换开销大的问题。
因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。
经典小例子 - 生产者消费者
这里我们实现一个生产者消费者问题
var products = 0
// Producer 生产者
func Producer() {
for i := 0; i < 1000000; i++ {
newProducts := products + 1
products = newProducts
}
}
// Consumer 消费者
func Consumer() {
for i := 0; i < 1000000; i++ {
newProducts := products - 1
products = newProducts
}
}
如何让他们同时一起运行呢,只需要使用go
关键字就行了!
go Producer()
go Consumer()
由于每个函数里的获取值和设置值是两步操作,它们的执行顺序也不固定,所以在这计算的中间products
有可能会被另一个函数所更改,比如:
newProducts /* 2 */ := products /* 1 */ + 1 // 生产者
newProducts /* 0 */ := products /* 1 */ - 1 // 消费者
products /* 2 */ = newProducts // 生产者
products /* 0 */ = newProducts // 消费者
所以在这里我们还得做的一件事就是,在一个Goroutine访问资源的时候不允许其他Goroutine访问该资源,也就是加锁。
互斥锁
sync
包提供了异步操作相关的函数,这里也提供了互斥锁sync.Mutex
我们改造一下我们的例子:
import (
"sync"
)
var products = 0
var mutex sync.Mutex
// Producer 生产者
func Producer() {
for i := 0; i < 1000000; i++ {
mutex.Lock()
newProducts := products + 1
products = newProducts
mutex.Unlock()
}
}
// Consumer 消费者
func Consumer() {
for i := 0; i < 1000000; i++ {
mutex.Lock()
newProducts := products - 1
products = newProducts
mutex.Unlock()
}
}
这次我们再同步执行这两个操作,会发现我们的products
稳定为0,所有生产的产品全部正确的消费掉。
原子操作
除了自己加锁以外,我们还可以使用标准的原子操作库sync/atomic
。
import (
"sync/atomic"
)
// Producer 生产者
func Producer() {
for i := 0; i < 10000000; i++ {
atomic.AddInt64(&products, 1)
}
}
// Consumer 消费者
func Consumer() {
for i := 0; i < 10000000; i++ {
atomic.AddInt64(&products, -1)
}
}
使用互斥锁的开销远大于使用原子操作。
经典小例子 - 单例模式
单例模式指全局只存在一个实例,重复实例化只会返回同一个实例。
type OnlyInitOnce struct{}
var (
instance *OnlyInitOnce
initialized uint32
)
func NewInstance() *OnlyInitOnce {
if initialized == 1 {
return instance
}
fmt.Println("init!")
instance = &OnlyInitOnce{}
return &OnlyInitOnce{}
}
考虑以上单例模式,如果按照正常的单线程模式执行,是会按照预期的,全局只会进行一次初始化!但是,如果是在多线程中执行多个实例化操作,则可能会出现多次初始化的操作。
for i := 0; i < 10000; i++ {
go NewInstance()
}
这是因为可能同时取的initialized
都为0,所以我们要为其执行加锁操作,使其线程安全。
互斥锁
func NewInstance() *OnlyInitOnce {
mutex.Lock()
defer mutex.Unlock()
if atomic.LoadUint32(&initialized) == 1 {
return instance
}
atomic.StoreUint32(&initialized, 1)
instance = &OnlyInitOnce{}
fmt.Println("init!")
return instance
}
once
sync
包中存在只允许操作一次的类型 - once
,我们用once
可以简化以上代码:
func NewInstance() *OnlyInitOnce {
once.Do(func() {
fmt.Println("init!")
instance = &OnlyInitOnce{}
})
return instance
}
并且达到同样的目的
Channel
多线程对于普遍的资源竞争,大多采用加锁的方式实现。GO采用的了另一种方法 - 管道(Channel)
Do not communicate by sharing memory; instead, share memory by communicating.
在并发编程时,多线程之间的通信同步是很重要的事情,对于Gotoutine来说,可以使用特有的Channle管道来实现通信(Communicating)。
无缓冲管道
// 定义一个传输字符串的管道
var ch = make(chan string)
然后我们可以在多个Goroutine中使用管道
func WriteData(ch chan string, done chan bool) {
for i := 0; i < 10; i++ {
ch <- "Hello"
}
done <- true
}
func ReadData(ch chan string, done chan bool) {
for i := 0; i < 10; i++ {
input := <-ch
time.Sleep(time.Second)
fmt.Println(input)
}
done <- true
}
开启Goroutine
ch := make(chan string)
done := make(chan bool)
go WriteData(ch, done)
go ReadData(ch, done)
由于没有缓冲,会大概率出现阻塞的情况,也就是当通道中没有数据时,接受端阻塞,通道中有数据时,发送端阻塞。
带缓冲的通道
// 定义一个带缓冲的通道
var ch = make(chan string, 100)
当通道的缓冲没有满时,发送端和接受端都不会阻塞。
在设计算法时首先考虑使用无缓冲通道,只在不确定的情况下使用缓冲。
信号量模式
当处理完一系列任务后,我们可以往通道里发送一个信号量通知任务已完成,而另一个go协程中阻塞的通道将会被继续执行下去。
func main() {
ch := make(chan string)
done := make(chan bool)
go WriteData(ch, done)
go ReadData(ch, done)
<-done
<-done
}
在读写完毕后我们会往done
通道中写入一个true
,没有执行完读写Go协程之前,main函数是阻塞的,只有当写入两次done
通道后,才会执行完毕。
管道处理器
对输入管道进行处理之后发送到输出管道
func processChannel(in <-chan string, out chan<- string) {
for inValue := range in {
result := fmt.Sprintf("-> %s", inValue)
out <- result
}
}