场景
为了提高性能,减少数据库的查询操作,一个Service维护着全部生命周期活跃的Foo
对象。Foo的定义如下:
struct Foo {
Id int64
RootId int64
}
内部使用map[int64]*Foo
数据结构。map的key就是Foo对象的ID。绝大部分的查询都是根据Foo的ID返回Foo对象,使用map确实高效。小部分场景是查询是查询RootId相同的一组Foo。这个通过遍历整个map来实现,现阶段的数据量级还不需要通过建立RootId索引来提高性能。除了查询之后,还有大量的插入和更新动作。最关键的是,有多个goroutine同时操作这个Service。
go的map对象不能并发读写,否则会触发panic. 标准库提供了线程安全的sync.Map
。但是,golang还不支持范型,因此这个sync.Map
的key和Value都是interface{}
。由使用者负责类型安全和类型转换。sync.Map
的插入和更新都是原子化操作,但遍历Range
不是。
后续这个Service还引入其他的状态Field,Service必须建立关键区。很自然想到标准库的Locker。不过Locker不支持同一个goroutine重复Lock(会导致死锁)。存在Service.A方法中调用Service.B方法,A和B都需要获取Lock。使用Locker需要小心处理重复Lock。
单线程方案
线程安全是因为多个线程并发操作资源,单线程不存在线程安全的问题。如果Service的操作都在同一个goroutine下,问题不就解决了么。性能还不弱,因为有Redis做榜样。这个Service也没有性能敏感的要求。
做一个类似消息循环,在一个goroutine循环响应资源操作请求。在这个条件下,可以放心使用类型安全的map
对象了。很自然,我们使用一个channel,然后在一个gorouting下for这个channel就行了。channel定义为chan func()
,处理各种操作。
接下来就是Service下的方法中对资源的操作,由func打包起来传入消息循环。但是类似重入Lock问题,重入channel也会导致死锁。需要让Service的方法了解当前goroutine是否是消息循环的goroutine。
标准库的context.Context
很适合携带这种信息。所以channel定位改为chan func(context.Context)
。提供两个函数携带和判断channel循环信息:
const (
loop_flag = "_in_loop"
)
func WithLoopContext(ctx context.Context) context.Context {
if ctx == nil {
ctx = context.Background()
}
if !IsLoopContext(ctx) {
ctx = setInLoopFlag(ctx)
}
return ctx
}
func setInLoopFlag(ctx context.Context) context.Context {
return context.WithValue(ctx, loop_flag, true)
}
func IsLoopContext(ctx context.Context) bool {
if v, ok := ctx.Value(loop_flag).(bool); ok {
return v
}
return false
}
Service的伪实现如下:
type Serivce struct {
foos map[int64]*Foo
// 省却其他状态成员
loopChan chan func(context.Context)
}
func NewService() *Service {
svc := &Service{
foos: make(map[int64]*Foo, 1024),
loopChan: make(chan func(context.Context), 16),
}
go svc.run() // 单独一个goroutine处理消息循环
return svc
}
func (s *Service) run() {
ctx := WithLoopContext(context.Background()) // 标记在消息循环中
for f := range s.loopChan {
f(ctx)
}
}
func (s *Service) A(ctx context.Context, id int64) (*Foo, error) {
if IsLoopContext(ctx) { // 如果已经在循环中,则自己处理资源
return s.B(ctx, id) // 可以安全的调用其他方法
} else {
var foo *Foo
var err error
done := make(chan bool)
defer close(done)
s.loopChan <- func(ctx context.Context) { // 在消息循环中重新调用自身
foo, err = s.A(ctx, id)
done <- true
}
<-done // 等待循环执行完
return foo, err
}
}
func (s *Service) B(ctx context.Context, id int64) (*Foo, error) {
if IsLoopContext(ctx) {
if foo, found := s.foos[id]; found {
// 继续处理foo
}
return foo, nil
} else {
var foo *Foo
var err error
done := make(chan bool)
defer close(done)
s.loopChan <- func(ctx context.Context) {
foo, err = s.A(ctx, id)
done <- true
}
<-done
return foo, err
}
}
后记
这个方案还是显得有点啰嗦,希望有更精炼的写法。