« August 2020 | Main

September 21, 2020

skynet 版的 cache server 引出的一点改进

我们自己做的 cache server 已经工作了很长时间了。上次出问题是在 2 月在家工作期间

这个月又出了一起事故,依旧是 OOM 导致的崩溃。一开始,我百思不得其解,感觉上次已经处理完了所有极限情况,按道理,这是个重 IO 而轻内存的业务,不太可能出现 OOM 的。

通过增加一些 log 以及事后的分析,我才理解了问题。并对应做了修改。

上次的问题出在服务器发送的过载。即,服务器在发送大量数据时,发送速度超过了对端可以接收的速度,然后导致了数据在 skynet 底层积压。这是 skynet 设计之初的历史原因造成的。

skynet 在设计的时候做了如下的假设:

  1. skynet 处理的业务以交互通讯为主,不会有太多的下行数据。
  2. skynet 允许在多个服务中向同一个 socket 发送数据,底层保证每次发送的数据是原子的,即 A 服务发送的数据块不会被 B 服务发送的数据穿插打断。
  3. skynet 的发送数据 api 是非阻塞的,调用即成功,不会挂起等待而引起服务消息重入问题。

第二点的由来是因为最初希望把游戏中的聊天信息和游戏逻辑信息完全分离到不同的服务中去处理,而不需要让数据汇总到一个服务中发出。后来感觉这个优化并不那么重要,但功能依然保留了下来。

第三点其实是为了满足 2 而衍生的。同时想和 skynet 服务内部消息 api 保持一致:send 消息本身不应被阻塞。

结果,skynet 的 socket api 就变得和系统 api 行为不一致。tcp 协议中,发送数据会和对端接收者协商,不会一味的推送数据;而 skynet 则会把数据堆积在底层,远远超过系统给的 buffer 。

因为有第一点的假设,绝大多数时候问题并不会显露。


但是、一旦让 skynet 充当文件服务器类似的业务时,我们就需要仔细考量。二月份的问题的解决利用了后来补丁上的的一个 warning 特性,利用 skynet 消息来通知发送缓冲区过大,从上层主动控制发送速度。

但这并没有完全解决问题。

skynet 在设计的时候,客户端上行数据是无限制的由 socket 线程转发到服务的。这个设计基于一个前提:服务的处理能力一定大于数据上行能力。对于游戏服务的场景,这总能满足。即使对于文件传输服务,也通常不是问题:磁盘 IO 速度总能大于网络速度。如果只是简单的上传文件,应该是不会因为 IO 处理过慢,把网络 buffer 撑爆的。

但这次却出了问题。

这个问题部分和 unity 的 cache server 协议设计有关。cache server 的协议中是没有 session 的概念的,每个请求依持续回应。(btw, cache server 的 上传文件请求是没有设计回应的 。我认为这是个草率的设计决定,很容易造成不合理的服务器/客户端实现。)

假设客户端先请求了一个极大的文件,那么服务器会满足它的请求,逐步把文件发送出去。这个过程仅由二月份的完善,可以做到不在 skynet 底层堆积发送队列。但是,我却疏忽了一点:TCP 实质上是双工的,上行和下行完全独立,可以互不干扰。在客户端慢慢接收数据(数据下行)的同时,它还可以同步的提交后续的请求(数据上行)。如果在此同时,客户端企图上传一个巨大的文件,那么由于协议需要严格遵守时序,处理服务没能在发送数据完成之前去处理后面这个请求,最终导致了上行数据堆积。

我们的 cache server 实现,针对这个问题做了一些修改:

之前简单的

while true do
  local req = get_request(fd)
  put_response(fd, req)
end

这样的逻辑是行不通的。需要把这里的一个连接一个处理序改成上下行分开两个,跑在两个 coroutine 中;一个管上行、一个管下行。即,put_response 应该投递到另一个 coroutine 中去处理。只有这样,发送回应(可能是下发一个大文件)的过程中遇到带宽限制而造成的等待,才不会影响 get_request 的处理。

即:get_requestput_response 过程都可能发生阻塞而等待。前者的等待不应该影响后者的处理过程,反之亦然。

但即使这样,也无法完全消除 OOM 的可能性。因为,上行的请求 q1,q2,q3,q4 ... 会导致服务器产生对应的回应 r1,r2,r3,r4 .... ;它们必须按次序完成。如果 r1 处理过程很长(当 q1 是一个大文件请求时),q2, q3 q4 .... 这些后续请求本身必须放在内存队列中(只是不需要立刻计算出 r2, r3, r4 ...)。这个队列一样有潜在的 OOM 的威胁。

所以完全解决这个问题,必须给 skynet 的 socket 底层增加一个流量控制机制。让上层可以通知 socket 线程暂停接收数据,让 TCP 协议本身阻塞住通讯,这样客户端才不会一味的推送数据。(过去的 skynet 只做了一些简单的保护措施,一旦发送堆积就断开连接,但这个粗暴的办法不适合这个场景。因为我们无法修改 Unity 的客户端实现来做相应的配合)

我在最近的一个 commit 中增加了这样的控制指令,并由 socket 模块自动触发。一旦一个服务收到太多的数据,而业务层不去处理,就会触发这个机制。


最后提一下。这次问题的暴露源于我们在广州给上海的开发团队架设了一个 cache server ,让他们可以直接使用。原来以为 cache server 这种设计在局域网内,需要高带宽低延迟的 cache 服务不适合架设在公网上,尤其是跨城市的这种;但实际用了几个月似乎也没什么不适。只是复杂的网络环境多暴露了一些软件实现问题。

September 03, 2020

为 skynet 增加并行多请求的功能

skynet 在设计时,就拒绝使用 callback 的形式来实现请求回应模式。我认为,callback 会导致业务中回应的流程和请求的流程割裂到两个执行序上,这不利于实现复杂的业务逻辑。尤其是对异常的支持不好。

所以,在 skynet 中发起请求时,当前执行序是阻塞的,一直等到远端回应,再继续在同一个执行序上延续。

如果依次发起请求,会有不该有的高延迟。因为在同一个执行序上,你必须等待前一个请求回应后,才可以提起下一个请求。而原本这个过程完全可以同时进行。

但是,如果我们想同时发起多个不相关的请求就会比较麻烦。为每个请求安排一个执行序的话,最后汇总所有请求回到一个执行序上又是一个问题。目前,只能用 fork/wait/wakeup 去实现,非常繁琐。

这类需求一直都存在。我一直想找到一个合适的方法来实现这样一类功能:同时对外发起 n 个请求,依回应的次序来处理这些请求,直到所有的请求都回应后,再继续向后延续业务。实现这样的功能在目前的 skynet 框架下并不复杂,难点在于提供怎样的 api 形式给用户使用。

一开始,我想到的是利用一个 for 循环来处理多次回应。(以下不是合法的 lua 语法,只为了表达意思)

for index, ok, a,b,c,... in skynet.request(...)(...)(...) do
   if index == 1 then
       -- 处理第一个请求
  elseif index == 2 then
       -- 处理第二个请求
  else
    assert(index == 3)
       -- 处理第三个请求
  end
end

这段示意代码中,同时提起了三个不同的请求,它预期会收到三个回应。但回应的次序显然无法确定,所以我用了 index 这个序号来标识后面的参数是第几个回应。btw, 这里 ok 表示了请求成功还是失败(发生 error )。

请求和回应在这里被割裂开了,所以出现了怎样把请求和回应对应起来的问题。为此我想了好几个办法。其中一个是这样的形式:

for req, ok, a, b, ... in skynet.select() do
  if req(...) then
     -- 处理第一个请求
  elseif req(...) then
     -- 处理第二个请求
  elseif req(...) then
     -- 处理第三个请求
  end
end

如果设计一个 skynet.select() 返回一个迭代器,然后把请求本身放在循环内,似乎可以解决割裂的问题。用一定的实现技巧,让请求只在第一次调用的时候真正发出,而循环本身不停的匹配回应。

随之我意识到,这样过于技巧化。只是为了形式上好看一点,其实让语义更模糊了。就不再在这条路上走下去。

我发现,其实最简单的方案是用一个字符串标识每个请求,让业务编写的人可以用一个更明确的词区分不同的并发请求。再进一步,如果请求本身可以直接是一个对象(table) ,用请求本身就可以区分自己。

这样就是最终的方案:

local reqs = skynet.request()
local req1 = reqs:add { ... }
local req2 = reqs:add { ... }
for req, resp in reqs:select() do
    if req == req1 then
       -- 处理请求 1
    elseif req == req2 then
       -- 处理请求 2
    end
end

也可以简化成这样:

for req, resp in skynet.request 
    { ... , token = "req1" }
    { ... , token = "req2" }
     :select() do
    if req.token == "req1" then
       -- 处理请求 1
    elseif req.token == "req2" then
       -- 处理请求 2
    end
end

这里,我让 request/select 返回的迭代器每收到一个回应都返回请求对象和回应对象,用户可以自己识别。因为可以直接在请求对象中插入任意的标识符,所以很容易区分开不同的请求;或是把类似的请求分类处理。回应信息 resp 也用 table 承载,这样,如果 resp 为 nil ,则表示请求出错。


在一开始的实现中,这套 select 机制有一些限制。因为从调度器看来,这段业务的阻塞点在 select 上。也就是在这里,等待多个请求的回应。而回应处理的代码是在循环体内的,如果在处理回应的过程中又发起了新的请求(调用了阻塞函数),调度器就无法正确处理了。因为在同一个执行序列上,无法区分即将到回应到底是之前发起的多个请求之一,还是在处理某个回应时发起的新请求的回应。

不过这个限制是很容易消除的。只需要把 select 本身的执行序放在独立的 coroutine 里就可以了。相当于发起多个请求后,用一个独立的执行序去等待多个回应,每收到一个回应就转发回当初的位置,让用户可以在原有的执行序上依次处理。

有了这个基础结构之后,也就很容易增加超时的处理。超时相当于同时提交一个定时器请求,收到定时器请求的同时,放弃还没有收到的起他回应即可;或是等所有请求都处理完毕,放弃定时器的处理。

我们需要一个明确的处理流程来结束这个 select 循环:把多个请求的处理合流到后续的处理流程上,尚未处理的回应需要忽略掉。如果一切正常,select 的迭代器结束就可以做这些收尾工作。但如果发生异常,例如,在循环中 break 、产生 error 等等,我们需要主动调用关闭流程。

好在 lua 5.4 提供了 to be closed 方法来做这件事。所以这个新特性直接放在 lua 5.4 的基础上来实现。感兴趣的同学可以提前看看仓库中 select 这个分支。目前最直接的好处是,使用新 api 重新实现的 debug console 在调用 stat 或 mem 这样需要轮询所有服务状态的功能时,默认加上了超时设计,不再会因为单个服务出了问题,而无法返回了。