Golang类消息循环的线程安全模式

场景

为了提高性能,减少数据库的查询操作,一个Service维护着全部生命周期活跃的Foo对象。Foo的定义如下:

struct Foo {
    Id int64
    RootId int64
}

内部使用map[int64]*Foo数据结构。map的key就是Foo对象的ID。绝大部分的查询都是根据Foo的ID返回Foo对象,使用map确实高效。小部分场景是查询是查询RootId相同的一组Foo。这个通过遍历整个map来实现,现阶段的数据量级还不需要通过建立RootId索引来提高性能。除了查询之后,还有大量的插入和更新动作。最关键的是,有多个goroutine同时操作这个Service。

go的map对象不能并发读写,否则会触发panic. 标准库提供了线程安全的sync.Map。但是,golang还不支持范型,因此这个sync.Map的key和Value都是interface{}。由使用者负责类型安全和类型转换。sync.Map的插入和更新都是原子化操作,但遍历Range不是。

后续这个Service还引入其他的状态Field,Service必须建立关键区。很自然想到标准库的Locker。不过Locker不支持同一个goroutine重复Lock(会导致死锁)。存在Service.A方法中调用Service.B方法,A和B都需要获取Lock。使用Locker需要小心处理重复Lock。

单线程方案

线程安全是因为多个线程并发操作资源,单线程不存在线程安全的问题。如果Service的操作都在同一个goroutine下,问题不就解决了么。性能还不弱,因为有Redis做榜样。这个Service也没有性能敏感的要求。

做一个类似消息循环,在一个goroutine循环响应资源操作请求。在这个条件下,可以放心使用类型安全的map对象了。很自然,我们使用一个channel,然后在一个gorouting下for这个channel就行了。channel定义为chan func(),处理各种操作。

接下来就是Service下的方法中对资源的操作,由func打包起来传入消息循环。但是类似重入Lock问题,重入channel也会导致死锁。需要让Service的方法了解当前goroutine是否是消息循环的goroutine。

标准库的context.Context很适合携带这种信息。所以channel定位改为chan func(context.Context)。提供两个函数携带和判断channel循环信息:

const (
	loop_flag = "_in_loop"
)

func WithLoopContext(ctx context.Context) context.Context {
	if ctx == nil {
		ctx = context.Background()
	}
	if !IsLoopContext(ctx) {
		ctx = setInLoopFlag(ctx)
	}
	return ctx
}

func setInLoopFlag(ctx context.Context) context.Context {
	return context.WithValue(ctx, loop_flag, true)
}

func IsLoopContext(ctx context.Context) bool {
	if v, ok := ctx.Value(loop_flag).(bool); ok {
		return v
	}
	return false
}

Service的伪实现如下:


type Serivce struct {
    foos map[int64]*Foo
    // 省却其他状态成员
    loopChan chan func(context.Context)
}
func NewService() *Service {
    svc := &Service{
        foos: make(map[int64]*Foo, 1024),
        loopChan: make(chan func(context.Context), 16),
    }
    go svc.run() // 单独一个goroutine处理消息循环
    return svc
}
func (s *Service) run() {
    ctx := WithLoopContext(context.Background()) // 标记在消息循环中
    for f := range s.loopChan {
        f(ctx)
    }
}
func (s *Service) A(ctx context.Context, id int64) (*Foo, error) {
    if IsLoopContext(ctx) { // 如果已经在循环中,则自己处理资源
        return s.B(ctx, id) // 可以安全的调用其他方法
    } else {
        var foo *Foo
        var err error
        done := make(chan bool)
        defer close(done)
        s.loopChan <- func(ctx context.Context) { // 在消息循环中重新调用自身
            foo, err = s.A(ctx, id)
            done <- true
        }
        <-done // 等待循环执行完
        return foo, err
    }
}

func (s *Service) B(ctx context.Context, id int64) (*Foo, error) {
    if IsLoopContext(ctx) {
        if foo, found := s.foos[id]; found {
			// 继续处理foo
		}
		return foo, nil
    } else {
        var foo *Foo
        var err error
        done := make(chan bool)
        defer close(done)
        s.loopChan <- func(ctx context.Context) {
            foo, err = s.A(ctx, id)
            done <- true
        }
        <-done
        return foo, err
    }
}

后记

这个方案还是显得有点啰嗦,希望有更精炼的写法。

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.