Three-Day Thinking in Go (Day 3)

这是这个系列最后一天的笔记。

Concurrency

go语言有很好的并发。go语言的并发是依靠go routines,go routine是协程。
协程和线程最大的不同是,协程是协作式的,而线程是抢占式的。协程的开销也要比线程小很多。

Go Routines

  • 轻量级线程
  • 用函数来定义 (可以是有名字的,也可以是匿名的)
  • GOMAXPROCS 默认值是当前cpu的core的数目,定义了并发度
  • core是在thread中共享的
  • 一个OS thread只能在一个core上(线程是cpu调度的最小单位)
  • 触发线程reschedule的事件:sys call, channel(mutex), go func, GC
  • GC是有成本的(用Java的人应该很明白)
  • context switch是非常昂贵的
  • goroutine有这些状态: Running, Sleeping, Waiting
  • 需要想清楚你的goroutine什么时候开始和结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"runtime"
)

func init() {
runtime.GOMAXPROCS(1)
}

func count(min, max int) {
for i := min; i < max; i++ {
fmt.Print(i)
}
fmt.Println()
}

func main() {
go count(0, 100)
go count(100, 200)
}
// 这里不会有任何的输出,因为main是不会等 goroutine返回的,会直接继续执行到结束。

WaitGroup

  • 是一种带计数器的信号量(semaphore)
  • 跟踪记录在跑的goroutines
  • 当WaitGroup计数为0的时候,会变的可用,否则则block在那里
  • WaitGroup得传指针

WaitGroup的结构是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32 // 这里即有counter也有semaphor
}

WaitGroup的state()方法会返回counter和sema

1
2
3
4
5
6
7
8
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

当WaitGroup call Add(delta int)的时候,这个delta就会被加入到counter中

关键代码如下(中间有省略),

1
2
3
4
5
6
7
8
9
10
11
statep, semap := wg.state()

state := atomic.AddUint64(statep, uint64(delta)<<32) // 这里用的uint64的atomic操作来更新counter,
v := int32(state >> 32) // 通过移位操作取出counter
w := uint32(state) // sema

// 如果v等于0,则释放所有sema
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}

而 call Done() 则相当于

1
2
3
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wati()则相当于在一个for循环中block直到counter为0

Mutex

Mutex是Mutual exclusion的简写,叫做互斥锁
互斥锁保护起来的代码段叫做临界区域(critical section)

  • 允许多个goroutine共享内存的机制
  • 同步对共享状态的读写
  • 代价比较低,且较快
  • 状态为0的时候,表示可用:unlocked
  • 调度器决定哪一个goroutine可以进入critical section
  • RWMutex, 读写互斥锁,
  • sync/atomic 了解一下,也可以用来做sycnhronization

Channels

Unbuffered

因为是unbuffered,所以是阻塞的。

  • 专门为并发设计,为goroutine做编排
  • 起到了发送信号的作用,可以有data也可以没有数据
  • receive会先发生
  • channel是双向的
  • 保证了信号是一定能收到的
  • 用关键字make来创建channel
  • c<-
  • <-c
  • 两个状态,打开/关闭,一旦关闭就不能重新打开
  • 在null channel上发送/接受会阻塞
  • <-c 会返回ok,通过ok的值来看channel是否被关闭
  • 在声明的时候,如果可以,请尽可能的声明的channel的方向
  • 可以用for range来读一个channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan int)
go func(c <-chan int) {
for {
if v, ok := <-c; !ok {
break
} else {
fmt.Println("ACK", v)
time.Sleep(500 * time.Millisecond)
}
}
}(c)

for i := 0; i < 5; i++ {
fmt.Println("Req", i)
c <- i
}
close(c)
}

不一定需要发数据, 可以发一个空结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
)

func main() {
c := make(chan struct{})
go func(c <-chan struct{}) {
<-c
fmt.Println("got signal, do things")
}(c)

c<-struct{}{}
close(c)
}
Select
  • 可以用select来在多个channel上收发数据
  • select会block在那里,直到某个case触发
  • 注意channel的方向

附上一个稍微复杂点的例子

这里有三个channel

  • 一个申明的c,数据channel
  • 一个声明的d,作为控制信道,空结构体,不传数据,
  • 一个是timer,也是一个channel chan func After(d Duration) <-chan Time
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"time"
)

func main() {
c, d := make(chan int), make(chan struct{})
go subscriber("G1", 1000, 1000, c, d)
go subscriber("G2", 500, 1000, c, d)
go subscriber("G3", 0, 100, c, d)
publisher(500, c, d)


time.Sleep(2 * time.Second)
}

func subscriber(ID string, speed, timeout int , in <-chan int, ctrl <-chan struct{}) {
defer fmt.Println(ID, "Exited!")
for {
select {
case <-ctrl:
fmt.Printf("[%s] 关闭!\n", ID)
return
case <-time.After(time.Duration(timeout) * time.Millisecond): // chan func After(d Duration) <-chan Time
fmt.Printf("[%s] 超时!!\n", ID)
return
case v, ok := <-in:
if !ok {
return
}
fmt.Printf("[%s] 收到 %v\n", ID, v)
time.Sleep(time.Duration(speed) * time.Millisecond)
}
}
}

func publisher(rate int, out chan<- int, ctrl chan<- struct{}) {
for i := 0; i < 5; i++ {
if i == 3 {
close(ctrl) // 关闭控制信道
break
}
out <- i
time.Sleep(time.Duration(rate) * time.Millisecond)
}
}

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the “select” statement blocks until at least one of the communications can proceed. - select

当有多个goroutine都在一个channel上的时候,当消息来的时候,是一个伪随机的均匀分布来选择一个goroutine来接收的。所以上一题,就取决于谁先收到,因为每个goroutine的处理时间和超时时间不一样,如果g1先收到 g2g3都会超时。

Buffered

  • 对于producer不能保证一定delive
  • 消息会被drop,如果接收方没了
  • 适用于, fast producer / slow consumer
  • 对于不同workload,需要通过测量你的throughput来确定一个合适的缓冲大小

JSON

  • 标准库原生支持, encoding/decoding
  • 小的json doc可以用json.Marshal/json.Unmarshal
  • 大的json文件可以考虑 Decoder.Decode/Encode
  • 可以通过json:"name"来改变生产的json的名字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"bytes"
"encoding/json"
"fmt"
)

func main() {
b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)

// JSON ->
m := map[string]interface{}{}
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&m); err != nil {
panic(err)
}
fmt.Println(m["first_name"], m["last_name"])

buff := new(bytes.Buffer)
if err := json.NewEncoder(buff).Encode(m); err != nil {
panic(err)
}
fmt.Println(buff)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"bytes"
"encoding/json"
"fmt"
)

func main() {
b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)

j := struct {
FirstName string `json:"first_name,omitempty"`
LastName string `json:"last_name,omitempty"`
}{} // 注意filed的首字母大小写,因为这个要传给另一个package处理,所以需要export
if err := json.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
panic(err)
}

fmt.Println(j.FirstName, j.LastName)

buff := new(bytes.Buffer)
if err := json.NewEncoder(buff).Encode(j); err != nil {
panic(err)
}
fmt.Println(buff)
}

Web Service

  • go的标准库里就有http library
  • 加上json库基本上就可以做出很好的web service
  • 如果需要更复杂的routing和中间件,可以考虑第三方的web framework, go-kit, GorillaMux, go-chassis, buffalo, Gin等
  • openapi/swagger 和相关的generator: go-swagger, openapi-generator
  • 还有一个test库用来测试网络服务
  • 记得要close reponse body。

Web Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Pong struct {
Status int `json:"status_code"`
}

func pingHandler(w http.ResponseWriter, r *http.Request) {
resp := Pong{
Status: http.StatusOK,
}

buff := new(bytes.Buffer)
err = json.NewEncoder(buff).Encode(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, buff.String())
}

func main() {
http.HandleFunc("/ping", pingHandler)
http.ListenAndServe(:3000, nil)
}

Http Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
u, err := url.Parse("http://localhost:3000")
if err != nil {
return "", err
}
u.Path = "/ping"

// Encode query params...
vals := u.Query()
vals.Set("a", "a")
u.RawQuery = vals.Encode()

// Get call on web server
resp, err := http.Get(u.String())
if resp != nil {
defer resp.Body.Close()
}
if err != nil {
return err
}

if resp.StatusCode != 200 {
return fmt.Errorf("call failed %d", resp.StatusCode)
}

// Decode a json response
body := Pong{}
err = json.NewDecoder(resp.Body).Decode(&body)
if err != nil {
return err
}
// Process response...

Testing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestPingHandler(t *testing.T) {
var(
rr = httptest.NewRecorder()
r, _ = http.NewRequest("GET", "http://example.com/ping", nil)
)

PingHandler(rr, r)
assert.Equal(t, http.StatusOK, rr.Code)

var resp Pong
err := json.NewDecoder(rr.Body).Decode(&resp)
assert.Nil(t, err)
assert.Equal(t, 200, resp.Status)
// Process response...
}

结语

希望通过这三个短短的笔记,能够让读者对go有一个基本的认识,
当然要想进一步的提高,还是应该在工作项目和业余side project尽可能找到机会可以用go来编写程序。希望有机会等更深入的使用了这一门语言之后继续分享更多的经验。

GO入门系列

  1. Three-day Thinking in Go (Day 1)
  2. Three-Day Thinking in Go (Day 2)
  3. Three-day Thinking in Go (Day 3)

希望快速得到新文章的通知?请关注作者的微信公众号

wechat-qrcode