Goroutine编程入门

Goroutine编程入门

基于CSP并发模式

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型

Goroutine

Goroutine可以看做一个轻量的线程,它解决了系统线程启动代价,切换开销大的问题。

因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。

经典小例子 - 生产者消费者

这里我们实现一个生产者消费者问题

var products = 0 // Producer 生产者 func Producer() { for i := 0; i < 1000000; i++ { newProducts := products + 1 products = newProducts } } // Consumer 消费者 func Consumer() { for i := 0; i < 1000000; i++ { newProducts := products - 1 products = newProducts } }

如何让他们同时一起运行呢,只需要使用go关键字就行了!

go Producer() go Consumer()

由于每个函数里的获取值和设置值是两步操作,它们的执行顺序也不固定,所以在这计算的中间products有可能会被另一个函数所更改,比如:

newProducts /* 2 */ := products /* 1 */ + 1 // 生产者 newProducts /* 0 */ := products /* 1 */ - 1 // 消费者 products /* 2 */ = newProducts // 生产者 products /* 0 */ = newProducts // 消费者

所以在这里我们还得做的一件事就是,在一个Goroutine访问资源的时候不允许其他Goroutine访问该资源,也就是加锁。

互斥锁

sync包提供了异步操作相关的函数,这里也提供了互斥锁sync.Mutex
我们改造一下我们的例子:

import ( "sync" ) var products = 0 var mutex sync.Mutex // Producer 生产者 func Producer() { for i := 0; i < 1000000; i++ { mutex.Lock() newProducts := products + 1 products = newProducts mutex.Unlock() } } // Consumer 消费者 func Consumer() { for i := 0; i < 1000000; i++ { mutex.Lock() newProducts := products - 1 products = newProducts mutex.Unlock() } }

这次我们再同步执行这两个操作,会发现我们的products稳定为0,所有生产的产品全部正确的消费掉。

原子操作

除了自己加锁以外,我们还可以使用标准的原子操作库sync/atomic

import ( "sync/atomic" ) // Producer 生产者 func Producer() { for i := 0; i < 10000000; i++ { atomic.AddInt64(&products, 1) } } // Consumer 消费者 func Consumer() { for i := 0; i < 10000000; i++ { atomic.AddInt64(&products, -1) } }

使用互斥锁的开销远大于使用原子操作。

经典小例子 - 单例模式

单例模式指全局只存在一个实例,重复实例化只会返回同一个实例。

type OnlyInitOnce struct{} var ( instance *OnlyInitOnce initialized uint32 ) func NewInstance() *OnlyInitOnce { if initialized == 1 { return instance } fmt.Println("init!") instance = &OnlyInitOnce{} return &OnlyInitOnce{} }

考虑以上单例模式,如果按照正常的单线程模式执行,是会按照预期的,全局只会进行一次初始化!但是,如果是在多线程中执行多个实例化操作,则可能会出现多次初始化的操作。

for i := 0; i < 10000; i++ {
	go NewInstance()
}

这是因为可能同时取的initialized都为0,所以我们要为其执行加锁操作,使其线程安全。

互斥锁

func NewInstance() *OnlyInitOnce { mutex.Lock() defer mutex.Unlock() if atomic.LoadUint32(&initialized) == 1 { return instance } atomic.StoreUint32(&initialized, 1) instance = &OnlyInitOnce{} fmt.Println("init!") return instance }

once

sync包中存在只允许操作一次的类型 - once,我们用once可以简化以上代码:

func NewInstance() *OnlyInitOnce { once.Do(func() { fmt.Println("init!") instance = &OnlyInitOnce{} }) return instance }

并且达到同样的目的

Channel

多线程对于普遍的资源竞争,大多采用加锁的方式实现。GO采用的了另一种方法 - 管道(Channel)

Do not communicate by sharing memory; instead, share memory by communicating.

在并发编程时,多线程之间的通信同步是很重要的事情,对于Gotoutine来说,可以使用特有的Channle管道来实现通信(Communicating)。

无缓冲管道

// 定义一个传输字符串的管道 var ch = make(chan string)

然后我们可以在多个Goroutine中使用管道

func WriteData(ch chan string, done chan bool) { for i := 0; i < 10; i++ { ch <- "Hello" } done <- true } func ReadData(ch chan string, done chan bool) { for i := 0; i < 10; i++ { input := <-ch time.Sleep(time.Second) fmt.Println(input) } done <- true }

开启Goroutine

ch := make(chan string) done := make(chan bool) go WriteData(ch, done) go ReadData(ch, done)

由于没有缓冲,会大概率出现阻塞的情况,也就是当通道中没有数据时,接受端阻塞,通道中有数据时,发送端阻塞。

带缓冲的通道

// 定义一个带缓冲的通道 var ch = make(chan string, 100)

当通道的缓冲没有满时,发送端和接受端都不会阻塞。

在设计算法时首先考虑使用无缓冲通道,只在不确定的情况下使用缓冲。

信号量模式

当处理完一系列任务后,我们可以往通道里发送一个信号量通知任务已完成,而另一个go协程中阻塞的通道将会被继续执行下去。

func main() { ch := make(chan string) done := make(chan bool) go WriteData(ch, done) go ReadData(ch, done) <-done <-done }

在读写完毕后我们会往done通道中写入一个true,没有执行完读写Go协程之前,main函数是阻塞的,只有当写入两次done通道后,才会执行完毕。

管道处理器

对输入管道进行处理之后发送到输出管道

func processChannel(in <-chan string, out chan<- string) { for inValue := range in { result := fmt.Sprintf("-> %s", inValue) out <- result } }

参考

上一篇 精细控制transition细节 - 实现一个活泼的弹框
下一篇 斐波那切数列的三种实现方法