« 内存块对象的 Lua 封装 | 返回首页 | 游戏 UI 模块的选择 »

skynet 并发模型的一点改进思路

skynet 的内核是一个多线程的消息分发器。每个服务有一个消息队列,任何服务都可以向其它任意服务的消息队列投递消息,而每个服务只可以读自己的消息队列,并处理其中的消息。

目前的工作原理是,在任意消息队列不为空的那一刻,将该消息队列关联的服务对象放在一个全局队列中。框架启动固定数量的工作线程,每个工作线程分头从全局队列中获取一个服务对象,并从关联的消息队列中获取若干条消息,顺序调用服务设置的回调函数。如果处理完后消息队列仍不为空,则将服务对象重新放回全局队列。

这样,就完成了尽量多(远超过工作线程数量)的并发服务的调度问题。

我这些年一直在考虑这个模型可否有改进之处。能不能设计得更简单,却还能在简化设计的基础上进一步提高并发性。同时,还可以更好的处理消息队列过载问题。

想要提高并发处理能力,主要是减少 CPU 在锁上空转浪费的计算力。

目前的设计中,全局队列是一个单点,所有工作线程都可能并发读写这个全局队列。为了减少锁碰撞的概率,我已经做了不少的优化,比如为不同的工作线程配置了不同的策略,有的会一次尽可能多的处理单个服务中的消息;有的在处理完一个服务中的单条消息后,就立刻切换到下一个服务。这样,每个工作线程去获取锁的频率就不太相同,同时,任务繁重的服务也得以尽量在同一个工作线程多做一些事情,而不必频繁进出全局队列。

但这个优化并没有从根本上改进设计。

另一个问题是,每个服务的消息队列是多写一读模式。只有唯一的一个读取者,也就是关联服务;却有众多潜在的写入者。比如 log 服务,所有其它服务都可能向它写入消息。它的队列的锁的碰撞概率就很高。

那么,有什么改进的空间呢?


我设想,以上的消息队列均可简化为一读一写,即只有严格意义上的一个写入者和一个读取者。

我们可以设置一个足够大的固定长度的全局消息队列,当服务 A 向服务 B 发送消息时,它是将消息投递到这个全局消息队列中。因为我们的工作线程数量是固定的,这个全局消息队列的内部实现就可以按工作线程的固定数量分解成同样数量的子结构。每个工作线程都只写入关联的子结构,这样就只存在唯一的写入者。而全局消息队列当然只分配一个独立的工作线程去读取和转发。这个转发线程就是整个全局消息队列的唯一读取者,同时、它还是所有服务私有消息队列的唯一写入者。

虽然比旧设计多了一步转发工作,但转发的只是数据指针,且业务非常简单,设置未必会占据满一个 cpu 核心。它或许会增加一些消息投递的延迟,但一定能在整体上增加 cpu 的利用率。一个并发队列的实现只有唯一的读者和写者时,首尾指针都不再需要锁就可以实现了。

这个转发线程,同样还可以承担工作线程的调度工作,统一把任务分发给工作线程上去。这样也同时避免了碰撞。


再说说过载问题。

因为旧设计中,每个服务的消息队列是无限长的,除非发生 oom ,否则投递消息总能成功。这简化了业务层的实现,不用考虑消息投递阻塞的例外处理。但同时也带来了麻烦。如果业务层不妥善处理,消息队列的过载极易产生雪崩效应。现在的设计中,只提供了少许基础设置来判断消息阻塞是否发生(可以获取自身的消息队列长度),依赖服务自身想办法解决问题。

我想,如果做了上面的消息转发,以及工作线程统一调度的改造后,我们可以更好的帮助业务层解决消息过载,尽可能地避免雪崩效应。

当我们发现任何服务的消息队列太长,那么就可以暂停所有向这个服务消息队列的投递行为。也就是,不再从全局消息队列转发消息到服务消息队列,而是暂存起来。(在实现细节上,可以在消息发生过载时服务队列加一个递增的版本号,每次转发都校验这个版本号,决定消息是直接转发还是暂存)暂存队列是转发服务的私有数据结构,没有并发的问题。

同时,因为我们是统一调度,所以还可以提高过载服务的处理优先级。专门安排一个工作线程处理它(如果多个服务过载,也可以分别安排到不同的工作线程上)。工作线程只要在处理服务消息,那么必定会逐步消化服务的消息队列。当情况缓解,再将暂存数据转发过去。

如此,每个服务的消息队列都可以实现成定长结构,进一步简化实现。


btw, 工作线程的任务调度本身也可以通过消息队列通讯来完成。每个工作线程有一对自己和调度线程通讯的消息队列,用来处理调度任务。调度线程平常只需要轮流将服务对象 id 发送到每个工作线程,工作线程每完成一个任务都把确认信息发回。如果发现完成了一步任务后,自己的工作队列还很长,也可以取出任务而不执行,直接发回去,方便调度线程将其分配给别的工作线程。

Comments

这里可以考虑一下系统整体拓扑的复杂度:

如果拓扑过于复杂, 确实使用一个中心节点进行解耦是合适的。

但是如果拓扑不是太复杂的话, 其实可以做成 一写多读 队列。 每个服务Produce消息到自己的队列中, 并且每个服务去 Consume 其关心的服务的队列。

确实是个好办法

必须要有个基准测试 才能比较客观校验设计的好与不好吧

按顺序排除处理

为什么乱序一般不会发生:

因为当一个工作线程处理一个服务的时候,这个时候它发向分发地的消息是有序的。当它切换到下一个服务时,上一个服务则通过通讯管道回到了分发地。它被下一次再激活的次序是非常晚的。

消息回到分发地和服务id 回到分发地走的是两个通道,这样才造成了处理时序的不同。但是,因为分发地不太可能在处理某个通道上有不同寻常的负荷,所以我认为次序一般不会乱。

我重新考虑这个问题时发现,如果上面这两个通道合并成一个的话(因为收发端都是相同的,完全应该合并)。时序是严格保证的:因为,如果消息没转发完,相关联的服务就一直处于挂起状态,也就不会产生新的消息。

当然,由于有过载处理,消息版本号依然得有。

@wj

我认为版本号就可以解决这个问题,同一个服务向另一个服务发出的消息都有一个连续递增的序号。这样,收取方虽可能以乱序收到,但可以在自己的私有结构里重整。

因为分发的地方并没有什么潜在的阻塞位置,工作线程数量也是固定的,所以乱序发生的时候也不太会出现非常恶劣的情况。(某个工作线程饿死,无法取出转发消息)

基于此,重整算法并不会太复杂,用几个有限的桶就可以实现。如果万一陷入特别混乱的情况:比如在一个工作线程中转发了大量消息,但是有个前置消息在另一个暂时没有处理到的工作线程中。

那么当内部重排桶满了后,就会进入过载状态。而过载处理本身是发生在分发处的唯一私有结构中,不存在并发问题。可以仔细地单独处理。

@threezhiwang

对于抢占格子的实现,目前我的做法是每列上有一个fence,线程atomic_exchange到这个fence后,就可以执行这列上的任意一个有任务的格子,执行完再释放fence回到线程池。这样就不会有多于一个线程执行同一个服务,即对于任意一个格子,不会同时有多个读者。

在产生新任务的时候,只写入当前线程对应行、目标任务服务所在列的格子中,是因为向同一个服务提交任务的线程会有多个,把它们按线程ID分开就可以避免写入冲突。

@threezhiwang

那不就是open mp的基本处理逻辑么
那某个线程处理完当前行了就只能等待了
如果全局加锁,起码是可以继续参与处理其他事务

===

是的,虽然这种设计还可以通过线程锁来抢其他线程的队列,但是效率并不高。

因此在我提到的链接里,实现了进一步的思路,即线程池中的线程互斥性地抢占服务执行(即哪个格子有任务就抢哪个)。在执行的过程中,如果要发起新任务,只将新任务放入当前线程ID所在行,目标任务所在服务列的格子中。这样,对于同一个格子,就不会有多于一个线程在读,也不会有多于一个线程在写。

在这样的设计下,抢占队列所造成的竞争远远小于把这些子任务全都扔到一个全局队列中。

我想说的服务的有序性主要是指:每一个工作线程在全局队列里面都有一个私有的子结构来保存该工作线程执行过程中产生的消息,但是如果多个工作线程在执行过程中,都给某个服务发送消息(比如日志服务),那就会造成发给同一个服务的消息被存放在不同的与工作线程有关的子结构中间,在调度线程进行转发消息的时候,作为服务的消息队列的生产者,怎么保证每一个服务的消息的有序,总不能把整个全局队列加锁,然后根据服务的id遍历所有的与工作线程相关的子结构,把所有的发送给指定服务的消息依次取出,如果这样的话,肯定会影响消息发送的并发度。

@wj

在过载处理之后的有序性需要用一个版本号保证,blog 中有写,但没有展开。

@dwing
一个线程读一个线程写,链表结构的并发队列只要保证头尾指针非空就可以很容易实现, 比如创建并发队列初始就push一个初始节点(该节点永远作为队列的头节点),头尾指针指向它,后续的push操作通重置尾指针指向来串联新加入的消息节点,而pop操作则通过判断头节点的next指针是否为空来读取消息,如果为空,说明没有消息可读;如果不为空,说明有消息可读,于是读出头节点的next指针所指节点内的消息内容,然后让头节点的next指针指向头节点的next指针所指节点的next。

最近在考虑实现类似的东西。看了一些资料,在想是否可以采取 work stealing 的方式来进行调度,工作线程各自有一个 readyQ ,一个服务如果 spawn 新的服务,先放在当前工作线程的 readyQ 里面。每个工作线程都先消费自己的 readyQ ,如果没有新的工作了,再去窃取其他工作线程的 readQ 里面的服务来执行。这样就没有全局的队列了。另外服务本身也有了一定的局部性,比较固定在某个 cpu 上(前提是每个工作线程都应该绑定 cpu )。

数组结构固定长度的队列很容易实现无锁队列, 链表结构的队列也能实现无锁但难度大很多

在优化之后的方案中,我对于如果保证某个服务的消息的有序表示怀疑!

我的做法是,当前有N个物理线程,有M条服务。单个服务在任意时间只能在其中一条线程上执行。
那么制作一个N行M列的矩阵,每个格子存放一个单读单写的队列。
===
那不就是open mp的基本处理逻辑么
那某个线程处理完当前行了就只能等待了
如果全局加锁,起码是可以继续参与处理其他事务

为啥可以 远超过工作线程数量。不是每个工作线程串行处理服务里的消息么。

另外,暂存队列的设计可以简化之前的实现:服务队列固定长度、去掉锁、等等。

如果有唯一位置处理过载消息,就有更多手段处理了。

例如遵循以下规则:

1. 如果一个服务过载,那么它会在最高优先级下被工作线程处理。
2. 如果向一个过载服务发送消息,且自己不是 1 ,那么就不再被工作线程处理,直到过载服务缓和。
3. 如果一个服务因为 2 被挂起一段时间,恢复后会收到一条消息被告之过载发生。

这样就可以切断过载链条,避免雪崩效应;同时也可以自定义缓和策略。

当我们发现任何服务的消息队列太长,那么就可以暂停所有向这个服务消息队列的投递行为。也就是,不再从全局消息队列转发消息到服务消息队列,而是暂存起来。(在实现细节上,可以在消息发生过载时服务队列加一个递增的版本号,每次转发都校验这个版本号,决定消息是直接转发还是暂存)暂存队列是转发服务的私有数据结构,没有并发的问题。
------------------------------------------------
云大,我认为这样做只是转移了(转移到了"暂存队列")过载问题,并没有很好的解决问题。

感觉和我之前做的一个N*M的调度模型有点像。

我的做法是,当前有N个物理线程,有M条服务。单个服务在任意时间只能在其中一条线程上执行。
那么制作一个N行M列的矩阵,每个格子存放一个单读单写的队列。
每条服务分别去写入对应列的格子。
每线程分别去poll对应行的所有格子,取出任务并执行。

这样所有格子上都不会存在冲突。

https://github.com/paintdream/PaintsNowCore/blob/master/System/Kernel.h

Post a comment

非这个主题相关的留言请到:留言本