« 概率问题 | 返回首页 | 开发笔记 (5) : 场景服务及避免读写锁 »

开发笔记 (4) : Agent 的消息循环及 RPC

话接 开发笔记1 。我们将为每个玩家的接入提供一个 agent 服务。agent 相应玩家发送过来的数据包,然后给于反馈。

对于 agent 服务,是一个典型的包驱动模式。无论我们用 Erlang 框架,还是用 ZeroMQ 自己搭建的框架,agent 都会不会有太多的不同。它将利用一个单一的输入点获取输入,根据这些输入产生输出。这个输入点是由框架提供的。框架把 Agent 感兴趣的包发给它。

我们会有许多 agent ,在没有它感兴趣的包到来时,agent 处于挂起状态(而不是轮询输入点)。这样做可以大量节省 cpu 资源。怎样做的高效,就是框架的责任了,暂时我们不展开讨论框架的设计。

下面来看,一旦数据包进入 agent 应该怎样处理。

游戏服务逻辑复杂度很高,比起很多 web 应用来说,要复杂的多。我们不能简单的把外界来的请求都看成的独立的。把输入包设计成 REST 风格,并依赖数据服务构建这套系统基本上行不通。几乎每个包都有相关的上下文环境,就是说,输入和输入之间是有联系的。简单的把输入看成一组组 session 往往也做不到 session 间独立。最终,我把游戏服务器逻辑归纳以下的需求:

游戏逻辑由若干流程构成。比如,agent 可以看作是登陆流程和场景漫游服务流程。

每个流程中,服务器可以接收若干类型的包,每种类型的包大多可以立刻处理完毕。可以简单的认为提供了一个请求回应模式的处理机。

在处理部分数据包时,可以开启一个子流程,在子流程处理完毕前,不会收到父流程认可的数据包类型。(如果收到,即为非法逻辑)

在处理部分数据包时,也可以开启一个并行流程,这个流程和已有的流程处理逻辑共存。即,框架应根据包类型把数据包分发到不同的并行流程上。例如,在场景中漫游时,可能触发一个玩家交易的并发流程。(玩家交易行为需要多次交互确认的手续,不能一次性的完成。在交易过程中,其它如战斗、聊天的处理不可中断)

每个流程都可能因为某次处理后中断退出。继而进入后续的代码逻辑。这个中断退出行为,对于并发和非并发流程都有效。

RPC 可以看作一个简单的并发流程,即等待唯一返回包,并继续流程逻辑。


我希望能设计一个简单的 DSL 来描述我要的东西,大约像这个样子:

...1
listen :
    case message A :
        ...2
        listen :
            case message B:
                ...3
                break
            case message C:
                ...4
            case message D:
                ...5
        ...6
    case message E:
        ...7
        break
...8
listen :
    case message F:
        fork listen :
            case message G:
                ...9
            case message H:
                ...10
                break
        ...11
    case message I:
        ...12

解释一下上面这张表:

它表示,服务器启动后,会运行 ...1 这些代码,然后开始等待输入 A 或 E 两种消息 。如果收到 E 类消息,就执行 ...7 段代码,再因 break 跳出到 ...8 的位置。

如果收到 A 类消息,执行 ...2 代码,然后进程改为等待 B,C,D 类消息。此时,A,E 类消息都无效。直到收到 B 类消息后,执行流程到 ...6 并结束 A 消息的处理。再次等待输入 A 或 E 。

...8 后的阶段大致相同,但在 F 类消息的处理中,使用了并行的输入(fork listen) 。此时,系统会同时等待 G/H 和 F/I 的输入。只到有 H 类消息输入,中断 F 的处理流程,执行完 ...11 后,系统去掉 G/H 的输入等待。


要实现这么一套 DSL ,首先需要用已有的动态语言先实现一下所有功能,等需求稳定后,再设计 DSL 的语法。支持 coroutine 的 lua 非常适合做这件事情。

这套东西的框架其实是一个 coroutine 的调度器。每个执行流(就是 case message),不论是不是并行的,都是一个 coroutine 。当遇到 listen ,fork ,break 的时候 coroutine yield 出来,由调度器来决定下一步该 resume 哪个分支就好了。

框架只需要接收外界传入的带类型信息的 message ,在调度器里维护一张消息类型到执行流的映射表,就可以正确的调度这些东西。

剩下的事情就是处理穿插在其中的代码块内,数据相互之间可见性的问题;以及给 RPC 提供一个更易用的接口了。

我大约花了不到 100 行 lua 代码来实现以上提到的功能的雏形,贴在下面,尚需完善:


--- agent.lua
local setmetatable = setmetatable
local coroutine = coroutine
local assert = assert
local unpack = unpack
local print = print
local pairs = pairs

module "agent"

local function create_event_group(self, events, thread , parent_group)
    local group = {
        event = {},
        thread = thread,
        parent = parent_group,
    }
    if parent_group then
        for k,v in pairs(parent_group.event) do
            self.event[k] = nil
        end
    end

    for k,v in pairs(events) do
        group.event[k] = { func = v , group = group }
        assert(self.event[k] == nil , k)
        self.event[k] = group.event[k]
    end
end

local function pop_event_group(self, group)
    for k in pairs(group.event) do
        self.event[k] = nil
    end
    if group.parent then
        for k,v in pairs(group.parent.event) do
            assert(self.event[k] == nil , k)
            self.event[k] = v
        end
    end
end

function create(main)
    local mainthread = coroutine.create(main)
    local self = setmetatable( { event = {} } , { __index = _M })
    local r , command , events = coroutine.resume(mainthread , self)
    assert(r , command)
    assert(command == "listen" ,  command)
    create_event_group(self, events, mainthread)
    return self
end

function send(self, msg , ...)
    local event = self.event[msg]
    if event == nil then
        print (msg .. " unknown" , ...)
    else
        local event_thread = coroutine.create(event.func)
        local group = event.group
        while true do
            local r, command, events = coroutine.resume(event_thread, self, ...)
            assert(r,command)
            if command == "listen" then
                create_event_group(self, events, event_thread , group)
                break
            elseif command == "fork" then
                create_event_group(self, events, event_thread)
                break
            elseif command == "break" then
                pop_event_group(self, group)
                event_thread = group.thread
                group = group.parent
            else
                break
            end
        end
    end
end


function listen(agent , msg)
    coroutine.yield("listen",msg)
end

function quit(agent)
    coroutine.yield "break"
end

function fork(agent, msg)
    coroutine.yield("fork",msg)
end
--- test.lua

local agent = require "agent"

a = agent.create(function (self)
    self:listen {
        login = function (self)
            self:listen {
                username = function(self , ...)
                    print("username", ...)
                    self:listen {
                        password = function(self, ...)
                            print("password", ...)
                            self:quit()
                        end
                    }
                    self:quit()
                end ,
            }
        end,
        ok = function (self)
            self:quit()
        end,
    }
    print "login ok"
    local q = 0
    self:listen {
        chat = function (self, ...)
            print("chat", ...)
        end,
        question = function (self , ...)
            print("question", ...)
            local answer = "answer" .. q
            q = q+1
            self:fork {
                [answer] = function (self, ...)
                    print("answer", ...)
                    self:quit()
                end
            }
        end,
    }
end)

a:send("login")
a:send("username","alice")
a:send("username","bob")
a:send("password","xxx")
a:send("login")
a:send("username","bob")
a:send("password","yyy")
a:send("chat","foobar")
a:send("ok")

a:send("chat", "hello")
a:send("question","?0")
a:send("chat", "world")
a:send("question","?1")
a:send("answer0","!0")
a:send("answer1","!1")


12 月 5 日补充:

周一上班和蜗牛同学讨论了一下需求, 最后我们商定抽象出一个 session 出来, 供 fork 出来的流程使用。因为并行的流程会使用相同的消息类型,但流程上是独立的。

根据 session id 和 message type 做两级分发要清晰一些。

然后,我们再对 rpc 调用做简单的封装,使用更简单。

改进后的代码就不再列出了。

Comments

表示看不懂。
跟Holimion 一样,看不出有哪些应用场景。。。
方法pop_event_group中assert应该是assert(self.event[k] ~= nil, k)吧, 不是"=="! 否则跑不通.
正常游戏时的协议包互相之间并没有明确的时序关系,都是可以接受,比如玩家可以跑/跳/发技能/查询公会成员。。。因此,这套定义了严格协议流程的框架,其实只对登陆/跨服之类的协议有效,而这些协议在整个服务器协议中只占很小一部分。并且,这些登陆/跨服协议的流程,一旦设计完成,在项目后期几乎就不需要改动了。不觉得有必要为这样的需求,设计一套框架。
simple but useful
喜欢你的验证码 真幽默 不过我想知道你们公司的名字 额 没什么恶意 谢谢
全都看不懂,但我想建一个虚拟世界,天堂。服务器处理程序叫God,对世界内的数据维护程序叫angel。呵呵
有看到了coroutine。今天看pycon 中国,里面很多人都提到了python的coroutine。之前也看到过lua支持coroutine,不过没有研究过。虽然我不是做游戏开发,但是对于复杂的逻辑处理也遇到很多(我是做企业通信开发)。两年前做个一个东西:http://blog.csdn.net/chgaowei/article/details/4589675。和你说的有点像,不过我做得有点复杂。 现在我更倾向于用python来解决以前遇到的问题。这两天会研究一下python的coroutine。
嗯,我曾经对网络服务器逻辑的编写总结过一句话:“把数据维护好,把事件处理好”。数据包括玩家自身的各种数据、服务器逻辑所需要维护的全局数据、也有一些临时性数据(比如计算出来的附近玩家)。事件包括客户端对服务器的请求(异步)、服务器逻辑模块之间的交互(可以全部是同步,也可以把其中部分做成异步的)。 正确的维护好数据,需要处理好各种事件,而各种事件的处理也需要服务器维护的各种数据的支持,这也就是云风大哥所说的网游服务器逻辑复杂性的根源:几乎每个客户端请求都有相应的上下文。 数据包分发是服务器常见的策略,云风大哥这次把数据包的分发放到具体的上下文种进行,当然就更加的好了,至少我可以想到的是:在写逻辑的时候,数据包之间的前后关系不用在玩家身上做标记了,框架已经提前过滤了非法请求。 服务器逻辑模块之间的交互也可以很容易的通过这个框架来实现,至少事件能够很容易的、正确的派发,而事件的正确处理还需要“穿插在其中的代码块内,数据相互之间可见性的问题”的解决。 我觉得由于逻辑请求之间的互相依赖,服务器数据的拆分和访问应该是网游服务器进行并行处理的难点。
还真有些看不懂
erlang selective receive
1.什么做什么不能做可能不该放在msg这一层,放在user这一层较好。 2.session的维护可以用callback参考node的例子。 wait_msg(msg1,function callback1(user){ dosometing_logic1 wait_msg(msg2,functon callback2(user) { //callback3,4,5... ... } }
到现在为止看过coroutine用的最合适的情况。以前只觉得省input output 过程中的临时内存。

Post a comment

非这个主题相关的留言请到:留言本