开发笔记 (4) : Agent 的消息循环及 RPC
话接 开发笔记1 。我们将为每个玩家的接入提供一个 agent 服务。agent 相应玩家发送过来的数据包,然后给于反馈。
对于 agent 服务,是一个典型的包驱动模式。无论我们用 Erlang 框架,还是用 ZeroMQ 自己搭建的框架,agent 都会不会有太多的不同。它将利用一个单一的输入点获取输入,根据这些输入产生输出。这个输入点是由框架提供的。框架把 Agent 感兴趣的包发给它。
我们会有许多 agent ,在没有它感兴趣的包到来时,agent 处于挂起状态(而不是轮询输入点)。这样做可以大量节省 cpu 资源。怎样做的高效,就是框架的责任了,暂时我们不展开讨论框架的设计。
下面来看,一旦数据包进入 agent 应该怎样处理。
游戏服务逻辑复杂度很高,比起很多 web 应用来说,要复杂的多。我们不能简单的把外界来的请求都看成的独立的。把输入包设计成 REST 风格,并依赖数据服务构建这套系统基本上行不通。几乎每个包都有相关的上下文环境,就是说,输入和输入之间是有联系的。简单的把输入看成一组组 session 往往也做不到 session 间独立。最终,我把游戏服务器逻辑归纳以下的需求:
游戏逻辑由若干流程构成。比如,agent 可以看作是登陆流程和场景漫游服务流程。
每个流程中,服务器可以接收若干类型的包,每种类型的包大多可以立刻处理完毕。可以简单的认为提供了一个请求回应模式的处理机。
在处理部分数据包时,可以开启一个子流程,在子流程处理完毕前,不会收到父流程认可的数据包类型。(如果收到,即为非法逻辑)
在处理部分数据包时,也可以开启一个并行流程,这个流程和已有的流程处理逻辑共存。即,框架应根据包类型把数据包分发到不同的并行流程上。例如,在场景中漫游时,可能触发一个玩家交易的并发流程。(玩家交易行为需要多次交互确认的手续,不能一次性的完成。在交易过程中,其它如战斗、聊天的处理不可中断)
每个流程都可能因为某次处理后中断退出。继而进入后续的代码逻辑。这个中断退出行为,对于并发和非并发流程都有效。
RPC 可以看作一个简单的并发流程,即等待唯一返回包,并继续流程逻辑。
我希望能设计一个简单的 DSL 来描述我要的东西,大约像这个样子:
...1
listen :
case message A :
...2
listen :
case message B:
...3
break
case message C:
...4
case message D:
...5
...6
case message E:
...7
break
...8
listen :
case message F:
fork listen :
case message G:
...9
case message H:
...10
break
...11
case message I:
...12
解释一下上面这张表:
它表示,服务器启动后,会运行 ...1 这些代码,然后开始等待输入 A 或 E 两种消息 。如果收到 E 类消息,就执行 ...7 段代码,再因 break 跳出到 ...8 的位置。
如果收到 A 类消息,执行 ...2 代码,然后进程改为等待 B,C,D 类消息。此时,A,E 类消息都无效。直到收到 B 类消息后,执行流程到 ...6 并结束 A 消息的处理。再次等待输入 A 或 E 。
...8 后的阶段大致相同,但在 F 类消息的处理中,使用了并行的输入(fork listen) 。此时,系统会同时等待 G/H 和 F/I 的输入。只到有 H 类消息输入,中断 F 的处理流程,执行完 ...11 后,系统去掉 G/H 的输入等待。
要实现这么一套 DSL ,首先需要用已有的动态语言先实现一下所有功能,等需求稳定后,再设计 DSL 的语法。支持 coroutine 的 lua 非常适合做这件事情。
这套东西的框架其实是一个 coroutine 的调度器。每个执行流(就是 case message),不论是不是并行的,都是一个 coroutine 。当遇到 listen ,fork ,break 的时候 coroutine yield 出来,由调度器来决定下一步该 resume 哪个分支就好了。
框架只需要接收外界传入的带类型信息的 message ,在调度器里维护一张消息类型到执行流的映射表,就可以正确的调度这些东西。
剩下的事情就是处理穿插在其中的代码块内,数据相互之间可见性的问题;以及给 RPC 提供一个更易用的接口了。
我大约花了不到 100 行 lua 代码来实现以上提到的功能的雏形,贴在下面,尚需完善:
--- agent.lua
local setmetatable = setmetatable
local coroutine = coroutine
local assert = assert
local unpack = unpack
local print = print
local pairs = pairs
module "agent"
local function create_event_group(self, events, thread , parent_group)
local group = {
event = {},
thread = thread,
parent = parent_group,
}
if parent_group then
for k,v in pairs(parent_group.event) do
self.event[k] = nil
end
end
for k,v in pairs(events) do
group.event[k] = { func = v , group = group }
assert(self.event[k] == nil , k)
self.event[k] = group.event[k]
end
end
local function pop_event_group(self, group)
for k in pairs(group.event) do
self.event[k] = nil
end
if group.parent then
for k,v in pairs(group.parent.event) do
assert(self.event[k] == nil , k)
self.event[k] = v
end
end
end
function create(main)
local mainthread = coroutine.create(main)
local self = setmetatable( { event = {} } , { __index = _M })
local r , command , events = coroutine.resume(mainthread , self)
assert(r , command)
assert(command == "listen" , command)
create_event_group(self, events, mainthread)
return self
end
function send(self, msg , ...)
local event = self.event[msg]
if event == nil then
print (msg .. " unknown" , ...)
else
local event_thread = coroutine.create(event.func)
local group = event.group
while true do
local r, command, events = coroutine.resume(event_thread, self, ...)
assert(r,command)
if command == "listen" then
create_event_group(self, events, event_thread , group)
break
elseif command == "fork" then
create_event_group(self, events, event_thread)
break
elseif command == "break" then
pop_event_group(self, group)
event_thread = group.thread
group = group.parent
else
break
end
end
end
end
function listen(agent , msg)
coroutine.yield("listen",msg)
end
function quit(agent)
coroutine.yield "break"
end
function fork(agent, msg)
coroutine.yield("fork",msg)
end
--- test.lua
local agent = require "agent"
a = agent.create(function (self)
self:listen {
login = function (self)
self:listen {
username = function(self , ...)
print("username", ...)
self:listen {
password = function(self, ...)
print("password", ...)
self:quit()
end
}
self:quit()
end ,
}
end,
ok = function (self)
self:quit()
end,
}
print "login ok"
local q = 0
self:listen {
chat = function (self, ...)
print("chat", ...)
end,
question = function (self , ...)
print("question", ...)
local answer = "answer" .. q
q = q+1
self:fork {
[answer] = function (self, ...)
print("answer", ...)
self:quit()
end
}
end,
}
end)
a:send("login")
a:send("username","alice")
a:send("username","bob")
a:send("password","xxx")
a:send("login")
a:send("username","bob")
a:send("password","yyy")
a:send("chat","foobar")
a:send("ok")
a:send("chat", "hello")
a:send("question","?0")
a:send("chat", "world")
a:send("question","?1")
a:send("answer0","!0")
a:send("answer1","!1")
12 月 5 日补充:
周一上班和蜗牛同学讨论了一下需求, 最后我们商定抽象出一个 session 出来, 供 fork 出来的流程使用。因为并行的流程会使用相同的消息类型,但流程上是独立的。
根据 session id 和 message type 做两级分发要清晰一些。
然后,我们再对 rpc 调用做简单的封装,使用更简单。
改进后的代码就不再列出了。
Comments
Posted by: LED贴片机 | (13) October 29, 2013 03:41 PM
Posted by: Anonymous | (12) January 13, 2012 12:04 PM
Posted by: ZeroV | (11) January 10, 2012 09:32 PM
Posted by: Holimion | (10) December 10, 2011 09:12 AM
Posted by: tinyzhang | (9) December 7, 2011 10:58 AM
Posted by: Anonymous | (8) December 5, 2011 07:34 PM
Posted by: ot512 | (7) December 5, 2011 03:56 PM
Posted by: 常高伟 | (6) December 4, 2011 10:05 PM
Posted by: 涛 | (5) December 4, 2011 05:43 PM
Posted by: 贴片机 | (4) December 4, 2011 02:07 PM
Posted by: asking | (3) December 4, 2011 11:53 AM
Posted by: mos | (2) December 4, 2011 11:22 AM
Posted by: xh4n | (1) December 4, 2011 08:53 AM