sync包相关
sync.Once - 函数只执行一下
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
# 打印结果
Only once
sync.REMutex - 读写锁
概述
- 写锁只能被一个goroutine占用,读锁可以同时被多个goroutine同时获取
- 适用场景:适用用于读多写少的场景
Api
- Lock( ) // 加写锁
- RLock( ) // 加读锁
sync.Mutex - 互斥锁
概述
- 一个goroutine或得Mutex后,其他的goroutine只能等到这个goroutine释放Mutex
- 已经锁定的 Mutex 并不与特定的 goroutine 相关联,这样可以利用一个 goroutine 对其加锁,再利用其他 goroutine 对其解锁
- 使用场景:适用于一个读一个写的场景
Api
- Lock( )
- Unlock( )
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
"time"
"fmt"
"sync"
)
func main() {
var mutex sync.Mutex
fmt.Println("Lock the lock")
mutex.Lock()
fmt.Println("The lock is locked")
channels := make([]chan int, 4)
for i := 0; i < 4; i++ {
channels[i] = make(chan int)
go func(i int, c chan int) {
fmt.Println("Not lock: ", i)
mutex.Lock()
fmt.Println("Locked: ", i)
time.Sleep(time.Second)
fmt.Println("Unlock the lock: ", i)
mutex.Unlock()
c <- i
}(i, channels[i])
}
time.Sleep(time.Second)
fmt.Println("Unlock the lock")
mutex.Unlock()
time.Sleep(time.Second)
for _, c := range channels {
<-c
}
}
sync.Pool - 临时对象池
概述
- 维护一个本地对象池,而不需要频繁的创建对象和gc
- 适用场景:适用于无状态的对象复用:fmt包,不适用于有状态的对象,如:socket、数据库连接池
- 创建Pool需要实现一个New方法,当获取不到临时对象时,调用New方法创建
Api
- Get( ) // 获取一个临时对象
- Put( ) // 将临时对象放回pool中
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import (
"bytes"
"io"
"os"
"sync"
"time"
)
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 获取临时对象,没有的话会自动创建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 将临时对象放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
打印结果:
2006-01-02T15:04:05Z path=/search?q=flowers
sync.Cond - 条件变量
概述
- sync.Cond实现了goroutine状态变化的通信机制,如:goroutine A的执行通过Wait( )等待goroutine B的通知,goroutine B维护一个通知列表,调用Signal( )或Broadcast( )通知goroutine A恢复执行。
- sync.Cond总是与锁一起使用,并在Wait( )之前就上锁
Api
- sync.NewCond( ) // 创建cond对象
- cond.L.Lock( ) // 上锁
- Wait( ) // 协程阻塞
- Signal( ) // 唤醒列表中的一个协程
- Broadcast( ) // 唤醒所有协程
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main
import (
"bytes"
"fmt"
"io"
"sync"
"time"
)
type MyDataBucket struct {
br *bytes.Buffer
gmutex *sync.RWMutex
rcond *sync.Cond //读操作需要用到的条件变量
}
func NewDataBucket() *MyDataBucket {
buf := make([]byte, 0)
db := &MyDataBucket{
br: bytes.NewBuffer(buf),
gmutex: new(sync.RWMutex),
}
db.rcond = sync.NewCond(db.gmutex.RLocker())
return db
}
func (db *MyDataBucket) Read(i int) {
db.gmutex.RLock()
defer db.gmutex.RUnlock()
var data []byte
var d byte
var err error
for {
//读取一个字节
if d, err = db.br.ReadByte(); err != nil {
if err == io.EOF {
if string(data) != "" {
fmt.Printf("reader-%d: %s\n", i, data)
}
db.rcond.Wait()
data = data[:0]
continue
}
}
data = append(data, d)
}
}
func (db *MyDataBucket) Put(d []byte) (int, error) {
db.gmutex.Lock()
defer db.gmutex.Unlock()
//写入一个数据块
n, err := db.br.Write(d)
db.rcond.Broadcast()
return n, err
}
func main() {
db := NewDataBucket()
go db.Read(1)
go db.Read(2)
for i := 0; i < 10; i++ {
go func(i int) {
d := fmt.Sprintf("data-%d", i)
db.Put([]byte(d))
}(i)
time.Sleep(100 * time.Millisecond)
}
}
sync.WaitGroup
概述
- 阻塞主线程,直到所有的goroutine执行完成
Api
- Add( ) // 计时器加n
- Done( ) // 计时器减1
- Wait( ) // 线程阻塞,直到计时器为0
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go f(i, &wg)
}
wg.Wait()
}
// 一定要通过指针传值,不然进程会进入死锁状态
func f(i int, wg *sync.WaitGroup) {
fmt.Println(i)
wg.Done()
}
This post is licensed under CC BY 4.0 by the author.