实现同步Go协程的几种方式
前面已经提到同步等待组和通道的概念,本文整理几种方式实现同步go协程。
1. 等待单个协程
等待单个协程可以通过通道实现。当执行完成时,工作协程发送消息给正在等待的主协程。代码如下:
package main
import (
"fmt"
"time"
)
func worker(finished chan bool) {
fmt.Println("Worker: Started")
time.Sleep(time.Second)
fmt.Println("Worker: Finished")
finished <- true
}
func main() {
finished := make(chan bool)
fmt.Println("Main: Starting worker")
go worker(finished)
fmt.Println("Main: Waiting for worker to finish")
<- finished
fmt.Println("Main: Completed")
}
运行程序输出如下:
Main: Starting worker
Main: Waiting for worker to finish
Worker: Started
Worker: Finished
Main: Completed
2. 生产和消费
一个生产协程,一个消费协程,当生产结束时关闭通道,消费协程一直从通道中取值,直到通道关闭,这里使用通道作为两个协程之间通信工具。
func TestCh(t *testing.T) {
// 待加工数据
data := []int{
1,2,3,4,5}
//生产和消费交互的通道
var ch = make(chan int)
// 结束标志
var exit = make(chan bool)
// 生产协程
go func() {
for i,item := range data{
ch <- item*(i+1)
}
close(ch)
}()
// 消费协程
go func() {
var result []int
for {
item,ok := <- ch
if !ok {
break
}else{
result = append(result,item)
}
}
for _, d := range result {
fmt.Println(d)
}
exit <- true
}()
// 阻塞主协程
<- exit
}
输出结果:
1
4
9
16
25
生产在完成工作时,主动关闭通道。当然前提是生产知道自己什么时间结束,如果生产不知道怎么办,接着往下读。
3. 等待多个协程
如果等待多个协程完成,可以使用同步等待组。WaitGroup 用于等待一组协程都完成,主协程通过Add方法设置等待协程的数量,然后每个工作协程运行,结束时调用Done方法。同时主协程调用Wait方法阻塞程序,直到所有协程都完成任务。
package main
import (
"fmt"
"sync"
"time"
)
func worker(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("Worker %v: Started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %v: Finished\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
fmt.Println("Main: Starting worker", i)
wg.Add(1)
go worker(&wg, i)
}
fmt.Println("Main: Waiting for workers to finish")
wg.Wait()
fmt.Println("Main: Completed")
}
运行输出结果:
Main: Starting worker 0
Main: Starting worker 1
Main: Starting worker 2
Main: Starting worker 3
Main: Starting worker 4
Main: Waiting for workers to finish
Worker 4: Started
Worker 1: Started
Worker 2: Started
Worker 0: Started
Worker 3: Started
Worker 3: Finished
Worker 1: Finished
Worker 4: Finished
Worker 2: Finished
Worker 0: Finished
Main: Completed
4. 综合示例
假设我们有一个任务,包括两个独立工作过程,每个过程执行时间不同,希望能并发执行提升任务执行效率。现在我有是10工作任务,希望这10个任务并行执行。
4.1 过程数据容器
定义WorkData结构体作为容器,用于接收工作过程的结果,这里使用锁避免同时对map进行写操作。
type WorkData struct {
Data map[string] string
mux sync.RWMutex
}
func (s *WorkData) AddData(key, value string) {
s.mux.Lock()
defer s.mux.Unlock()
if s.Data == nil {
s.Data = make(map[string]string)
}
s.Data[key] = value
}
func (s *WorkData) GetData() string {
return s.Data["1"] + "," + s.Data["2"]
}
4.2 工作过程实现
workUnit内部启动两个子过程,利用同步等待组实现协同,两者都完成时把结果发给通道。
func workUnit(name string, ch chan string){
var wd = &WorkData{
}
var group = sync.WaitGroup{
}
group.Add(2)
go pro1(&group, wd)
go pro2(&group, wd)
group.Wait()
ch <- name + ":" + wd.GetData()
}
func pro1(group *sync.WaitGroup, gData *WorkData) {
defer group.Done()
time.Sleep(time.Microsecond * 1)
gData.AddData("1",strconv.Itoa(rand.Intn(10)))
}
func pro2(group *sync.WaitGroup, gData *WorkData) {
defer group.Done()
time.Sleep(time.Microsecond * 2)
gData.AddData("2",strconv.Itoa(rand.Intn(10)))
}
4.3 生产者
生产者启动10任务,每个任务的名称以 task 开头并加上序号。由于并不知道10个任务什么时间完成,这里并没有关闭通道。
func producer(ch chan string) {
name := "task"
// 启动10个任务
for i := 1; i <= 10; i++ {
go workUnit(name + strconv.Itoa(i), ch)
}
}
4.4 消费者
消费者通过定义变量 i 判断10个任务是否都执行完毕。全部完成后给finished通道发送true。
func consumer(ch chan string, finished chan bool) {
// 消费10个任务的返回值
var result string
i := 0
for value := range ch{
result += value + "\n"
if i++; i == 10{
break
}
}
finished <- true
fmt.Println(result)
}
4.5 完成测试
func TestTask(t *testing.T) {
var ch = make(chan string, 3)
// 结束标志
var finished = make(chan bool)
go producer(ch)
go consumer(ch, finished)
<- finished
}
5. 总结
本文介绍了几种同步go协程的方法,针对单个或多个协程,协同生产者和消费者,综合多种情况的协同。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/107689152