Three-day Thinking in Go (Day 3)

13 minute read

这是这个系列最后一天的笔记。

Concurrency

go语言有很好的并发。go语言的并发是依靠go routines,go routine是协程。 协程和线程最大的不同是,协程是协作式的,而线程是抢占式的。协程的开销也要比线程小很多。

Go Routines

  • 轻量级线程
  • 用函数来定义 (可以是有名字的,也可以是匿名的)
  • GOMAXPROCS 默认值是当前cpu的core的数目,定义了并发度
  • core是在thread中共享的
  • 一个OS thread只能在一个core上(线程是cpu调度的最小单位)
  • 触发线程reschedule的事件:sys call, channel(mutex), go func, GC
  • GC是有成本的(用Java的人应该很明白)
  • context switch是非常昂贵的
  • goroutine有这些状态: Running, Sleeping, Waiting
  • 需要想清楚你的goroutine什么时候开始和结束
 1package main
 2
 3import (
 4	"fmt"
 5	"runtime"
 6)
 7
 8func init() {
 9	runtime.GOMAXPROCS(1)
10}
11
12func count(min, max int) {
13	for i := min; i < max; i++ {
14		fmt.Print(i)
15	}
16	fmt.Println()
17}
18
19func main() {
20	go count(0, 100)
21	go count(100, 200)
22}
23// 这里不会有任何的输出,因为main是不会等 goroutine返回的,会直接继续执行到结束。

WaitGroup

  • 是一种带计数器的信号量(semaphore)
  • 跟踪记录在跑的goroutines
  • 当WaitGroup计数为0的时候,会变的可用,否则则block在那里
  • WaitGroup得传指针

WaitGroup的结构是

 1// A WaitGroup waits for a collection of goroutines to finish.
 2// The main goroutine calls Add to set the number of
 3// goroutines to wait for. Then each of the goroutines
 4// runs and calls Done when finished. At the same time,
 5// Wait can be used to block until all goroutines have finished.
 6//
 7// A WaitGroup must not be copied after first use.
 8type WaitGroup struct {
 9	noCopy noCopy
10
11	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
12	// 64-bit atomic operations require 64-bit alignment, but 32-bit
13	// compilers do not ensure it. So we allocate 12 bytes and then use
14	// the aligned 8 bytes in them as state, and the other 4 as storage
15	// for the sema.
16	state1 [3]uint32 // 这里即有counter也有semaphor
17}

WaitGroup的state()方法会返回counter和sema

1// state returns pointers to the state and sema fields stored within wg.state1.
2func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
3	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
4		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
5	} else {
6		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
7	}
8}

当WaitGroup call Add(delta int)的时候,这个delta就会被加入到counter中

关键代码如下(中间有省略),

 1statep, semap := wg.state()
 2
 3state := atomic.AddUint64(statep, uint64(delta)<<32) // 这里用的uint64的atomic操作来更新counter,
 4v := int32(state >> 32)  // 通过移位操作取出counter
 5w := uint32(state) // sema
 6
 7// 如果v等于0,则释放所有sema
 8*statep = 0
 9for ; w != 0; w-- {
10  runtime_Semrelease(semap, false, 0)
11}

而 call Done() 则相当于

1func (wg *WaitGroup) Done() {
2	wg.Add(-1)
3}

Wati()则相当于在一个for循环中block直到counter为0

Mutex

Mutex是Mutual exclusion的简写,叫做互斥锁 互斥锁保护起来的代码段叫做临界区域(critical section)

  • 允许多个goroutine共享内存的机制
  • 同步对共享状态的读写
  • 代价比较低,且较快
  • 状态为0的时候,表示可用:unlocked
  • 调度器决定哪一个goroutine可以进入critical section
  • RWMutex, 读写互斥锁,
  • sync/atomic 了解一下,也可以用来做sycnhronization

Channels

Unbuffered

因为是unbuffered,所以是阻塞的。

  • 专门为并发设计,为goroutine做编排
  • 起到了发送信号的作用,可以有data也可以没有数据
  • receive会先发生
  • channel是双向的
  • 保证了信号是一定能收到的
  • 用关键字make来创建channel
  • c<-
  • <-c
  • 两个状态,打开/关闭,一旦关闭就不能重新打开
  • 在null channel上发送/接受会阻塞
  • <-c 会返回ok,通过ok的值来看channel是否被关闭
  • 在声明的时候,如果可以,请尽可能的声明的channel的方向
  • 可以用for range来读一个channel
 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6)
 7
 8func main() {
 9	c := make(chan int)
10	go func(c <-chan int) {
11		for {
12			if v, ok := <-c; !ok {
13				break
14			} else {
15				fmt.Println("ACK", v)
16				time.Sleep(500 * time.Millisecond)
17			}
18		}
19	}(c)
20
21	for i := 0; i < 5; i++ {
22		fmt.Println("Req", i)
23		c <- i
24	}
25	close(c)
26}
27

不一定需要发数据, 可以发一个空结构体

 1package main
 2
 3import (
 4	"fmt"
 5)
 6
 7func main() {
 8	c := make(chan struct{})
 9	go func(c <-chan struct{}) {
10            <-c
11	    fmt.Println("got signal, do things")
12	}(c)
13
14	c<-struct{}{}
15	close(c)
16}
17
Select
  • 可以用select来在多个channel上收发数据
  • select会block在那里,直到某个case触发
  • 注意channel的方向

附上一个稍微复杂点的例子

这里有三个channel

  • 一个申明的c,数据channel
  • 一个声明的d,作为控制信道,空结构体,不传数据,
  • 一个是timer,也是一个channel chan func After(d Duration) <-chan Time
 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6)
 7
 8func main() {
 9	c, d := make(chan int), make(chan struct{})
10	go subscriber("G1", 1000, 1000, c, d)
11	go subscriber("G2", 500, 1000, c, d)
12	go subscriber("G3", 0, 100, c, d)
13	publisher(500, c, d)
14	
15
16	time.Sleep(2 * time.Second)
17}
18
19func subscriber(ID string, speed, timeout int , in <-chan int, ctrl <-chan struct{}) {
20	defer fmt.Println(ID, "Exited!")
21	for {
22		select {
23		case <-ctrl:
24			fmt.Printf("[%s] 关闭!\n", ID)
25			return
26		case <-time.After(time.Duration(timeout) * time.Millisecond):  // chan func After(d Duration) <-chan Time
27			fmt.Printf("[%s] 超时!!\n", ID)
28			return
29		case v, ok := <-in:
30			if !ok {
31				return
32			}
33			fmt.Printf("[%s] 收到 %v\n", ID, v)
34			time.Sleep(time.Duration(speed) * time.Millisecond)
35		}
36	}
37}
38
39func publisher(rate int, out chan<- int, ctrl chan<- struct{}) {
40	for i := 0; i < 5; i++ {
41		if i == 3 {
42			close(ctrl) // 关闭控制信道
43			break
44		}
45		out <- i
46		time.Sleep(time.Duration(rate) * time.Millisecond)
47	}
48}
49

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the “select” statement blocks until at least one of the communications can proceed. - select

当有多个goroutine都在一个channel上的时候,当消息来的时候,是一个伪随机的均匀分布来选择一个goroutine来接收的。所以上一题,就取决于谁先收到,因为每个goroutine的处理时间和超时时间不一样,如果g1先收到 g2g3都会超时。

Buffered

  • 对于producer不能保证一定delive
  • 消息会被drop,如果接收方没了
  • 适用于, fast producer / slow consumer
  • 对于不同workload,需要通过测量你的throughput来确定一个合适的缓冲大小

JSON

  • 标准库原生支持, encoding/decoding
  • 小的json doc可以用json.Marshal/json.Unmarshal
  • 大的json文件可以考虑 Decoder.Decode/Encode
  • 可以通过json:"name"来改变生产的json的名字
 1package main
 2
 3import (
 4	"bytes"
 5	"encoding/json"
 6	"fmt"
 7)
 8
 9func main() {
10	b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)
11
12	// JSON ->
13	m := map[string]interface{}{}
14	if err := json.NewDecoder(bytes.NewReader(b)).Decode(&m); err != nil {
15	    panic(err)
16	}
17	fmt.Println(m["first_name"], m["last_name"])
18
19	buff := new(bytes.Buffer)
20	if err := json.NewEncoder(buff).Encode(m); err != nil {
21		panic(err)
22	}
23	fmt.Println(buff)
24}
25
 1package main
 2
 3import (
 4	"bytes"
 5	"encoding/json"
 6	"fmt"
 7)
 8
 9func main() {
10	b := []byte(`{"first_name":"Alice", "last_name": "Mary"}`)
11
12	j := struct {
13		FirstName string `json:"first_name,omitempty"`
14		LastName  string `json:"last_name,omitempty"`
15	}{} // 注意filed的首字母大小写,因为这个要传给另一个package处理,所以需要export
16	if err := json.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
17		panic(err)
18	}
19
20	fmt.Println(j.FirstName, j.LastName)
21
22	buff := new(bytes.Buffer)
23	if err := json.NewEncoder(buff).Encode(j); err != nil {
24		panic(err)
25	}
26	fmt.Println(buff)
27}
28

Web Service

  • go的标准库里就有http library
  • 加上json库基本上就可以做出很好的web service
  • 如果需要更复杂的routing和中间件,可以考虑第三方的web framework, go-kit, GorillaMux, go-chassis, buffalo, Gin等
  • openapi/swagger 和相关的generator: go-swagger, openapi-generator
  • 还有一个test库用来测试网络服务
  • 记得要close reponse body。

Web Server

 1type Pong struct {
 2  Status int `json:"status_code"`
 3}
 4
 5func pingHandler(w http.ResponseWriter, r *http.Request) {
 6  resp := Pong{
 7    Status: http.StatusOK,
 8  }
 9
10  buff := new(bytes.Buffer)
11  err = json.NewEncoder(buff).Encode(resp)
12  if err != nil {
13    http.Error(w, err.Error(), http.StatusInternalServerError)
14    return
15  }
16
17  w.Header().Set("Content-Type", "application/json; charset=utf-8")
18  fmt.Fprintf(w, buff.String())
19}
20
21func main() {
22  http.HandleFunc("/ping", pingHandler)
23  http.ListenAndServe(:3000, nil)
24}

Http Client

 1u, err := url.Parse("http://localhost:3000")
 2if err != nil {
 3  return "", err
 4}
 5u.Path = "/ping"
 6
 7// Encode query params...
 8vals := u.Query()
 9vals.Set("a", "a")
10u.RawQuery = vals.Encode()
11
12// Get call on web server
13resp, err := http.Get(u.String())
14if resp != nil {
15  defer resp.Body.Close()
16}
17if err != nil {
18  return err
19}
20
21if resp.StatusCode != 200 {
22  return fmt.Errorf("call failed %d", resp.StatusCode)
23}
24
25// Decode a json response
26body := Pong{}
27err = json.NewDecoder(resp.Body).Decode(&body)
28if err != nil {
29  return err
30}
31// Process response...

Testing

 1func TestPingHandler(t *testing.T) {
 2  var(
 3    rr   = httptest.NewRecorder()
 4    r, _ = http.NewRequest("GET", "http://example.com/ping", nil)
 5  )
 6
 7  PingHandler(rr, r)
 8  assert.Equal(t, http.StatusOK, rr.Code)
 9
10  var resp Pong
11  err := json.NewDecoder(rr.Body).Decode(&resp)
12  assert.Nil(t, err)
13  assert.Equal(t, 200, resp.Status)
14  // Process response...
15}

结语

希望通过这三个短短的笔记,能够让读者对go有一个基本的认识, 当然要想进一步的提高,还是应该在工作项目和业余side project尽可能找到机会可以用go来编写程序。希望有机会等更深入的使用了这一门语言之后继续分享更多的经验。

GO入门系列

  1. 系列1
  2. 系列2
  3. 系列3

wechat-qrcode