Skip to main content
 首页 » 编程设计

go语言学习--channel的关闭

2022年07月19日138java哥

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要在多个并发发送端中关闭channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。
(我们将会称上面的原则为channel closing principle)

保持channel closing principle的优雅方案

channel closing principle要求我们只能在发送端进行channel的关闭,对于日常遇到的可以归结为三类

1、m个receivers,一个sender.

2、一个receiver,n个sender

3、m个receivers,n个sender

1、m个receivers,一个sender

M个receivers,一个sender,sender通过关闭data channel说“不再发送”

这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:

 1 package main 
 2  
 3 import ( 
 4     "time" 
 5     "math/rand" 
 6     "sync" 
 7     "log" 
 8 ) 
 9  
10 func main() { 
11     rand.Seed(time.Now().UnixNano()) 
12     log.SetFlags(0) 
13  
14     // ... 
15     const MaxRandomNumber = 100000 
16     const NumReceivers = 100 
17  
18     wgReceivers := sync.WaitGroup{} 
19     wgReceivers.Add(NumReceivers) 
20  
21     // ... 
22     dataCh := make(chan int, 100) 
23  
24     // the sender 
25     go func() { 
26         for { 
27             if value := rand.Intn(MaxRandomNumber); value == 0 { 
28                 // the only sender can close the channel safely. 
29                 close(dataCh) 
30                 return 
31             } else {             
32                 dataCh <- value 
33             } 
34         } 
35     }() 
36  
37     // receivers 
38     for i := 0; i < NumReceivers; i++ { 
39         go func() { 
40             defer wgReceivers.Done() 
41  
42             // receive values until dataCh is closed and 
43             // the value buffer queue of dataCh is empty. 
44             for value := range dataCh { 
45                 log.Println(value) 
46             } 
47         }() 
48     } 
49  
50     wgReceivers.Wait() 
51 }

2、一个receiver,n个senders

      一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送”
这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:

 1 package main 
 2  
 3 import ( 
 4     "time" 
 5     "math/rand" 
 6     "sync" 
 7     "log" 
 8 ) 
 9  
10 func main() { 
11     rand.Seed(time.Now().UnixNano()) 
12     log.SetFlags(0) 
13  
14     // ... 
15     const MaxRandomNumber = 100000 
16     const NumSenders = 1000 
17  
18     wgReceivers := sync.WaitGroup{} 
19     wgReceivers.Add(1) 
20  
21     // ... 
22     dataCh := make(chan int, 100) 
23     stopCh := make(chan struct{}) 
24         // stopCh is an additional signal channel. 
25         // Its sender is the receiver of channel dataCh. 
26         // Its reveivers are the senders of channel dataCh. 
27  
28     // senders 
29     for i := 0; i < NumSenders; i++ { 
30         go func() { 
31             for { 
32                 value := rand.Intn(MaxRandomNumber) 
33  
34                 select { 
35                 case <- stopCh: 
36                     return 
37                 case dataCh <- value: 
38                 } 
39             } 
40         }() 
41     } 
42  
43     // the receiver 
44     go func() { 
45         defer wgReceivers.Done() 
46  
47         for value := range dataCh { 
48             if value == MaxRandomNumber-1 { 
49                 // the receiver of the dataCh channel is 
50                 // also the sender of the stopCh cahnnel. 
51                 // It is safe to close the stop channel here. 
52                 close(stopCh) 
53                 return 
54             } 
55  
56             log.Println(value) 
57         } 
58     }() 
59  
60     // ... 
61     wgReceivers.Wait() 
62 }

3、m个receivers,n个sender

M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”
这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:

  1 package main 
  2  
  3 import ( 
  4     "time" 
  5     "math/rand" 
  6     "sync" 
  7     "log" 
  8     "strconv" 
  9 ) 
 10  
 11 func main() { 
 12     rand.Seed(time.Now().UnixNano()) 
 13     log.SetFlags(0) 
 14  
 15     // ... 
 16     const MaxRandomNumber = 100000 
 17     const NumReceivers = 10 
 18     const NumSenders = 1000 
 19  
 20     wgReceivers := sync.WaitGroup{} 
 21     wgReceivers.Add(NumReceivers) 
 22  
 23     // ... 
 24     dataCh := make(chan int, 100) 
 25     stopCh := make(chan struct{}) 
 26         // stopCh is an additional signal channel. 
 27         // Its sender is the moderator goroutine shown below. 
 28         // Its reveivers are all senders and receivers of dataCh. 
 29     toStop := make(chan string, 1) 
 30         // the channel toStop is used to notify the moderator 
 31         // to close the additional signal channel (stopCh). 
 32         // Its senders are any senders and receivers of dataCh. 
 33         // Its reveiver is the moderator goroutine shown below. 
 34  
 35     var stoppedBy string 
 36  
 37     // moderator 
 38     go func() { 
 39         stoppedBy = <- toStop // part of the trick used to notify the moderator 
 40                               // to close the additional signal channel. 
 41         close(stopCh) 
 42     }() 
 43  
 44     // senders 
 45     for i := 0; i < NumSenders; i++ { 
 46         go func(id string) { 
 47             for { 
 48                 value := rand.Intn(MaxRandomNumber) 
 49                 if value == 0 { 
 50                     // here, a trick is used to notify the moderator 
 51                     // to close the additional signal channel. 
 52                     select { 
 53                     case toStop <- "sender#" + id: 
 54                     default: 
 55                     } 
 56                     return 
 57                 } 
 58  
 59                 // the first select here is to try to exit the 
 60                 // goroutine as early as possible. 
 61                 select { 
 62                 case <- stopCh: 
 63                     return 
 64                 default: 
 65                 } 
 66  
 67                 select { 
 68                 case <- stopCh: 
 69                     return 
 70                 case dataCh <- value: 
 71                 } 
 72             } 
 73         }(strconv.Itoa(i)) 
 74     } 
 75  
 76     // receivers 
 77     for i := 0; i < NumReceivers; i++ { 
 78         go func(id string) { 
 79             defer wgReceivers.Done() 
 80  
 81             for { 
 82                 // same as senders, the first select here is to  
 83                 // try to exit the goroutine as early as possible. 
 84                 select { 
 85                 case <- stopCh: 
 86                     return 
 87                 default: 
 88                 } 
 89  
 90                 select { 
 91                 case <- stopCh: 
 92                     return 
 93                 case value := <-dataCh: 
 94                     if value == MaxRandomNumber-1 { 
 95                         // the same trick is used to notify the moderator  
 96                         // to close the additional signal channel. 
 97                         select { 
 98                         case toStop <- "receiver#" + id: 
 99                         default: 
100                         } 
101                         return 
102                     } 
103  
104                     log.Println(value) 
105                 } 
106             } 
107         }(strconv.Itoa(i)) 
108     } 
109  
110     // ... 
111     wgReceivers.Wait() 
112     log.Println("stopped by", stoppedBy) 
113 }

打破channel closing principle

有没有一个内置函数可以检查一个channel是否已经关闭。如果你能确定不会向channel发送任何值,那么也确实需要一个简单的方法来检查channel是否已经关闭:

 1 package main 
 2  
 3 import "fmt" 
 4  
 5 type T int 
 6  
 7 func IsClosed(ch <-chan T) bool { 
 8     select { 
 9     case <-ch: 
10         return true 
11     default: 
12     } 
13  
14     return false 
15 } 
16  
17 func main() { 
18     c := make(chan T) 
19     fmt.Println(IsClosed(c)) // false 
20     close(c) 
21     fmt.Println(IsClosed(c)) // true 
22 }

上面已经提到了,没有一种适用的方式来检查channel是否已经关闭了。但是,就算有一个简单的 closed(chan T) bool函数来检查channel是否已经关闭,它的用处还是很有限的,就像内置的len函数用来检查缓冲channel中元素数量一样。原因就在于,已经检查过的channel的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的channel的当前状态了。
尽管在调用closed(ch)返回true的情况下停止向channel发送值是可以的,但是如果调用closed(ch)返回false,那么关闭channel或者继续向channel发送值就不安全了(会panic)。

The Channel Closing Principle

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要在多个并发发送端中关闭channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。
(下面,我们将会称上面的原则为channel closing principle

打破channel closing principle的解决方案

如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)

 1 func SafeSend(ch chan T, value T) (closed bool) { 
 2     defer func() { 
 3         if recover() != nil { 
 4             // the return result can be altered  
 5             // in a defer function call 
 6             closed = true 
 7         } 
 8     }() 
 9  
10     ch <- value // panic if ch is closed 
11     return false // <=> closed = false; return 
12 }

如果channel ch没有被关闭的话,那么这个函数的性能将和ch <- value接近。对于channel关闭的时候,SafeSend函数只会在每个sender goroutine中调用一次,因此程序不会有太大的性能损失。
同样的想法也可以用在从多个goroutine关闭channel中:

 1 func SafeClose(ch chan T) (justClosed bool) { 
 2     defer func() { 
 3         if recover() != nil { 
 4             justClosed = false 
 5         } 
 6     }() 
 7  
 8     // assume ch != nil here. 
 9     close(ch) // panic if ch is closed 
10     return true 
11 }

很多人喜欢用sync.Once来关闭channel:

 1 type MyChannel struct { 
 2     C    chan T 
 3     once sync.Once 
 4 } 
 5  
 6 func NewMyChannel() *MyChannel { 
 7     return &MyChannel{C: make(chan T)} 
 8 } 
 9  
10 func (mc *MyChannel) SafeClose() { 
11     mc.once.Do(func(){ 
12         close(mc.C) 
13     }) 
14 }

当然了,我们也可以用sync.Mutex来避免多次关闭channel:

 1 type MyChannel struct { 
 2     C      chan T 
 3     closed bool 
 4     mutex  sync.Mutex 
 5 } 
 6  
 7 func NewMyChannel() *MyChannel { 
 8     return &MyChannel{C: make(chan T)} 
 9 } 
10  
11 func (mc *MyChannel) SafeClose() { 
12     mc.mutex.Lock() 
13     if !mc.closed { 
14         close(mc.C) 
15         mc.closed = true 
16     } 
17     mc.mutex.Unlock() 
18 } 
19  
20 func (mc *MyChannel) IsClosed() bool { 
21     mc.mutex.Lock() 
22     defer mc.mutex.Unlock() 
23     return mc.closed 
24 }

我们应该要理解为什么Go不支持内置SafeSendSafeClose函数,原因就在于并不推荐从接收端或者多个并发发送端关闭channel。Golang甚至禁止关闭只接收(receive-only)的channel。


本文参考链接:https://www.cnblogs.com/ricklz/p/9693225.html
阅读延展