前言

最近在写项目,需要用到信号量等待一些资源完成,但是最多等待N毫秒。在看本文的正文之前,我们先来看下C语言里的实现方法。

在C语言里,有如下的API来实现带超时的信号量等待:

SYNOPSIS
  #include <pthread.h>
 
  int
  pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

然后在查看golang的document后,发现golang里并没有实现带超时的信号量,官方文档在这里。

原理

我的业务场景是这样的:我有一个缓存字典,当多个用户请求1个不存在的key时,只有1个请求会穿透到后端,而所有用户都要排队等这个请求完成,或者超时返回。

怎么实现呢?其实稍微想一想cond的原理,就能模拟一个带超时的cond出来。

在golang里,要同时实现”挂起等待”和”超时返回”,一般得用select case语法,一个case等待阻塞的资源,一个case等待一个timer,这一点是非常确定的。

原本阻塞的资源应该通过条件变量的机制来实现完成通知,既然这里决定用select case,那么自然想到用channel来代替这个完成通知。

接下来的问题就是,很多请求者并发来获取这个资源,但是资源还没有准备好,所以大家都要排队并挂起,等待资源完成,并且当资源完成后通知大家。

所以,这里很自然要为这个资源做一个队列,每个请求者创建一个chan,并将chan放到队列里,接着select case等待这个chan的通知。而另一端,资源完成后遍历队列,通知每个chan即可。

最后一个问题是,只有第一个请求者才能穿透请求到后端,而后续请求者不应该穿透重复的请求,这可以通过判断缓存里是否有这个key作为判定首次的条件,而标记位init来判断请求者是否应该排队。

我的场景

上面是思路,下面是我的业务场景实现。

func (cache *Cache) Get(key string, keyType int) *string {
 if keyType == KEY_TYPE_DOMAIN {
 key = "#" + key
 } else {
 key = "=" + key
 }
 
 cache.mutex.Lock()
 item, existed := cache.dict[key]
 if !existed {
 item = &cacheItem{}
 item.key = &key
 item.waitQueue = list.New()
 cache.dict[key] = item
 }
 cache.mutex.Unlock()
 
 conf := config.GetConfig()
 
 lastGet := getCurMs()
 
 item.mutex.Lock()
 item.lastGet = lastGet
 if item.init { // 已存在并且初始化
 defer item.mutex.Unlock()
 return item.value
 }
 
 // 未初始化,排队等待结果
 wait := waitItem{}
 wait.wait_chan = make(chan *string, 1)
 item.waitQueue.PushBack(&wait)
 item.mutex.Unlock()
 
 // 新增key, 启动goroutine获取初始值
 if !existed {
 go cache.initCacheItem(item, keyType)
 }
 
 timer := time.NewTimer(time.Duration(conf.Cache_waitTime) * time.Millisecond)
 
 var retval *string = nil
 
 // 等待初始化完成
 select {
 case retval = <- wait.wait_chan:
 case <- timer.C:
 }
 return retval
}

简述一下整个过程:

  • 首先锁字典,如果key不存在,说明我是第一个请求者,我会创建这个key对应的value,只不过init=false表示它正在初始化。最后,释放字典锁。
  • 接下来,锁住这个key,判断它已经初始化完成,那么直接返回value。否则,创建一个chan放入waitQueue等待队列。最后,释放key锁。
  • 接着,如果当前是第一个请求者,那么会穿透请求到后端(在一个独立的协程里去发起网络调用)。
  • 现在,创建一个用于超时的定时器。
  • 最后,无论当前是否是key的第一个请求者,还是初始化期间的并发请求者,它们都通过select case超时的等待结果完成。

在initCacheItem函数里,数据已获取成功

 // 一旦标记为init, 后续请求将不再操作waitQueue
 item.mutex.Lock()
 item.value = newValue
 item.init = true
 item.expire = expire
 item.mutex.Unlock()
 
 // 唤醒所有排队者
 waitQueue := item.waitQueue
 for elem := waitQueue.Front(); elem != nil; elem = waitQueue.Front() {
 wait := elem.Value.(*waitItem)
 wait.wait_chan <- newValue
 waitQueue.Remove(elem)
 }
  • 首先,锁住key,标记init=true,并赋值value,并释放锁。此后的请求,都可以立即返回,无需排队。
  • 之后,因为init=true已被标记,此刻再也有没有请求会修改waitQueue,所以无需加锁,直接遍历队列,通知其中的每个chan。

最后

这样就实现了带超时的条件变量效果,实际上我的场景是一个broadcast的cond例子,大家可以参照思路实现自己想要的效果,活学活用。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。

标签:
golang,信号量,golang,超时,信号量,超时

免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
狼山资源网 Copyright www.pvsay.com

评论“golang模拟实现带超时的信号量示例代码”

暂无“golang模拟实现带超时的信号量示例代码”评论...

《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线

暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。

艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。

《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。