开发笔记 (4) : Agent 的消息循环及 RPC
话接 开发笔记1 。我们将为每个玩家的接入提供一个 agent 服务。agent 相应玩家发送过来的数据包,然后给于反馈。
对于 agent 服务,是一个典型的包驱动模式。无论我们用 Erlang 框架,还是用 ZeroMQ 自己搭建的框架,agent 都会不会有太多的不同。它将利用一个单一的输入点获取输入,根据这些输入产生输出。这个输入点是由框架提供的。框架把 Agent 感兴趣的包发给它。
我们会有许多 agent ,在没有它感兴趣的包到来时,agent 处于挂起状态(而不是轮询输入点)。这样做可以大量节省 cpu 资源。怎样做的高效,就是框架的责任了,暂时我们不展开讨论框架的设计。
下面来看,一旦数据包进入 agent 应该怎样处理。
游戏服务逻辑复杂度很高,比起很多 web 应用来说,要复杂的多。我们不能简单的把外界来的请求都看成的独立的。把输入包设计成 REST 风格,并依赖数据服务构建这套系统基本上行不通。几乎每个包都有相关的上下文环境,就是说,输入和输入之间是有联系的。简单的把输入看成一组组 session 往往也做不到 session 间独立。最终,我把游戏服务器逻辑归纳以下的需求:
游戏逻辑由若干流程构成。比如,agent 可以看作是登陆流程和场景漫游服务流程。
每个流程中,服务器可以接收若干类型的包,每种类型的包大多可以立刻处理完毕。可以简单的认为提供了一个请求回应模式的处理机。
在处理部分数据包时,可以开启一个子流程,在子流程处理完毕前,不会收到父流程认可的数据包类型。(如果收到,即为非法逻辑)
在处理部分数据包时,也可以开启一个并行流程,这个流程和已有的流程处理逻辑共存。即,框架应根据包类型把数据包分发到不同的并行流程上。例如,在场景中漫游时,可能触发一个玩家交易的并发流程。(玩家交易行为需要多次交互确认的手续,不能一次性的完成。在交易过程中,其它如战斗、聊天的处理不可中断)
每个流程都可能因为某次处理后中断退出。继而进入后续的代码逻辑。这个中断退出行为,对于并发和非并发流程都有效。
RPC 可以看作一个简单的并发流程,即等待唯一返回包,并继续流程逻辑。
我希望能设计一个简单的 DSL 来描述我要的东西,大约像这个样子:
...1 listen : case message A : ...2 listen : case message B: ...3 break case message C: ...4 case message D: ...5 ...6 case message E: ...7 break ...8 listen : case message F: fork listen : case message G: ...9 case message H: ...10 break ...11 case message I: ...12
解释一下上面这张表:
它表示,服务器启动后,会运行 ...1 这些代码,然后开始等待输入 A 或 E 两种消息 。如果收到 E 类消息,就执行 ...7 段代码,再因 break 跳出到 ...8 的位置。
如果收到 A 类消息,执行 ...2 代码,然后进程改为等待 B,C,D 类消息。此时,A,E 类消息都无效。直到收到 B 类消息后,执行流程到 ...6 并结束 A 消息的处理。再次等待输入 A 或 E 。
...8 后的阶段大致相同,但在 F 类消息的处理中,使用了并行的输入(fork listen) 。此时,系统会同时等待 G/H 和 F/I 的输入。只到有 H 类消息输入,中断 F 的处理流程,执行完 ...11 后,系统去掉 G/H 的输入等待。
要实现这么一套 DSL ,首先需要用已有的动态语言先实现一下所有功能,等需求稳定后,再设计 DSL 的语法。支持 coroutine 的 lua 非常适合做这件事情。
这套东西的框架其实是一个 coroutine 的调度器。每个执行流(就是 case message),不论是不是并行的,都是一个 coroutine 。当遇到 listen ,fork ,break 的时候 coroutine yield 出来,由调度器来决定下一步该 resume 哪个分支就好了。
框架只需要接收外界传入的带类型信息的 message ,在调度器里维护一张消息类型到执行流的映射表,就可以正确的调度这些东西。
剩下的事情就是处理穿插在其中的代码块内,数据相互之间可见性的问题;以及给 RPC 提供一个更易用的接口了。
我大约花了不到 100 行 lua 代码来实现以上提到的功能的雏形,贴在下面,尚需完善:
--- agent.lua local setmetatable = setmetatable local coroutine = coroutine local assert = assert local unpack = unpack local print = print local pairs = pairs module "agent" local function create_event_group(self, events, thread , parent_group) local group = { event = {}, thread = thread, parent = parent_group, } if parent_group then for k,v in pairs(parent_group.event) do self.event[k] = nil end end for k,v in pairs(events) do group.event[k] = { func = v , group = group } assert(self.event[k] == nil , k) self.event[k] = group.event[k] end end local function pop_event_group(self, group) for k in pairs(group.event) do self.event[k] = nil end if group.parent then for k,v in pairs(group.parent.event) do assert(self.event[k] == nil , k) self.event[k] = v end end end function create(main) local mainthread = coroutine.create(main) local self = setmetatable( { event = {} } , { __index = _M }) local r , command , events = coroutine.resume(mainthread , self) assert(r , command) assert(command == "listen" , command) create_event_group(self, events, mainthread) return self end function send(self, msg , ...) local event = self.event[msg] if event == nil then print (msg .. " unknown" , ...) else local event_thread = coroutine.create(event.func) local group = event.group while true do local r, command, events = coroutine.resume(event_thread, self, ...) assert(r,command) if command == "listen" then create_event_group(self, events, event_thread , group) break elseif command == "fork" then create_event_group(self, events, event_thread) break elseif command == "break" then pop_event_group(self, group) event_thread = group.thread group = group.parent else break end end end end function listen(agent , msg) coroutine.yield("listen",msg) end function quit(agent) coroutine.yield "break" end function fork(agent, msg) coroutine.yield("fork",msg) end
--- test.lua local agent = require "agent" a = agent.create(function (self) self:listen { login = function (self) self:listen { username = function(self , ...) print("username", ...) self:listen { password = function(self, ...) print("password", ...) self:quit() end } self:quit() end , } end, ok = function (self) self:quit() end, } print "login ok" local q = 0 self:listen { chat = function (self, ...) print("chat", ...) end, question = function (self , ...) print("question", ...) local answer = "answer" .. q q = q+1 self:fork { [answer] = function (self, ...) print("answer", ...) self:quit() end } end, } end) a:send("login") a:send("username","alice") a:send("username","bob") a:send("password","xxx") a:send("login") a:send("username","bob") a:send("password","yyy") a:send("chat","foobar") a:send("ok") a:send("chat", "hello") a:send("question","?0") a:send("chat", "world") a:send("question","?1") a:send("answer0","!0") a:send("answer1","!1")
12 月 5 日补充:
周一上班和蜗牛同学讨论了一下需求, 最后我们商定抽象出一个 session 出来, 供 fork 出来的流程使用。因为并行的流程会使用相同的消息类型,但流程上是独立的。
根据 session id 和 message type 做两级分发要清晰一些。
然后,我们再对 rpc 调用做简单的封装,使用更简单。
改进后的代码就不再列出了。
Comments
Posted by: LED贴片机 | (13) October 29, 2013 03:41 PM
Posted by: Anonymous | (12) January 13, 2012 12:04 PM
Posted by: ZeroV | (11) January 10, 2012 09:32 PM
Posted by: Holimion | (10) December 10, 2011 09:12 AM
Posted by: tinyzhang | (9) December 7, 2011 10:58 AM
Posted by: Anonymous | (8) December 5, 2011 07:34 PM
Posted by: ot512 | (7) December 5, 2011 03:56 PM
Posted by: 常高伟 | (6) December 4, 2011 10:05 PM
Posted by: 涛 | (5) December 4, 2011 05:43 PM
Posted by: 贴片机 | (4) December 4, 2011 02:07 PM
Posted by: asking | (3) December 4, 2011 11:53 AM
Posted by: mos | (2) December 4, 2011 11:22 AM
Posted by: xh4n | (1) December 4, 2011 08:53 AM