« 银河竞逐的设计 | 返回首页 | skynet 版的 cache server 引出的一点改进 »

为 skynet 增加并行多请求的功能

skynet 在设计时,就拒绝使用 callback 的形式来实现请求回应模式。我认为,callback 会导致业务中回应的流程和请求的流程割裂到两个执行序上,这不利于实现复杂的业务逻辑。尤其是对异常的支持不好。

所以,在 skynet 中发起请求时,当前执行序是阻塞的,一直等到远端回应,再继续在同一个执行序上延续。

如果依次发起请求,会有不该有的高延迟。因为在同一个执行序上,你必须等待前一个请求回应后,才可以提起下一个请求。而原本这个过程完全可以同时进行。

但是,如果我们想同时发起多个不相关的请求就会比较麻烦。为每个请求安排一个执行序的话,最后汇总所有请求回到一个执行序上又是一个问题。目前,只能用 fork/wait/wakeup 去实现,非常繁琐。

这类需求一直都存在。我一直想找到一个合适的方法来实现这样一类功能:同时对外发起 n 个请求,依回应的次序来处理这些请求,直到所有的请求都回应后,再继续向后延续业务。实现这样的功能在目前的 skynet 框架下并不复杂,难点在于提供怎样的 api 形式给用户使用。

一开始,我想到的是利用一个 for 循环来处理多次回应。(以下不是合法的 lua 语法,只为了表达意思)

for index, ok, a,b,c,... in skynet.request(...)(...)(...) do
   if index == 1 then
       -- 处理第一个请求
  elseif index == 2 then
       -- 处理第二个请求
  else
    assert(index == 3)
       -- 处理第三个请求
  end
end

这段示意代码中,同时提起了三个不同的请求,它预期会收到三个回应。但回应的次序显然无法确定,所以我用了 index 这个序号来标识后面的参数是第几个回应。btw, 这里 ok 表示了请求成功还是失败(发生 error )。

请求和回应在这里被割裂开了,所以出现了怎样把请求和回应对应起来的问题。为此我想了好几个办法。其中一个是这样的形式:

for req, ok, a, b, ... in skynet.select() do
  if req(...) then
     -- 处理第一个请求
  elseif req(...) then
     -- 处理第二个请求
  elseif req(...) then
     -- 处理第三个请求
  end
end

如果设计一个 skynet.select() 返回一个迭代器,然后把请求本身放在循环内,似乎可以解决割裂的问题。用一定的实现技巧,让请求只在第一次调用的时候真正发出,而循环本身不停的匹配回应。

随之我意识到,这样过于技巧化。只是为了形式上好看一点,其实让语义更模糊了。就不再在这条路上走下去。

我发现,其实最简单的方案是用一个字符串标识每个请求,让业务编写的人可以用一个更明确的词区分不同的并发请求。再进一步,如果请求本身可以直接是一个对象(table) ,用请求本身就可以区分自己。

这样就是最终的方案:

local reqs = skynet.request()
local req1 = reqs:add { ... }
local req2 = reqs:add { ... }
for req, resp in reqs:select() do
    if req == req1 then
       -- 处理请求 1
    elseif req == req2 then
       -- 处理请求 2
    end
end

也可以简化成这样:

for req, resp in skynet.request 
    { ... , token = "req1" }
    { ... , token = "req2" }
     :select() do
    if req.token == "req1" then
       -- 处理请求 1
    elseif req.token == "req2" then
       -- 处理请求 2
    end
end

这里,我让 request/select 返回的迭代器每收到一个回应都返回请求对象和回应对象,用户可以自己识别。因为可以直接在请求对象中插入任意的标识符,所以很容易区分开不同的请求;或是把类似的请求分类处理。回应信息 resp 也用 table 承载,这样,如果 resp 为 nil ,则表示请求出错。


在一开始的实现中,这套 select 机制有一些限制。因为从调度器看来,这段业务的阻塞点在 select 上。也就是在这里,等待多个请求的回应。而回应处理的代码是在循环体内的,如果在处理回应的过程中又发起了新的请求(调用了阻塞函数),调度器就无法正确处理了。因为在同一个执行序列上,无法区分即将到回应到底是之前发起的多个请求之一,还是在处理某个回应时发起的新请求的回应。

不过这个限制是很容易消除的。只需要把 select 本身的执行序放在独立的 coroutine 里就可以了。相当于发起多个请求后,用一个独立的执行序去等待多个回应,每收到一个回应就转发回当初的位置,让用户可以在原有的执行序上依次处理。

有了这个基础结构之后,也就很容易增加超时的处理。超时相当于同时提交一个定时器请求,收到定时器请求的同时,放弃还没有收到的起他回应即可;或是等所有请求都处理完毕,放弃定时器的处理。

我们需要一个明确的处理流程来结束这个 select 循环:把多个请求的处理合流到后续的处理流程上,尚未处理的回应需要忽略掉。如果一切正常,select 的迭代器结束就可以做这些收尾工作。但如果发生异常,例如,在循环中 break 、产生 error 等等,我们需要主动调用关闭流程。

好在 lua 5.4 提供了 to be closed 方法来做这件事。所以这个新特性直接放在 lua 5.4 的基础上来实现。感兴趣的同学可以提前看看仓库中 select 这个分支。目前最直接的好处是,使用新 api 重新实现的 debug console 在调用 stat 或 mem 这样需要轮询所有服务状态的功能时,默认加上了超时设计,不再会因为单个服务出了问题,而无法返回了。

Comments

哈哈,这种需求很多 我自己在unity客户端也做了类似的机制, 起一堆加载协程,等他们全部返回了再继续后面的逻辑。 不做这种机制,就需要各种判断,因为返回的先后顺序不同,或者做标记变量。。。
大神啊,很实用
实用,非常感谢!
简化开发了,感谢
真好,感谢风哥。
没有错误信息是因为 skynet 本身不跨服务传递 error 信息。 否则,错误信息可以通过 for 的第三个参数传入。
看了一遍,发现在错误的情况下,只能确定是否正确,而不能获取具体的错误信息。 另外,这类设计,可以参考下js中的Promise设计,包含了完整的流程。

Post a comment

非这个主题相关的留言请到:留言本