« June 2013 | Main | August 2013 »

July 30, 2013

给 skynet 添加 mongo driver

前段时间 实现了 mongo 的 lua driver ,做了一些基础工作后,由于工作比较忙就放下了。

这几天有同学告诉我他们在用这个了,并找到了一处 bug 。我还真是受宠若惊啊。一咬牙决定把这个东东整合到 skynet 中去。

本来觉得挺简单,做起来后发现必须把 lua-mongo 里的 socket 部分剥离开,才能替换成 skynet 的 socket 库

这个分离工作花了我一天的时间,结果虽然会损失一点性能,但是 mongo 的底层协议解析模块就可以独立出来。

和 skynet 的整合工作在做完这个步骤后,要轻松的多了。只需要把 socket 模块替换成 skynet 提供的即可。

注意:我们的项目暂时还没有使用 MongoDB ,所以我只实现了最基本的 mongo driver 的特性。需要有兴趣的同学帮我完善,或者,等我们的项目开始用 mongo 的话,总有一天我会自己把这些工作做完的。

有兴趣的同学可以直接 pull request 到 lua-mongo ,我会整合新功能,并合并到 skynet 中。

目前最希望完成的是短线自动重连, replica set ,以及 write concern 这三项特性。它们应该都可以在 lua 层完成。

July 25, 2013

coroutine 的回收利用

这几天在 lua 和 luajit 的邮件列表上有人讨论 coroutine 的再利用问题。

前几天有个用 skynet 的同学给我写了封邮件,说他的 skynet 服务在产生了 6 万次 timeout 后,内存上升到了 50M 直到 gc 才下降。

这些让我重新考虑 skynet 的消息处理模块。skynet 对每条消息的相应都产生了一个新的 coroutine ,这样才能在消息处理流程中,可以方便的切换出去让调度器调度。诸如 RPC/ socket 读写这些 api 才能在用起来看成是同步调用,却在实现上不阻塞线程。

读源码可知,lua 的 coroutine 非常轻量(luajit 的略重)。但依旧有一些代价。频繁的动态生成 coroutine 对象也会对 gc 造成一定的负担。所以我今天花了一点时间优化了这个问题。

简单说,就是用自己写的 co_create 函数替换掉 coroutine.create 来构建 coroutine 。在原来的主函数上包裹一层。主函数运行完后,抛出一个 EXIT 消息表示主函数运行完毕。并把自己放到池中。如果池中有可利用的旧 coroutine ,则可以传入新的主函数重新利用之。

为了简化设计,如果 coroutine 中抛出异常,就废弃掉这个 coroutine 不再重复利用。为了防止 coroutine 池引用了死对象,需要在主函数运行完后,把主函数引用清空,等待替换。

具体实现参见这个 patch

ps. coroutine poll 故意没实现成弱表,而是在相应 debug GC 消息时再主动清空。

July 22, 2013

增强了 skynet 的 socket 库

今天想在 skynet 下访问外部的 http 接口, 所以试了一下前几天新写的非阻塞 socket 库

由于测试的时候用的 URL 被墙了,所以发现了问题:connect 目前是阻塞模式的。连接一个有问题的服务器可能被阻塞很久。所以花了点时间把 connect 接口改成非阻塞的了。

我以前没处理过这种情况,所以也就按书上的写法写写,异步 connect 做起来挺麻烦的。如果有用 skynet 的同学这方面经验丰富,烦请用之前帮忙 review 一下新打的 patch 。

另外,为了处理 http 的需求,给 socket 库增加了新接口 readall ,可以读 socket 上的所有数据(直到对方 close 再返回)。

socket.read 和 socket.readline 也改进了,如果对方关闭了连接,那么最后一次调用除了返回 nil 外,也把最后发送过来的数据返回。这样,readline 就可以处理最后一行没有分割符的情况了。

另外一个小改进是针对上次 两步初始化 lua 服务 的。

我在 lua 服务启动模块中增加了出错通知,这样 launcher 服务就可以拿到启动的出错信息了。

当然,我觉得 Hive 里处理服务启动的方式更漂亮一些。就是独立出一个服务启动协议,把启动参数用消息传进去,目前这样用长字符串传递启动消息的方式非常容易出错。

可惜我们已经在目前的 skynet 上积累了太多代码,想改比较困难了。

July 04, 2013

回调还是消息队列

前几天在做 Hive 的 socket 库的时候, 遇到一个问题很典型,我记得不是第一次遇到了。值得记录一下。

socket 底层有一个 poll 的 api ,通过 epoll 或 kqueue 或 select 取得一系列的事件。用 lua 怎么封装它呢?

一个比较直接的想法是注入一个 callback function ,对于每个事件回调一个 lua 函数。但这容易引起许多复杂的问题。因为回调函数很不可控,内部可能抛出异常,也可能引起函数重入,或是做了一些你不喜欢去做的事情。

如果面面俱到,就会让原本 C/Lua 边界的性能问题更加恶化。

所以,我采用了方案二:把所有事件以及相关数据全部返回,让后续的 Lua 代码去处理 C 层获取的所有事件。

这个方案也容易造成性能问题,那就是临时构件复杂数据结构,对 Lua VM 的 GC 造成的压力。

为了优化这点,我主张从外部传入一个接收结构的 table ,也就是一个空的消息队列。

然后在 C 层使用 rawseti 把消息加进去。rawseti 是一个相当高效的 lua api 。因为 lua table 如果只有顺序的整数做 key 的话,性能和 C 数组无异。

因为每条消息上有不只一个数据,所以最终的数据结构是一个二维数组。为了不每次临时创建大量的 table ,可以缓存用过的 table 到另一个 table 中。

比如,上次收到 4 条消息,这次收到 5 条的话,就新创建一个 table 保存第 5 条消息关联的参数。下次又收到三条消息的话,就把过剩的两个 table 转移到 cache 中备用。当系统稳定运行后,相当于创建好了整个二维数组接收 C 层产生的数据,而不会临时构建 lua 表了。


再说一个不那么相关的话题:Lua 下实现一个简单的消息队列,我觉得如下简单的几条代码就可以了。

local q1 = {}
local q2 = {}

-- 产生消息只需要
table.insert(q1, msg)

-- 分发消息需要两层循环, 可以处理 dispatch 过程中产生的新消息
while q1[1] do
  q1,q2 = q2,q1
  for i=1,#q2 do
    dispatch(q2[i])
    q2[i] = nil
  end
end

July 02, 2013

Hive 增加了 socket 库

按计划给 Hive 增加了非阻塞的 socket 库。这样,它就可以用 lua 完成 skynet 中的 gate 以及其它 tcp 连接相关的功能了。

我比较纠结的是 listen 这个 api 的设计。传统的 bind/listen/accept 模型不太适合这样的 actor 模式。一个尝试过的方案是 skynet 中的 gate 。也就是在 listen 的端口上每收到一个新连接,就转发到新的 cell 中去。

为了灵活起见,我把转发控制交给了回调函数。这也是用 lua 的灵活之处。

用户可以选择启动新的 cell ,然后把新连接 forward 到新 cell 去,也可以 fork 一个 coroutine ,forward 回自己处理。整套 socket api 是非阻塞的,如果 IO 上没有数据,coroutine 都会被挂起。

这次 Hive 这个项目算是业余的尝试,skynet 在我们的项目中已经积累了太多相关代码,不太容易迁移到新的框架下来。而且用 lua 实现大部分以前用 C 实现的代码,性能损失未知。不过这几天写下来觉得,C 没有很好的 coroutine 支持,在做异步消息处理时非常麻烦,以至于用 C 写的模块都不能很好的提供服务的同时保持代码简洁。用 Lua 重新实现它们要简单的多。

这个 socket 库我顺手把 epoll / kqueue / select 三种模型都支持了。其中 select 实现起来最麻烦,而且性能不高。所以,如果想在 Windows 跑的同学,姑且认为它仅仅只是想运行起来罢了。生产环境还是不要用 windows 的好。

为什么不用现成的 lua socket 库呢?

因为对于这种多虚拟机的用法,现有的 lua 库接口都不太合适。我需要在一个 VM 中处理 socket 上的消息,并把消息传递到其它 VM 中进一步处理。所以把网络数据打包为 lua 的 string 或 userdata 都不合适。

最好的方法是直接用 malloc 分配一块内存,复制消息,然后以 lightuserdata 指针的形式传递给后续处理的 VM 。这样可以避免许多数据拷贝。甚至于可以将消息多次转发,直到有 VM 可以指针处理它为止。


至于 Hive 这个玩具暂时就放在这里了。有兴趣完善它的同学可以和我联系。

比如,可以利用已有的 socket 库增加一个调试协议,允许从外部连入一个控制台,去查询 hive 的内部状态。就像 skynet 的 debug 模型那样工作。

还可以利用 socket 库进一步的处理多个进程的不同 hive 相互交互的工作。

我认为这些 skynet 有的特性,移植到 hive 中都不是太难的事情。


ps. 本来想做一个 rockspec 提交到 luarocks 中去的,做了一半遇到些困难。而且整个代码也还需要经常更新,还是搁置一下吧。