协程和通道 在go 中使用比线程更加轻量的协程来完成并发任务。
协程可以理解为轻量级的线程。
在传统的线程中,应用程序和操作系统的线程是一对一的关系,直接调用操作系统的线程开启方法。
而协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的。可以理解为协程是对操作系统的线程进行了复用,而协程本身维护了一些协程的队列,由协程来处理这些任务的执行顺序。
这种方式可以大大减少应用程序执行时的用户态和内核态的转换开销。
使用协程 go 使用go 关键字来开启协程
1 2 3 4 5 6 7 8 9 10 11 12 func TestCoroutine(t *testing.T) { t.Log("开始执行-->") for i := 0; i < 10; i++ { //此方法的参数是传递进去的,值会被拷贝 go func(i int) { t.Logf("执行num %d",i) }(i) } time.Sleep(time.Second * 1) t.Log("执行完毕-->") }
使用锁来控制并发问题 使用锁
go 中也有类似于java 中的 Lock 的方式 来解决多线程下的并发访问问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 var num int = 0 func TestConcurrent(t *testing.T) { var mutex sync.Mutex for i := 0; i < 1000; i++ { go func() { defer func() { mutex.Unlock() }() mutex.Lock() num++ }() } time.Sleep(time.Second * 2) t.Logf("执行完毕--> 最终num= %d",num) }
使用waitGroup 检测代码执行完成 (和java中的CountDownLatch 很像)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 var num int = 0 func TestConcurrent(t *testing.T) { var mutex sync.Mutex var wg sync.WaitGroup for i := 0; i < 1000; i++ { //增加指定的数量 wg.Add(1) go func() { defer func() { mutex.Unlock() }() mutex.Lock() num++ //每次减去1 wg.Done() }() } //线程等待,等wg 为0 的时候继续执行 wg.Wait() t.Logf("执行完毕--> 最终num= %d",num) }
CSP并发机制 CSP模型是上个世纪七十年代提出的,不同于传统的多线程通过共享内存来通信,CSP讲究的“以通信的方式来共享内存”。用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。CSP中 channel 是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的 channel。
Go中 channel 是被单独创建并且可以在进程之间传递,它的通信模式类似于 boss-worker 模式的,一个实体通过将消息发送到 channel 中,然后又监听这个 channel 的实体处理,两个实体之间是匿名的,这个就实现实体中间的解耦,其中 channel 是同步的一个消息被发送到 channel 中,最终是一定要被另外的实体消费掉的,在实现原理上其实类似于一个阻塞的消息队列。
Golang的CSP并发模型,是通过 Goroutine 和 Channel 来实现的。
Goroutine 是Go语言中并发的执行单位,在多个Goroutine使用一个channel 可以来通信,多个Goroutine 连接一个channel 可以向channel中发送消息和获取消息达到通信的目的。
channl 可以理解为一个阻塞队列。此阻塞队列可以是有缓冲的也可以设置成无缓冲的,如果没有缓冲(buffer)那么向channel中写入的消息必须等待被获取到才能返回,如果有缓冲的,那么写着者可以直接返回。 其实是一种生产者 消费者机制。
参考: https://zhuanlan.zhihu.com/p/399205141
通过channel 实现异步返回 使用make 创建一个channel
指定容量
1 2 //表示通道可以存放100个元素 ch := make(chan string,100)
-> 和 <- 表示通道的流向
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 //异步的任务 func AsyncTask() chan string { //创建无缓冲的 retChan := make(chan string) go func() { fmt.Println("task1 before") syncRet := task1() retChan <- syncRet fmt.Println("task1 end") }() return retChan } func task1() string{ time.Sleep(time.Second *1) return "task1 done" } func task2() string { time.Sleep(time.Second *3) return "task2 done" } func TestExec(t *testing.T) { taskChannel := AsyncTask() retTask2 := task2() fmt.Println(<- taskChannel + " task channel") fmt.Println(retTask2) time.Sleep(time.Second *4) }
执行结果:
1 2 3 4 5 6 7 === RUN TestExec task1 before task1 end task1 donetask channel task2 done --- PASS: TestExec (7.03s) PASS
这里因为用的是无缓冲的channel ,所以向channel写入的时候会阻塞,直到 有消费者从channel 中获取数据。
当在创建channel 的时候加上容量参数,参数不为0 的时候既 为异步channel
1 2 retChan := make(chan string,2)
当修改为此后,执行结果为
1 2 3 4 5 6 task1 before task1 end task1 done task channel task2 done --- PASS: TestExec (7.02s) PASS
向带缓冲的channel写入数据的时候,不会被阻塞。
多渠道的选择和超时控制 从不同的并发执行的协程中获取值可以通过关键字 select 来完成。 select 做的就是:选择处理列出的多个通信情况中的一个。
如果都阻塞了,会等待直到其中一个可以处理
如果多个可以处理,随机选择一个
如果没有通道操作可以处理并且写了 default 语句,它就会执行:default
select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。
同时select 配合时间可以实现channel超时机制。
基本语法
如果不加default 那么select 将会循环监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 select { case ret1 := <- ch1 case ret2 := <- ch2 default: }
示例,监听多个channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func TestSelect(t *testing.T) { //创建2个channel numChan1 := make(chan int) numChan2 := make(chan int) go func() { for i := 0; i < 10; i++ { numChan1 <- i time.Sleep(time.Second * 1) } }() go func() { for i := 0; i < 10; i++ { numChan2 <- i time.Sleep(time.Second * 1) } }() //通过select 监听多个channel for { select { case ret_ch1 := <-numChan1: fmt.Printf("ch1 get %d \n", ret_ch1) case ret_ch2 := <-numChan2: fmt.Printf("ch2 get %d \n", ret_ch2) default: fmt.Printf("not get \n") } time.Sleep(time.Second * 1) } }
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 not get ch2 get 0 ch1 get 0 ch2 get 1 ch1 get 1 ch2 get 2 ch1 get 2 ch2 get 3 ch1 get 3 ch2 get 4 ch1 get 4 ch1 get 5 ch2 get 5 ch1 get 6 ch2 get 6 ch1 get 7 ch1 get 8 ch1 get 9 ch2 get 7 ch2 get 8 ch2 get 9 not get not get not get not get not get
分别从ch1 和ch2 中获取结果,当ch1 和ch2 都没有数据后,就执行default。
select 的超时控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func TestSelectTimeOut(t *testing.T) { ch := make(chan int) go func() { time.Sleep(time.Second * 3) ch <- 1 }() select { case ret := <- ch: fmt.Printf("ch get %d \n", ret) case <-time.After(time.Second * 2): fmt.Printf("ch time out") } }
time.After 会返回一个Time 的channel,在指定的时间返回,这里通过时间的channel 与目标的channel哪个先返回,哪个就先执行。
注意:实现超时控制这里不能加default 才能循环获取。time.
channel 的关闭 通过close 函数可以关闭一个channel
close(channel)
当向一个关闭的channel发送数据,或从一个关闭的channel 获取数据的时候,会抛出 panic
为了防止从一个关闭的channel中获取数据的抛出panic ,在从channel 获取数据的时候可以多接收一个是否正常的参数
data,ok -< channel
如果 ok 为true 表示channel是正常的。
当多个消费者都在阻塞在通道获取数据的时候,一旦channel关闭,将立马收到一个ok = false的信息,这个信息将以广播的方式发送,所有的接收者都能收到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 //关闭channel func TestCloseChannel(t *testing.T) { ch := make(chan int) go receiveCha(ch,"receive1") go receiveCha(ch,"receive2") go receiveCha(ch,"receive3") for i := 0; i < 10; i++ { ch <- i } time.Sleep(time.Second * 1) //关闭 close(ch) time.Sleep(time.Second * 3) } func receiveCha(ch chan int,receiveName string) { for { data,ok := <- ch if(!ok){ fmt.Printf(receiveName+"close channel \n") break } fmt.Printf(receiveName +" get data %d \n",data) } }
执行结果 看到都收到了关闭通知
1 2 3 4 5 6 7 8 9 10 11 12 13 receive3 get data 0 receive1 get data 1 receive1 get data 4 receive1 get data 5 receive1 get data 6 receive1 get data 7 receive1 get data 8 receive1 get data 9 receive2 get data 3 receive3 get data 2 receive3close channel receive2close channel receive1close channel
利用channel 取消任务 针循环中执行的任务,可以利用判断channel来决定取消的条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func cycleTask(ch chan int,name string,wg sync.WaitGroup) { //循环任务 for{ if(isCancel(ch)){ fmt.Printf(name + " --> break \n") wg.Done() break } time.Sleep(time.Second * 1) fmt.Printf(name + " --> doing \n") } } //判断是否取消 func isCancel(ch chan int) bool { select { case <- ch: return true default: return false } } //测试取消 func TestCancel(t *testing.T) { ch := make(chan int,0) var wg sync.WaitGroup wg.Add(1) go cycleTask(ch,"task1",wg) wg.Add(1) go cycleTask(ch,"task2",wg) //过2秒后 time.Sleep(time.Second * 2) //发送一个取消信号 ch <- 1 //再发送一个信号 ch <- 1 wg.Wait() }
利用context 取消任务 多个任务之间是有关联关系的,子任务通过父任务创建出来,当取消父任务的时候对应的子任务也被取消,利用此机制可以方便的关闭多个子任务。
根context : 通过 context.Background 创建 子context: 通过 context.WithCancel(parentContext) 创建
接收取消的通知 <- ctx.Done()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func cycleTask2(ctx context.Context,name string,wg sync.WaitGroup) { //循环任务 for{ if(isCancel2(ctx)){ fmt.Printf(name + " --> break \n") wg.Done() break } time.Sleep(time.Second * 1) fmt.Printf(name + " --> doing \n") } } //判断是否取消 func isCancel2(ctx context.Context) bool { select { case <- ctx.Done(): return true default: return false } } func TestContextCancel(t *testing.T) { rootContxt := context.Background() ctx1,cancel1 := context.WithCancel(rootContxt) ctx2,cancel2 := context.WithCancel(rootContxt) var wg sync.WaitGroup wg.Add(1) go cycleTask2(ctx1,"task1",wg) wg.Add(1) go cycleTask2(ctx2,"task2",wg) time.Sleep(time.Second * 2) cancel1() cancel2() wg.Wait() }
仅获取一次 通过监听channel来实现只要有获取的值就返回
注意使用有缓冲的channel ,防止内存泄漏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func taskRun(i int,ch chan string) { time.Sleep(time.Millisecond * 10) ch <- fmt.Sprintf("get form %d",i) } func multiRun() string { ch := make(chan string,10) for i := 0; i < 10; i++ { go taskRun(i,ch) } return <- ch } func TestFirstGet(t *testing.T) { ret := multiRun() fmt.Printf(ret) }
buffer chanenl实现对象池 使用空接口表示任意类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 type Obj interface { } type ObjPool struct { bufChan chan *Obj } func NewObjPool(size int) *ObjPool { var pool ObjPool = ObjPool{} pool.bufChan = make(chan *Obj,size) for i := 0; i < size; i++ { pool.bufChan <- new(Obj) } return &pool } func (pool ObjPool) getObj() (*Obj,error) { select { case ret := <- pool.bufChan: return ret,nil case <- time.After(time.Second *1): return nil,errors.New("get obj timeout") } } func (pool ObjPool) releaseObj(obj *Obj) (bool,error) { bufChan := pool.bufChan if bufChan == nil{ return false,errors.New("not init") } bufChan <- obj return true,nil } func TestObjPool(t *testing.T) { fmt.Printf("obj ==> %x \n",unsafe.Pointer(new(Obj))) fmt.Printf("obj ==> %x \n",unsafe.Pointer(new(Obj))) pool := NewObjPool(8) for i := 0; i < 10; i++ { o,err := pool.getObj() if(err == nil){ fmt.Printf("obj ==> %x \n",unsafe.Pointer(o)) }else{ fmt.Printf("没有了=== \n") } pool.releaseObj(o) } }
sync.Pool 首先sync.Pool是可伸缩的临时对象池,也是并发安全的。其可伸缩的大小会受限于内存的大小,可以理解为是一个存放可重用对象的容器。sync.Pool设计的目的就是用于存放已经分配的但是暂时又不用的对象,而且在需要用到的时候,可以直接从该pool中取。
为了使得可以在多个goroutine中高效的使用并发,sync.Pool会为每个P(对应CPU,这里有点像GMP模型)都分配一个本地池,当执行Get或者Put操作的时候,会先将goroutine和某个P的对象池关联,再对该池进行操作。
每个P的对象池分为私有对象和共享列表对象,私有对象只能被特定的P访问,共享列表对象可以被任何P访问。因为同一时刻一个P只能执行一个goroutine,所以无需加锁,但是对共享列表对象进行操作时,因为可能有多个goroutine同时操作,即并发操作,所以需要加锁。
主要的方法有 PUT 和GET 方法。
PUT 方法的主要逻辑为当对象不为空的时候,查看当前goroutine的private是否设置对象池私有值,如果没有则将put的对象赋值给该私有成员,如果私有成员已经有值了,那么将值追加到共享列表。
GET 方法的主要逻辑是首先从本地对象池中获取对象值,并将对象池中删除掉此值,如果本地对象池获取失败,那么从共享池中获取,并删除。如果共享列表中也获取失败了,那么从其他的processor 的共享池中获取一个并删除。如果从其他的共享池中也获取失败了,那么就使用New 分配一个返回值,这个返回值并不会放到对象池中。
gc 会清除sync.Pool 缓存的对象,所以有很大的可能获取不到缓存的对象
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func TestSyncPool(t *testing.T) { pool := &sync.Pool{New: func() interface{} { t.Log("调用了初始化的方法....") return -1 }} get1 := pool.Get().(int) t.Log(get1) get1 = pool.Get().(int) t.Log(get1) pool.Put(4) get1 = pool.Get().(int) t.Log(get1) get1 = pool.Get().(int) t.Log(get1) pool.Put(1) pool.Put(2) pool.Put(3) get1 = pool.Get().(int) t.Log(get1) get1 = pool.Get().(int) t.Log(get1) get1 = pool.Get().(int) t.Log(get1) } csp_test.go:343: 调用了初始化的方法.... csp_test.go:348: -1 csp_test.go:343: 调用了初始化的方法.... csp_test.go:351: -1 csp_test.go:356: 4 csp_test.go:343: 调用了初始化的方法.... csp_test.go:358: -1 csp_test.go:365: 1 //从私有获取 csp_test.go:367: 3 //共有获取 csp_test.go:369: 2 //共有获取