Skip to main content
 首页 » 编程设计

实现同步Go协程的几种方式

2022年07月19日135qq号

实现同步Go协程的几种方式

前面已经提到同步等待组和通道的概念,本文整理几种方式实现同步go协程。

1. 等待单个协程

等待单个协程可以通过通道实现。当执行完成时,工作协程发送消息给正在等待的主协程。代码如下:

package main 
 
import ( 
		"fmt" 
		"time" 
) 
 
func worker(finished chan bool) {
    
	fmt.Println("Worker: Started") 
	time.Sleep(time.Second) 
	fmt.Println("Worker: Finished") 
	finished <- true 
} 
 
func main() {
    
	finished := make(chan bool) 
 
	fmt.Println("Main: Starting worker") 
	go worker(finished) 
 
	fmt.Println("Main: Waiting for worker to finish") 
	<- finished 
	fmt.Println("Main: Completed") 
} 

运行程序输出如下:

Main: Starting worker
Main: Waiting for worker to finish
Worker: Started
Worker: Finished
Main: Completed

2. 生产和消费

一个生产协程,一个消费协程,当生产结束时关闭通道,消费协程一直从通道中取值,直到通道关闭,这里使用通道作为两个协程之间通信工具。

func TestCh(t *testing.T)  {
    
   // 待加工数据 
   data := []int{
   1,2,3,4,5} 
   //生产和消费交互的通道 
   var ch = make(chan int) 
   // 结束标志 
   var exit = make(chan bool) 
 
   // 生产协程 
   go func() {
    
      for i,item := range data{
    
         ch <- item*(i+1) 
      } 
      close(ch) 
   }() 
 
   // 消费协程 
   go func() {
    
      var result []int 
      for  {
    
         item,ok := <- ch 
         if !ok {
    
            break 
         }else{
    
            result = append(result,item) 
         } 
      } 
      for _, d := range result {
    
         fmt.Println(d) 
      } 
      exit <- true 
   }() 
 
   // 阻塞主协程 
   <- exit 
} 

输出结果:

1
4
9
16
25

生产在完成工作时,主动关闭通道。当然前提是生产知道自己什么时间结束,如果生产不知道怎么办,接着往下读。

3. 等待多个协程

如果等待多个协程完成,可以使用同步等待组。WaitGroup 用于等待一组协程都完成,主协程通过Add方法设置等待协程的数量,然后每个工作协程运行,结束时调用Done方法。同时主协程调用Wait方法阻塞程序,直到所有协程都完成任务。

package main 
 
import ( 
		"fmt" 
		"sync" 
		"time" 
) 
 
func worker(wg *sync.WaitGroup, id int) {
    
	defer wg.Done() 
 
	fmt.Printf("Worker %v: Started\n", id) 
	time.Sleep(time.Second) 
	fmt.Printf("Worker %v: Finished\n", id) 
} 
 
func main() {
    
	var wg sync.WaitGroup 
 
	for i := 0; i < 5; i++ {
    
		fmt.Println("Main: Starting worker", i) 
		wg.Add(1) 
		go worker(&wg, i) 
	} 
 
	fmt.Println("Main: Waiting for workers to finish") 
	wg.Wait() 
	fmt.Println("Main: Completed") 
} 

运行输出结果:

Main: Starting worker 0
Main: Starting worker 1
Main: Starting worker 2
Main: Starting worker 3
Main: Starting worker 4
Main: Waiting for workers to finish
Worker 4: Started
Worker 1: Started
Worker 2: Started
Worker 0: Started
Worker 3: Started
Worker 3: Finished
Worker 1: Finished
Worker 4: Finished
Worker 2: Finished
Worker 0: Finished
Main: Completed

4. 综合示例

假设我们有一个任务,包括两个独立工作过程,每个过程执行时间不同,希望能并发执行提升任务执行效率。现在我有是10工作任务,希望这10个任务并行执行。

4.1 过程数据容器

定义WorkData结构体作为容器,用于接收工作过程的结果,这里使用锁避免同时对map进行写操作。

type WorkData struct {
    
   Data map[string] string 
   mux sync.RWMutex 
} 
 
func (s *WorkData) AddData(key, value string) {
    
   s.mux.Lock() 
   defer s.mux.Unlock() 
   if s.Data == nil {
    
      s.Data = make(map[string]string) 
   } 
 
   s.Data[key] = value 
} 
 
func (s *WorkData) GetData() string  {
    
   return s.Data["1"] + "," + s.Data["2"] 
} 

4.2 工作过程实现

workUnit内部启动两个子过程,利用同步等待组实现协同,两者都完成时把结果发给通道。

func workUnit(name string, ch chan string){
    
   var wd = &WorkData{
   } 
 
   var group = sync.WaitGroup{
   } 
   group.Add(2) 
 
   go pro1(&group, wd) 
   go pro2(&group, wd) 
 
   group.Wait() 
 
   ch <- name + ":" + wd.GetData() 
} 
 
func pro1(group *sync.WaitGroup, gData *WorkData) {
    
   defer group.Done() 
   time.Sleep(time.Microsecond * 1) 
 
   gData.AddData("1",strconv.Itoa(rand.Intn(10))) 
} 
 
func pro2(group *sync.WaitGroup, gData *WorkData) {
    
   defer group.Done() 
   time.Sleep(time.Microsecond * 2) 
 
   gData.AddData("2",strconv.Itoa(rand.Intn(10))) 
} 

4.3 生产者

生产者启动10任务,每个任务的名称以 task 开头并加上序号。由于并不知道10个任务什么时间完成,这里并没有关闭通道。

func producer(ch chan string) {
    
   name := "task" 
 
   // 启动10个任务 
   for i := 1; i <= 10; i++ {
    
      go workUnit(name + strconv.Itoa(i), ch) 
   } 
} 

4.4 消费者

消费者通过定义变量 i 判断10个任务是否都执行完毕。全部完成后给finished通道发送true。

func consumer(ch chan string, finished chan bool)  {
    
   // 消费10个任务的返回值 
   var result string 
   i := 0 
   for value := range ch{
    
      result += value + "\n" 
      if i++; i == 10{
    
         break 
      } 
   } 
 
   finished <- true 
   fmt.Println(result) 
} 

4.5 完成测试

func TestTask(t *testing.T) {
    
   var ch = make(chan string, 3) 
   // 结束标志 
   var finished = make(chan bool) 
 
   go producer(ch) 
   go consumer(ch, finished) 
 
   <- finished 
} 

5. 总结

本文介绍了几种同步go协程的方法,针对单个或多个协程,协同生产者和消费者,综合多种情况的协同。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/107689152
阅读延展