Three-day Thinking in Go (Day 3)
这是这个系列最后一天的笔记。
Concurrency
go语言有很好的并发。go语言的并发是依靠go routines,go routine是协程。 协程和线程最大的不同是,协程是协作式的,而线程是抢占式的。协程的开销也要比线程小很多。
Go Routines
- 轻量级线程
- 用函数来定义 (可以是有名字的,也可以是匿名的)
- GOMAXPROCS 默认值是当前cpu的core的数目,定义了并发度
- core是在thread中共享的
- 一个OS thread只能在一个core上(线程是cpu调度的最小单位)
- 触发线程reschedule的事件:sys call, channel(mutex), go func, GC
- GC是有成本的(用Java的人应该很明白)
- context switch是非常昂贵的
- goroutine有这些状态: Running, Sleeping, Waiting
- 需要想清楚你的goroutine什么时候开始和结束
1package main
2
3import (
4 "fmt"
5 "runtime"
6)
7
8func init() {
9 runtime.GOMAXPROCS(1)
10}
11
12func count(min, max int) {
13 for i := min; i < max; i++ {
14 fmt.Print(i)
15 }
16 fmt.Println()
17}
18
19func main() {
20 go count(0, 100)
21 go count(100, 200)
22}
23// 这里不会有任何的输出,因为main是不会等 goroutine返回的,会直接继续执行到结束。
WaitGroup
- 是一种带计数器的信号量(semaphore)
- 跟踪记录在跑的goroutines
- 当WaitGroup计数为0的时候,会变的可用,否则则block在那里
- WaitGroup得传指针
WaitGroup的结构是
1// A WaitGroup waits for a collection of goroutines to finish.
2// The main goroutine calls Add to set the number of
3// goroutines to wait for. Then each of the goroutines
4// runs and calls Done when finished. At the same time,
5// Wait can be used to block until all goroutines have finished.
6//
7// A WaitGroup must not be copied after first use.
8type WaitGroup struct {
9 noCopy noCopy
10
11 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
12 // 64-bit atomic operations require 64-bit alignment, but 32-bit
13 // compilers do not ensure it. So we allocate 12 bytes and then use
14 // the aligned 8 bytes in them as state, and the other 4 as storage
15 // for the sema.
16 state1 [3]uint32 // 这里即有counter也有semaphor
17}
WaitGroup的state()
方法会返回counter和sema
1// state returns pointers to the state and sema fields stored within wg.state1.
2func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
3 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
4 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
5 } else {
6 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
7 }
8}
当WaitGroup call Add(delta int)
的时候,这个delta就会被加入到counter中
关键代码如下(中间有省略),
1statep, semap := wg.state()
2
3state := atomic.AddUint64(statep, uint64(delta)<<32) // 这里用的uint64的atomic操作来更新counter,
4v := int32(state >> 32) // 通过移位操作取出counter
5w := uint32(state) // sema
6
7// 如果v等于0,则释放所有sema
8*statep = 0
9for ; w != 0; w-- {
10 runtime_Semrelease(semap, false, 0)
11}
而 call Done()
则相当于
1func (wg *WaitGroup) Done() {
2 wg.Add(-1)
3}
而 Wati()
则相当于在一个for循环中block直到counter为0
Mutex
Mutex是Mutual exclusion的简写,叫做互斥锁 互斥锁保护起来的代码段叫做临界区域(critical section)
- 允许多个goroutine共享内存的机制
- 同步对共享状态的读写
- 代价比较低,且较快
- 状态为0的时候,表示可用:unlocked
- 调度器决定哪一个goroutine可以进入critical section
- RWMutex, 读写互斥锁,
- sync/atomic 了解一下,也可以用来做sycnhronization
Channels
Unbuffered
因为是unbuffered,所以是阻塞的。
- 专门为并发设计,为goroutine做编排
- 起到了发送信号的作用,可以有data也可以没有数据
- receive会先发生
- channel是双向的
- 保证了信号是一定能收到的
- 用关键字
make
来创建channel - 发
c<-
- 收
<-c
- 两个状态,打开/关闭,一旦关闭就不能重新打开
- 在null channel上发送/接受会阻塞
- <-c 会返回ok,通过ok的值来看channel是否被关闭
- 在声明的时候,如果可以,请尽可能的声明的channel的方向
- 可以用for range来读一个channel
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 c := make(chan int)
10 go func(c <-chan int) {
11 for {
12 if v, ok := <-c; !ok {
13 break
14 } else {
15 fmt.Println("ACK", v)
16 time.Sleep(500 * time.Millisecond)
17 }
18 }
19 }(c)
20
21 for i := 0; i < 5; i++ {
22 fmt.Println("Req", i)
23 c <- i
24 }
25 close(c)
26}
27
不一定需要发数据, 可以发一个空结构体
1package main
2
3import (
4 "fmt"
5)
6
7func main() {
8 c := make(chan struct{})
9 go func(c <-chan struct{}) {
10 <-c
11 fmt.Println("got signal, do things")
12 }(c)
13
14 c<-struct{}{}
15 close(c)
16}
17
Select
- 可以用select来在多个channel上收发数据
- select会block在那里,直到某个case触发
- 注意channel的方向
附上一个稍微复杂点的例子
这里有三个channel
- 一个申明的c,数据channel
- 一个声明的d,作为控制信道,空结构体,不传数据,
- 一个是timer,也是一个channel
chan func After(d Duration) <-chan Time
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 c, d := make(chan int), make(chan struct{})
10 go subscriber("G1", 1000, 1000, c, d)
11 go subscriber("G2", 500, 1000, c, d)
12 go subscriber("G3", 0, 100, c, d)
13 publisher(500, c, d)
14
15
16 time.Sleep(2 * time.Second)
17}
18
19func subscriber(ID string, speed, timeout int , in <-chan int, ctrl <-chan struct{}) {
20 defer fmt.Println(ID, "Exited!")
21 for {
22 select {
23 case <-ctrl:
24 fmt.Printf("[%s] 关闭!\n", ID)
25 return
26 case <-time.After(time.Duration(timeout) * time.Millisecond): // chan func After(d Duration) <-chan Time
27 fmt.Printf("[%s] 超时!!\n", ID)
28 return
29 case v, ok := <-in:
30 if !ok {
31 return
32 }
33 fmt.Printf("[%s] 收到 %v\n", ID, v)
34 time.Sleep(time.Duration(speed) * time.Millisecond)
35 }
36 }
37}
38
39func publisher(rate int, out chan<- int, ctrl chan<- struct{}) {
40 for i := 0; i < 5; i++ {
41 if i == 3 {
42 close(ctrl) // 关闭控制信道
43 break
44 }
45 out <- i
46 time.Sleep(time.Duration(rate) * time.Millisecond)
47 }
48}
49
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the “select” statement blocks until at least one of the communications can proceed. - select
当有多个goroutine都在一个channel上的时候,当消息来的时候,是一个伪随机的均匀分布来选择一个goroutine来接收的。所以上一题,就取决于谁先收到,因为每个goroutine的处理时间和超时时间不一样,如果g1先收到 g2g3都会超时。
Buffered
- 对于producer不能保证一定delive
- 消息会被drop,如果接收方没了
- 适用于, fast producer / slow consumer
- 对于不同workload,需要通过测量你的throughput来确定一个合适的缓冲大小
JSON
- 标准库原生支持, encoding/decoding
- 小的json doc可以用json.Marshal/json.Unmarshal
- 大的json文件可以考虑 Decoder.Decode/Encode
- 可以通过
json:"name"
来改变生产的json的名字
1package main
2
3import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7)
8
9func main() {
10 b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)
11
12 // JSON ->
13 m := map[string]interface{}{}
14 if err := json.NewDecoder(bytes.NewReader(b)).Decode(&m); err != nil {
15 panic(err)
16 }
17 fmt.Println(m["first_name"], m["last_name"])
18
19 buff := new(bytes.Buffer)
20 if err := json.NewEncoder(buff).Encode(m); err != nil {
21 panic(err)
22 }
23 fmt.Println(buff)
24}
25
1package main
2
3import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7)
8
9func main() {
10 b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)
11
12 j := struct {
13 FirstName string `json:"first_name,omitempty"`
14 LastName string `json:"last_name,omitempty"`
15 }{} // 注意filed的首字母大小写,因为这个要传给另一个package处理,所以需要export
16 if err := json.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
17 panic(err)
18 }
19
20 fmt.Println(j.FirstName, j.LastName)
21
22 buff := new(bytes.Buffer)
23 if err := json.NewEncoder(buff).Encode(j); err != nil {
24 panic(err)
25 }
26 fmt.Println(buff)
27}
28
Web Service
- go的标准库里就有http library
- 加上json库基本上就可以做出很好的web service
- 如果需要更复杂的routing和中间件,可以考虑第三方的web framework, go-kit, GorillaMux, go-chassis, buffalo, Gin等
- openapi/swagger 和相关的generator: go-swagger, openapi-generator
- 还有一个test库用来测试网络服务
- 记得要close reponse body。
Web Server
1type Pong struct {
2 Status int `json:"status_code"`
3}
4
5func pingHandler(w http.ResponseWriter, r *http.Request) {
6 resp := Pong{
7 Status: http.StatusOK,
8 }
9
10 buff := new(bytes.Buffer)
11 err = json.NewEncoder(buff).Encode(resp)
12 if err != nil {
13 http.Error(w, err.Error(), http.StatusInternalServerError)
14 return
15 }
16
17 w.Header().Set("Content-Type", "application/json; charset=utf-8")
18 fmt.Fprintf(w, buff.String())
19}
20
21func main() {
22 http.HandleFunc("/ping", pingHandler)
23 http.ListenAndServe(:3000, nil)
24}
Http Client
1u, err := url.Parse("http://localhost:3000")
2if err != nil {
3 return "", err
4}
5u.Path = "/ping"
6
7// Encode query params...
8vals := u.Query()
9vals.Set("a", "a")
10u.RawQuery = vals.Encode()
11
12// Get call on web server
13resp, err := http.Get(u.String())
14if resp != nil {
15 defer resp.Body.Close()
16}
17if err != nil {
18 return err
19}
20
21if resp.StatusCode != 200 {
22 return fmt.Errorf("call failed %d", resp.StatusCode)
23}
24
25// Decode a json response
26body := Pong{}
27err = json.NewDecoder(resp.Body).Decode(&body)
28if err != nil {
29 return err
30}
31// Process response...
Testing
1func TestPingHandler(t *testing.T) {
2 var(
3 rr = httptest.NewRecorder()
4 r, _ = http.NewRequest("GET", "http://example.com/ping", nil)
5 )
6
7 PingHandler(rr, r)
8 assert.Equal(t, http.StatusOK, rr.Code)
9
10 var resp Pong
11 err := json.NewDecoder(rr.Body).Decode(&resp)
12 assert.Nil(t, err)
13 assert.Equal(t, 200, resp.Status)
14 // Process response...
15}
结语
希望通过这三个短短的笔记,能够让读者对go有一个基本的认识, 当然要想进一步的提高,还是应该在工作项目和业余side project尽可能找到机会可以用go来编写程序。希望有机会等更深入的使用了这一门语言之后继续分享更多的经验。