« skynet 消息分发及服务调度的新设计 | 返回首页 | 基于引用计数的对象生命期管理 »

skynet 消息队列的新设计(接上文)

接前一篇文 ,谈谈 skynet 消息队列的一些新想法。

之前谈到,每个服务的消息接收队列可以是定长的,且不必太长。因为正常运行中,每个服务都应该尽量消化掉需要处理的消息,否则会预示着某种上层设计的问题。

但是,在接收队列满的时候直接丢掉消息显然是不合理的。那意味着必须有更健全的错误传播机制,让发送失败方可以出错而中断业务。允许发送消息出错可能使上层结构设计更难。

让发送方阻塞在 skynet 中显然也不是个好方案。因为 skynet 的服务是允许阻塞时重入执行另一条新 session 的,这是和 erlang 最大的不同。这可以让单个 lua vm 的性价比更高,可以在要需要的时候,做共享状态,而不必全部业务都通过相对低效的消息通讯来完成;但其负面代价是重入会引发一些隐讳的 bug 。很多已有的 skynet 项目都依赖 send 消息不阻塞这点来保证逻辑正确,不能轻易修改。

我的解决方案是给每个服务再做一组发送队列。最接收方忙的时候,把待发消息放在自己这里的发送队列中。这样就可以由框架来确保消息都能正确的依次发送(这里不保证目的地不同的消息的先后次序,但保证目的地相同的消息次序)。


这两天,我仔细考虑了这个方案。从单一接收队列改为一个接收队列和若干发送队列。实现复杂度一定是增加了,但并没有达到不可接受的程度。这个方案并不难想到,也不难做到,但 3 年前实现 skynet 时我并没有这么做,一定是有些原因吧。

其中一个核心问题是,处理 IO 和 timer 的线程不同于普通的服务,它们是和系统打交道的。也按类似的机制做就不太合理了。它们要做的是尽量匀速的接收外部消息,并马上转发到 skynet 内部,它们是面对几乎所有 skynet 内部服务高频工作的。不应该有复杂的调度机制。

ps. IO 和 timer 线程也不是普通的 skynet 服务。我曾经把 IO 做成一个独立服务,后来又放弃了这个做法而移到核心中去。

一个变通的方法是让两个特殊的定制服务和 IO 及 timer 线程对 1 对 1 对接。对接的通道是无限长的,由于只有一个读取方一个接收方,这个消息队列在实现时也可以有针对性。


另一个问题是,在服务退出时如果处理遗留消息。

我们必须让未处理的请求被妥善回应。在 skynet 1.0 中,这个步骤是在上层完成的。lua 层会在 exit 时遍历所有没有回应的请求,发送一个 error 消息。

而当发送消息不再保证送达时,问题就变得有点棘手。在新方案中,未发出的消息是暂存在自己的发送队列中的,一旦自己都不在,谁来保证这些消息送达呢?

同样的问题还有:当暂发请求真正发出的时候,对方已经退出,需要重新产生一个错误回应。

我的解决方案是:首先在底层就严格区分请求/回应/单向推送/错误传播 这些消息,可以直接在底层做出合理处理;然后让服务的销毁也严格放在唯一一个服务中进行。在销毁过程中,收集待发消息队列,采集这些消息,然后将需要处理的放在自己这里,之后持续发送。

关于服务的创建和销毁部分。我有许多新想法,打算另开一篇 blog 来记录。


总结一下,这篇主要谈消息队列的新设计。在消息队列方面,我计划按需定制三类队列。

第一,多写一读的固定长度并发队列。由于只有一个读者,且队列长度固定不变。所以在出队列的一端是不需要任何锁的。锁只放在进队列的一端。但这里并不需要无锁设计来减少 spin lock 的盲等。因为任何一个写入者碰到队列满或队列有别的写入者正在操作,都可以一致视为队列忙,不必反复重试。它只需要把待发数据放在自己的发送队列即可。

第二,读写全在一起的无并发队列。用来暂存在接收方忙时的待发数据(还会用于服务销毁时收集待办业务)。这个队列可以在 OOM 未发生前无限增长。因为无并发情况,很容易实现正确。每个服务将配备多个这样的队列。这个待发队列不会用得太多,所以默认长度会很短,多个队列也不需要用 hash 表索引,简单一个数组即可。需要用时直接 O(n) 遍历(n 不会太大,因为过载可能的服务并不会太多,多的话系统根本不能正常工作)。

第三,专用于 IO 线程及 timer 线程和 skynet 内部服务对接的一读一写队列(管道)。写入方和读取方分属不同线程。这个队列有可能按需增长。这个可以用一个读写锁来保证并发正确。写锁只发生在写入队列满时,读取方每次读操作都需申请读锁。注意,这里队列满的扩展队列操作同时只有一个操作者,所以不必锁住队列后再做复制扩展队列空间;而可以扩展前先对老队列做一个副本并行处理,再用写锁做一个新老指针交换。


最后,昨天我实在忍不住光在脑子里想,刷了一天的代码(大约 1000 行完全新写的)。

好吧,我食言了,等不到年后了。skynet 2.0 重写计划开始了一天。新代码暂时只能编译通过,还不完整,不能运行。我暂放在一个临时仓库里,等合适的时候再合并到 skynet 主仓库的 2.0 分支上。

有兴趣的同学可以帮忙 review 一下,印证一下这两篇 blog 的想法的具体实现。

Comments

不考虑多线程io么

有两点不明
1. “多写一读的固定长度并发队列。由于只有一个读者,且队列长度固定不变。所以在出队列的一端是不需要任何锁的。”出队列不需要任何锁应该是太乐观了吧,怎么知道队列是不是满了,队列是不是空了?
2. “而可以扩展前先对老队列做一个副本并行处理,再用写锁做一个新老指针交换。”这个写法很像lock-free只是少了CAS,但是有个问题,在并行处理以后发现老队列变了咋办?

如果只是为了在框架内解决一个服务退出后它所遗留的请求消息会让其他服务过载的问题,似乎没必要完全改变已有的底层设计吧。让一个即将退出的服务把这个事件告诉相关的服务,相关服务将自身消息队列做下清理即可。这样底层只需新增一个服务可以优先处理特殊消息的功能即可。

Post a comment

非这个主题相关的留言请到:留言本