« 开发笔记(20) : 交易系统 | 返回首页 | Lua 5.2 如何实现 C 调用中的 Continuation »

开发笔记(21) : 无锁消息队列

最近三周按计划在做第一里程碑的发布工作,几乎所有新特性都冻结了。大家都在改 bug 和完善细节。

服务器的性能还有不小的问题,压力测试的结果不能满意。原本我希望可以轻松实现 40 人对 40 人的战场。现在看起来在目前台式机上还有困难,虽然换上高配置的服务器可以达到,但会增加不少成本。我们决定动手做一些优化。

固然过早优化的必要性不大,但早期发现的性能问题有可能是设计原因造成的。尽早发现设计中的错误,早点改正,比后期在低层次优化要省力的多。

我们采用的是大量进程(非 OS 进程,这里指 Erlang 进程)协作工作的模式。可以充分利用多核的优势,但却对内部通讯的数据交换产生的极大的压力。初步发现在多人对战时,90% 的内部通讯包是状态包的同步 。虽然我们的框架实现,会让单台机器上的 Erlang 进程间通讯,变成同一进程内的简单函数参数传递。但数据列集和内存复制还是会带来一些负荷。

目前还不太确定内部数据包传递的边际成本到底是否影响到整体性能,我打算从优化这一部分的设计和实现入手,确定问题所在。

状态同步,简单说就是一个玩家的 Agent 在做一个动作时,它需要把这个行为通知所有在虚拟场景中他附近的玩家。当很多人(超过 50 人)在一起时,就有大量的数据包需要广播出去。我们目前的做法是基于这样一个假设:服务器内部数据包的传递非常廉价。广播包比逐个发送更加廉价,这是因为,单机内部广播,可以避免大量的数据复制。所以,在同一张地图上,我们会简单的把任意一个 Agent 的状态改变信息广播给同张地图的所有其他人。这样就不需要动态维护分组信息。当每个 Agent 收到广播包后,再根据自身的逻辑进行过滤,再发给对应的客户端。

或许我们需要一个更为高效的广播方案,避免一些无谓的包复制。

我想实现这样一个数据结构:

有很多写入者,可以并发的向一个消息队列写入信息包。同时有很多读取者,可以并发的从这个队列读取这些信息包。

假设内存无限大,队列可以无限长。那么队列无需清除永远不会被读到的数据。写指针可以被共享,任何人写入都向前移动写指针(队列尾)即可。每个读取者各自维护一个读指针(队列头),可以并发读取,互不影响。

此数据结构可以被简单的实现,并做到无锁的并发安全。

我是这样实现的:

用两块连续内存,一个保存索引 INDEX_QUEUE,一个保存信息包的实际数据 DATA_QUEUE

对于进入队列,大概是这样的流程:

  1. OFFSET = SYNC_FETCH_AND_ADD(DATA_QUEUE.TAIL , SIZE)
  2. WRITE_DATA(DATA_QUEUE , OFFSET, DATA , SIZE)
  3. INDEX = SYNC_FETCH_AND_ADD(INDEX_QUEUE.WTAIL, 1)
  4. INDEX_QUEUE[INDEX] = OFFSET
  5. WHILE NOT SYNC_COMPARE_AND_SWAP(INDEX_QUEUE.RTAIL , INDEX, INDEX+1)

下面解释一下:

第一步,我们增加数据队列的尾指针,空出足够的空间。第二步将数据写入准备好的内存中。第一步的原子性保证了第二步的数据准备工作可以并行进行。

在读队列中的数据时,我们依据的是索引队列的指针位置,而不是数据队列。这样,不会读到没有准备好的数据。

索引队列有两个尾指针,以保证第四步的安全写入。

第三步,原子递增索引队列的 WTAIL ,分配出一个索引空间,供第四步原子写入索引。

第五步,将 RTAIL 递增。注意这里,不能简单的加一。而是要在 WTAIL 的原有值的基础上加一。这是因为,要避免执行第四步时同时读队列。而且我们必须保证第四五步并发时,较小的 INDEX 值先被加一推进 RTAIL 。

读队列的流程简单的多,每个线程独立维护各自的队列头指针,所以不再需要原子操作,每次仅需要从队列头读到 RTAIL 处即可。


由于队列内存不可能无限大,所以我们需要实现成循环队列,在内存块满的时候,回转一下。这会使实际的实现代码比上面的伪代码更复杂一些。不过还是可以实现成无锁的数据结构。

在我们的实际应用中,队列维护者不需要了解所有的读取者。每个队列的订阅者,在订阅的那一刻,重置读指针到队列尾。然后以一定的频率(目前是 0.1s),每次都处理完队列里的所有数据。而队列按照估计值,大约可以保存至少 1 秒甚至更多的数据包(大约有 3 万个数据包的余量,即使有上千人拥挤在同一个地图上,也需要花上远大于 0.1 秒的很长时间才会填满)。

这样,每个 Agent 需要向地图广播数据的时候,只需要把数据压入地图广播消息队列。然后定期从广播消息队列读取这个心跳的所有广播包就可以了。它甚至不需要从队列中复制出广播包来做过滤和转发处理,而用指针指向队列上的数据区直接处理就够了。


我只花了半天时间,200 多行 C + Lua 封装代码就实现了这个数据结构。但是多线程程序果然到处都是坑,又花了一天时间才解决掉其中的并行问题的 bug 。

我们成功的把内部通讯包降低到原有的 10% 。整体性能有所提升(大约 10% 到 20%),没有我预想的效果那么明显。接下来还需要更多的统计信息,找到下一个热点。

Comments

看到這篇 我想這系列文應該可以作個補充
http://ifeve.com/disruptor/

这样的实现,写线程可能会跳过未使用的Index,浪费内存

看着感觉有点像ring buffer的理念啊

道上听说一个500行的lock-free代码要琢磨一年时间才能排除所有的代码,云风大人一天时间就搞定了.让我甚是怀疑道上的消息啊!

40v40就有这么大压力吗?
最近听说御龙在天的同场景人数可以达到数千人(不排除作假),已经不是一个数量级了,非常恐怖。不知云风大哥有没有关注过?

首先感谢之前云风大人给我的解答, 谢谢. 这里也希望帮助你一下.如果有效果请回我邮件通知下就好:) 5765389@163.com
双线程无锁,CAS 的东西我之前尝试过. 也写了和你类似的实现方案, 不过效果不敢说好. 后来我发现了一个很不错而且效率很高的双线程数据传递方案, 很简单就是下面两个函数.

1 PostQueuedCompletionStatus
2 GetQueuedCompletionStatus
用过IOCP 的人一定比较熟悉, 尤其是后一个, 而这两个函数配起来, 用来在双线程间传递数据的效率经过我的测试, 比CRITICAL_SECTION 高一倍以上.
您可以试下, 方法就是先创建两个完成端口, 分别在两个线程中 GetQueuedCompletionStatus
然后消息传递使用
PostQueuedCompletionStatus

同屏优化的话,建议给各种不同类型的包,都定义一个广播级别,当同屏人数达到一定程度时,广播级别低的包,可以考虑屏蔽掉。
最简单的例子就是蓝的广播,同屏战斗人数达到某种程度时,蓝和血的广播,占的比重非常大,但是玩家往往在人多的时候,并不关注周围人的蓝量,这个时候,可以选择屏蔽。
体验更好的另一种方式,是不屏蔽,但是分级后,各自有各自的广播频率。

想实现和魔兽奥山一样的40vs40战场?直接都连个战场服务器算了,就算内网之间服务器通信的话至少也有几十毫秒的延迟。无锁队列tbb里不是有么,自己实现性能肯定比不上。

40vs40是想做成像魔兽奥山战场一样的战斗?感觉做个独立服务器让这2组玩家直接连上去算了,服务器越多,同步开销越大而且延迟高,就算是内网传输也有几十毫秒的发送耗时,这样做有点得不偿失.

感觉云风总是在纠结一些蛋疼问题,个人观点做产品还是用成熟技术。

难道是被zeromq的线程间无锁通信inspire的么?:)

@Cloud "当每个 Agent 收到广播包后,再根据自身的逻辑进行过滤,再发给对应的客户端。"

如果 "过滤之后再广播给Agent,然后直接发给客户端" 会不会减少进程内大量的通信呢。维护分组信息应该只是事件性的触发,并不会频繁发生吧(纯属猜想,不了解具体维护分组的逻辑和难点)

@hubugui

有时间一定去……

前面你说的挺对。另外窃以为这种设计用while确保入队尽量是原子操作,而不是干了一半等下一个100毫秒……

又学习了,我的理解是:锁加到了索引了,真正读写数据的时候就没有锁的限制了。
@hubugui “去掉While意味着只允许拥有最小Index的线程才会更新RTAIL” 这个很有理!

40人 VS 40人的数据处理就这么复杂了,像WOW中几百人屠城时的处理跟这个相比,有什么区别? 请解释一下。不是很明白。

不是很明白,我怎么感觉1,2,3,4这些是按顺序走下来的,只有5读的时候是可以任意时候执行的呢.

@胡不归
我一开始也没理解cas,哈哈.

怪不得连云大这个老手都搞了1天,并发果然难搞!

@zelor
又看了1遍,发现我理解的“去掉While。。。待下个包子入队,RTAIL会被刷新,所以逻辑上没错”是错误的,因为此后INDEX始终比RTAIL大,无法正确刷新。我居然以为SYNC_COMPARE_AND_SWAP含有SYNC_LESS_EQUAL_AND_SWAP的语义。

@windmeup
这时等效于直接赋值,RTAIL = INDEX+1

@胡不归

我又倒戈了,孤立的看10个的确如你所说.

抛开这个伪代码不说,你可以这样想:10个可能发生只吃一个的情况,100个同样也可能,10000个同样也可能,1000000个同样也可能,100000000000000个同样也可能....

所以这个while绝对不能去掉

@Cloud

云大,能不能增加编辑自己的留言的功能啊?

@胡不归

但这个伪代码的写法不能去while.因为不全吃了的话,index_queue的r_tail和w_tail会"错位"

需要稍微改造下数据结构

@胡不归

去掉while也不一定只吃1个,最"差"情况吃一个,最好情况全吃了.

@胡不归

我理解你说的每次吃一个的意思了,去掉while可以.

见笑

@胡不归

while是必须的:假设只有两个写线程A和B.A先写数据,但B先写完数据,这时候通过while等待A数据写完

无锁队列不用自己实现吧,为什么不直接用disruptor呢?

我发现云风跟beyone家驹有点像~

是多线程程序果然到处都是坑,又花了一天时间才解决掉其中的并行问题的 bug

老手都这么说....多线程伤不起啊...

@Cloud
加While形式上显得”对称“,虽然去掉也没啥成本。这取决于应用场景的测算,目前可忽略。

@zelor
哈哈,你玩深潜啊,到Cloud宝地才冒泡,有空带阿妹来帝都耍撒。

去掉While意味着只允许拥有最小Index的线程才会更新RTAIL,假设10个线程并发,Index从1~10,大家执行完第5步后,RTAIL等于2,而如果有While,此时RTAIL等于11。延伸下,明明10个包子,去掉While使得读取线程在本次0.1s的轮询周期内只能吃1个,称之为“饥饿”,待下个包子入队,RTAIL会被刷新,所以逻辑上没错。

上面仅仅是我的理解,请@Cloud释放官方解释吧。

昨天回去想了想.

假设第2步的执行不是瞬时完成的,个人认为更合适的顺序为:1,3,4,2,5

是不是搞的略复杂啊...都上无锁编程了...
另外,fetch and add的并发度是2,再多的并发的话,就只能等了.

@hubugui

为什么说:去掉while会出现0.1s以上的“饥饿”?

此处去掉while恐怕不太合适吧……

哇,如果云大这款游戏有测试那天,我也有时间一定去测试哇……~

当数据需要被推送的目标的数量远远大于真正关心数据的目标的数量时, 使用拉取模式会更高效.

@hubugui
居然在这里遇见你,握手。

@hubugui

0.1s 在实测中是可以接受的。 玩家获得状态同步反馈的内部延迟平均是 50ms ,比通常的网络外部延迟要小。

根据需要,可以把 100ms 的频率调整到更小。

@middleware
你的blog不错啊,收藏啦。

@tony
赞同类似ringbuffer。RTAIL表示队列中最后1个存储单元的位置,WTAIL是个马甲。

1.之所以要RTAIL是因为第3步和第4步之间可能发生读写并发,导致第4步未执行,读取线程却获得INDEX_QUEUE[INDEX]值。
2.如果是循环队列,第一轮循环[0,RTAIL]表示顺序可读数据,之后则是[RTAIL,MAX]和[0,RTAIL]。
3.第5步挺巧妙,N个并发线程会按照index大小从低到高的顺序依次退出循环。猜测这种轮询机制,在并发线程高的环境需测算验证,但去掉while会出现0.1s以上的“饥饿”,使得玩家动作更新不流畅。

@Cloud

帮我把关于本文的回复都删掉吧.我发现自己彻底没搞明白什么叫并发.多谢!!!!!!!!!

@Cloud

我似乎看懂了,之前理解错误.见笑了.

@Cloud
我们这样想:假设:A先完成第1步,然后A的第2步永远也玩不了,这时候B开始写,并把1到5步都完成了.这时候怎么确定RTAIL?

@windmeup

第五步来确保

@Cloud
云大回复,太激动了.
我的疑问是如果有A,B两个人写,A先完成了第1步,B先完成了第2步,那么RTAIL该如何确定?

其实我觉得「无锁」这个词并不准确。应该都统一为无 mutex。毕竟锁还是要有的。问一下,这个用的是 bus-lock 还是 memory barrier?

让我想起了java的Disruptor 其中的ringbuffer

@windmeup

正文我没有写错. 就是第四步和读线程的并发问题.

"...这是因为,第四步有可能被并发..."是不是应该为"...这是因为,第二步有可能并发..."

学习了

以前弄了个无锁队列,好多bug。这个无锁数据,github了?

沙发

Post a comment

非这个主题相关的留言请到:留言本