Golang的sync包的几个常用的功能

学习下sync包的几个常用的包的使用和使用方法

sync.Map

// readonly

// readOnly 是一个不可变的结构,以原子方式存储在Map.read中
type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 如果一些key不在read,但是在dirty中则为true
}
type entry struct {
	// p 指向实际存储的接口值
	// p有三种状态
	// 如果 p == nil, 条目已经被删除,此时 m.dirty 等于 nil.
	//
	// 如果 p == expunged, 条目已经被删除, 此时m.dirty不等于nil,并且 m.dirty中不存在
	//
	// 是一个存在的值 存在read,如果m.dirty不等于nil 也存在与dirty中
	//
	// 被删除时,通过原子操作替换entry为nil,等到下一次dirty map创建时会替换nil为expungef 

	p unsafe.Pointer // *interface{}
}
// expunged是一个任意指针,用来标记已从dirty中删除的条目
var expunged = unsafe.Pointer(new(interface{}))

Load

查找指定的key

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	// 先从无锁的read中读取
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 如果read不存在需要的key,但是amended为true表明dirty中有read不存在的key时,则开始尝试从dirty中读取
	if !ok && read.amended {
		m.mu.Lock()
		// 再次检测防止在加锁的时候dirty提升为read
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// 无论是否存在增加misses计数
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

// 从entry中atomic.Loadpointer实际的接口值
func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

// 自增
func (m *Map) missLocked() {
	m.misses++
	// 如果misses小于dirty长度 返回
	if m.misses < len(m.dirty) {
		return
	}
	// 将dirty提升为read,然后dirty设置为nil misses设置为0
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

Store

存储一个键值

func (m *Map) Store(key, value interface{}) {
	// 如果key存在于read中,则直接更改
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// read中存在该key,但是p已经在dirty被删除了
			// 在dirty中存储该key

			m.dirty[key] = e
		}
		// read 中存在key 并且没有删除 则直接更新
		// read和dirty是共用entry的 所以直接更新内存地址的值就可以
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		// 如果read中不存在该key 且p!=expunged,但是在dirty中存在则直接更新该key,此时read还是没有该key
		e.storeLocked(&value)
	} else {
		if !read.amended { // m.dirty没有新的数据
			m.dirtyLocked() // dirty为nil的话从read中复制未删除的数据
			m.read.Store(readOnly{m: read.m, amended: true}) // 设置 amended 表示dirty存在read中不存在的值
		}
		// 将这个entry加入到m.dirty
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

// 如果条目没有被标记删除,将key的值修改
// 如果条目被标记为expunged则返回false
func (e *entry) tryStore(i *interface{}) bool {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return false
	}
	for {
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
	}
}

// 如果dirty为nil,把read中的未删除数据从read中复制过去
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		// 将所有为nil的p修改为expunged
		// 拷贝所有不为expunged的值
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		// 如果p == nil 设置为punged后退出 不拷贝此数据
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

Delete

删除

func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	// 在read中查找
	if !ok && read.amended {
		// 不存在并且dirty存在read没有的key
		// 再次加锁检查
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		// 直接从dirty中删除
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		e.delete()
	}
}

// 将key的值设置nil
func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

sync.Map还提供了LoadOrStore/Range操作,但没有提供Len()方法,如果需要统计有效值只能先提升dirty,再遍历read,Range方法已经做了

适合key值相对固定,读多写少的操作,因为新建key需要频繁提升dirty,

sync.map操作图解

sync.Pool

不可以用来保存函数状态的对象,例如socket,例如socket连接
常用来保存一些不保存状态的对象,例如在gin这个框架中,Context对象就是使用pool对象池复用的,每次请求完成后,将Context对象放入对象池中,可以减少对象的创建,复用对象,减轻GC的压力
pool在每次GC前会清理掉

Pool的结构

type Pool struct {
	noCopy noCopy
	// 指向一个数组 数组的元素类型是poolLocal
	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	// 本地的数组 大小等于P
	localSize uintptr        // size of the local array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	// 存储一个对象读取不需要加锁
	private interface{} 
	// 是一个数组 读写需要加锁
	shared  []interface{}
	// 对shared加锁
	Mutex             
}

Get操作

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	// 先返回本地P上的pool
	l := p.pin()
	// 私有的
	x := l.private
	l.private = nil
	runtime_procUnpin()
	if x == nil {
		// 从shared中取
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
			x = p.getSlow()
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	// 没有找到 新建
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

Put操作

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	// 拿到当前P对应的pool
	l := p.pin()
	// 如果私有的位置为空则放在私有的位置
	if l.private == nil {
		// 放置在私有位置
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	// 存放在共享的区域中
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
	if race.Enabled {
		race.Enable()
	}
}

Mutex

先使用cas尝试获取锁,如果获取到就直接返回,如果没有获取到,然后就是使用自旋锁获取,如果还是没有获取到就通过信号量让当前G

Crond

条件变量 主要用于多个goroutinue等待另外一个goroutinue的通知信息。