Go:Channel使用模式
有幾種重要的channel模式需要理解,因為channel實現了Goroutine之間的通信。
等待結果模式
這是channel的基本使用模式,創建一個goroutine來執行任務,然后將執行結果通過channel通知到對應的其他Goroutine。
func WaitForResult() {
ch := make(chan string)
go func() {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
ch <- "data"
fmt.Println("child : sent signal")
}()
d := <-ch
fmt.Println("parent : recv'd signal :", d)
time.Sleep(time.Second)
fmt.Println("--------------------")
}
這里使用不帶緩存的channel來接收數據,可以保證子goroutine發送的數據立刻被接收到。
扇出/扇入模式
這種模式是包含多個Goroutine向channel發送數據,要保證數據都能接收到。
func FanOut() {
children := 2000
ch := make(chan string, children)
for c := 0; c < children; c++ {
go func(child int) {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "data"
fmt.Println("child : sent signal :", child)
}(c)
}
for children > 0 {
d := <-ch
children--
fmt.Println(d)
fmt.Println("parent : recv'd signal :", children)
}
time.Sleep(time.Second)
fmt.Println("---------------")
}
這里我們創建了2000個goroutine來執行任務,為了保證Goroutine不會相互影響,采用帶緩存的channel來接收執行結果。主goroutine使用for循環來接收channel里面的數據。sleep模擬執行的任務。
等待任務模式
這種模式是子goroutine通過channel接收來自主goroutine發送的數據,也可以是執行任務的函數。
func WaitForTask() {
ch := make(chan string)
go func() {
d := <-ch
fmt.Println("child : recv'd signal :", d)
}()
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
ch <- "data"
fmt.Println("parent : sent signal")
time.Sleep(time.Second)
fmt.Println("---------------")
}
這里也是使用不帶緩存的channel,子goroutine等待channel發送數據,接收并執行任務。
Goroutine池
該模式還使用了等待任務模式,允許根據資源情況限制子goroutine的個數。
func pooling() {
ch := make(chan string)
g := runtime.GOMAXPROCS(0)
for c := 0; c < g; c++ {
go func(child int) {
for d := range ch {
fmt.Printf("child %d : recv'd signal : %s\n", child, d)
}
fmt.Printf("child %d : recv'd shutdown signal\n", child)
}(c)
}
const work = 100
for w := 0; w < work; w++ {
ch <- "data"
fmt.Println("parent : sent signal :", w)
}
close(ch)
fmt.Println("parent : sent shutdown signal")
time.Sleep(time.Second)
fmt.Println("------------------")
}
這里我們創建了一組Goroutine來接收同一個channel發送來的數據。這里高效的原因是多個goroutine可以并行執行,注意不是并發。
首先創建一個不帶緩沖的通道。使用無緩沖的通道是至關重要的,因為如果沒有信號級別的保證,就不能在發送時執行超時和取消。代碼的下一部分決定池將包含的子Goroutines的數量。
g := runtime.GOMAXPROCS(0)
該函數可以讀取機器cpu核數,也就是能并行執行代碼的cpu核數。如果參數大于0,直接返回的是并發數。
使用for-range讀取channel中的數據可以節省代碼,當然也可以使用以下代碼來讀取channel數據:
for c := 0; c < g; c++ {
go func( child int) {
for {
d, wd := <-ch <-- CHANGED
if !wd { <-- CHANGED
break <-- CHANGED
}
fmt.Printf("child %d : recv'd signal : %s\n", child, d)
}
fmt.Printf("child %d : recv'd shutdown signal\n", child)
}(c)
}
Drop模式
該模式在寫入channel的數據量比較大的時候,超出緩沖的容量就選擇丟棄數據。例如當應用程序負載太大就可以丟棄一些請求。
func Drop() {
const cap = 100
ch := make(chan string, cap)
go func() {
for p := range ch {
fmt.Println("child : recv'd signal :", p)
}
}()
const work = 2000
for w := 0; w < work; w++ {
select {
case ch <- "data":
fmt.Println("parent : sent signal :", w)
default:
fmt.Println("parent : dropped data :", w)
}
}
close(ch)
fmt.Println("parent : sent shutdown signal")
time.Sleep(time.Second)
fmt.Println("-----------------")
}
我們創建一個帶緩存的channel和一個Goroutine來接收任務。在緩沖區被填滿之前,Goroutine無法及時處理所有的工作。表示服務已滿負荷運行。
在for循環里面使用select,是一個受阻塞的模塊,每個case代表一個channel操作,發送或接收。但是,這個select也使用了default關鍵字,它將select轉換為非阻塞調用。關鍵就在這里,如果channel緩沖區滿了,select就會執行default。在web服務里面,我們可以在default中返回500內部錯誤,或者將請求存起來。
取消模式
取消模式用于在執行一些IO操作的時候,可以選擇超時時間。你可以選擇取消操作,或者直接退出。
func Cancellation() {
duration := 150 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
ch := make(chan string, 1)
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "data"
}()
select {
case d := <-ch:
fmt.Println("work complete", d)
case <-ctx.Done():
fmt.Println("work cancelled")
}
time.Sleep(time.Second)
fmt.Println("-------------")
}
這里使用context創建超時上下文實例ctx,主Goroutine在select中通過ctx.Done讀取是否超時。這里使用defer調用cancel()防止內存溢出。
在主goroutine中select等待ch通道數據或者超時,哪個先完成就執行哪個case。
帶信號量的扇入/扇出
這種模式可以隨時控制可執行的Goroutine數量。
func FanOutSem() {
children := 2000
ch := make(chan string, children)
g := runtime.GOMAXPROCS(0)
sem := make(chan bool, g)
for c := 0; c < children; c++ {
go func(child int) {
sem <- true
{
t := time.Duration(rand.Intn(200)) * time.Millisecond
time.Sleep(t)
ch <- "data"
fmt.Println("child : sent signal :", child)
}
<-sem
}(c)
}
for children > 0 {
d := <-ch
children--
fmt.Println(d)
fmt.Println("parent : recv'd signal :", children)
}
time.Sleep(time.Second)
fmt.Println("-------------")
}
這里一開始創建了一個緩沖為2000的channel。和前面的扇入/扇出沒啥區別。另一個chennel sem也被創建了,在每個子goroutine內部使用,可以控制子Goroutine是否能夠寫入數據容量,緩沖區滿的話子goroutine就會阻塞。后面的for循環用于等待每個goroutine執行完成。
重試超時模式
這種模式在網絡服務中很實用,例如在連接數據庫的時候,發起ping操作可能會失敗,但是并不希望馬上退出,而是在一定時間內發起重試。
func RetryTimeout(ctx context.Context, retryInterval time.Duration,
check func(ctx context.Context) error) {
for {
fmt.Println("perform user check call")
if err := check(ctx); err == nil {
fmt.Println("work finished successfully")
return
}
fmt.Println("check if timeout has expired")
if ctx.Err() != nil {
fmt.Println("time expired 1 :", ctx.Err())
return
}
fmt.Printf("wait %s before trying again\n", retryInterval)
t := time.NewTimer(retryInterval)
select {
case <-ctx.Done():
fmt.Println("timed expired 2 :", ctx.Err())
t.Stop()
return
case <-t.C:
fmt.Println("retry again")
}
}
}
這里函數接收一個context指定超時時間,每次重試時間間隔,以及重試函數check,由函數調用者定義。
該函數的核心是for無限循環,在循環內先檢查check函數是否執行完成,接著在select中判斷是否超時,以及定義重試計時器。
channel取消模式
可以創建一個單獨的channel用來實現取消的功能。
func channelCancellation(stop <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-stop:
cancel()
case <-ctx.Done():
}
}()
func(ctx context.Context) error {
req, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
"https://www.ardanlabs.com/blog/index.xml",
nil,
)
if err != nil {
return err
}
_, err = http.DefaultClient.Do(req)
if err != nil {
return err
}
return nil
}(ctx)
}
該函數的關鍵在于創建一個新的goroutine并使用select來等待兩個channel發送的數據。第一個channel是一個空結構體類型,另一個是context。在接收到stop通道值時,就調用cancel函數,取消所有接收了對應context函數的執行。