« January 2021 | Main

February 27, 2021

skynet 处理 TCP 连接半关闭问题

TCP 连接是双工的,既可以上行数据,又可以下行数据。连接断开时,两侧通道也是分别关闭的。

从 API 层面看,如果 read 返回 0 ,则说明上行数据已经关闭,后续不再会有数据进来。但此时,下行通道未必关闭,也就是说对端还可能期待收取数据。

同样,如果 write 返回 -1 ,错误是 EPIPE ,则表示下行通道已经关闭,不应再发送数据。但上行通道未必关闭,之后的 read 还可能收到数据。

用 shutdown 指令可以主动关闭单个通道(上行或下行)。

如果 tcp 连接只有一侧关闭,我们(skynet 中)称之为半关闭状态。在最开始,skynet 是将半关闭视为关闭的。这是因为,skynet 一开始只考虑网络游戏应用。在网络游戏中,连接被视为不可靠的,任何时候客户端都应该妥善处理服务器断开的情况,而服务器也应该处理客户端不告而别。业务层一般会额外做一套握手协议,不完全依赖 tcp 的底层协议。状态一般是不在客户端保存的。如果上一次链接上有数据没有发送到,那么下一次建立连接会重新拿取必要的数据。

所以,在 read 返回 0 ,或是 write 出错后,skynet 的底层都直接 close 连接。这种简单粗暴的方法可以大大简化底层的实现复杂度。同时,如果需要确保业务层数据交换的可靠性,就在业务层增加确认机制。见 issue 50


随着 skynet 应用领域的扩展,我发现在很多场合依赖已有的协议(不方便增加确认机制)时,必须更好的处理半关闭状态。比如,服务器想推送很多数据然后关闭连接。如果业务层不提供客户端确认收到的机制,就很难正确实现。只是简单的 write write …… close ,很可能在底层还没有全部发送完数据前就先把连接 close 了。

这时,skynet 增加了一个简单的半关闭处理机制。如果主动调用 close 时发现之前还有从业务层写出的数据并没有发送完,那么就让 fd 进入半关闭状态。半关闭状态下,fd 不可读写,从业务层看已经关闭,但底层会一直保留 fd 到数据全部发送完毕或发送失败(对端关闭)。

这个机制良好运作了好几年,直到有很多基于 skynet 的 web server 的应用场景出现。

我们发现,某些浏览器在从 web server 下载文件时会有这样的行为:发送一个 http 请求,然后立刻关闭 write 但保留 read 。也就是说,在 web server 看,读完请求后,会 read 到 0 。如果此时服务器主动关闭连接,后续就不再能写出回应包。由于 read 的行为是 skynet 底层的网络线程主动进行,而不是业务层调用 read 触发的,所以业务层无法控制它。

为解决这个问题,我在 2021 年初 提交了一个 pr 能更妥善的处理半关闭问题。在实现过程中我发现,如果我想保持兼容,并不把复杂性甩给上层(让业务层可以自己小心的单边关闭连接)。那么,我需要更仔细的处理半关闭的情况。底层使用一个半关闭状态值是不够的,必须区分 HALFCLOSE_READHALFCLOSE_WRITE 两个状态(和操作系统内核处理半关闭状态一致)。而且,我还需要区分半关闭状态是服务器主动进行的,还是对端操作的。

主动进入半关闭状态比较简单:业务层不允许只关闭一半的信道(读或写),但它在关闭时,之前还有推送到底层的下行数据没有发送完,就需要进入半关闭状态,调用 shutdown 关闭读,但还需要等待数据发送完毕。

被动进入半关闭状态需要分为两种:read 到 0 或 write 出现 EPIPE 错误。分别应该设为不同的半关闭状态。

由于我在这方面经验不足,这次的修改遗留了很多 bug 。感谢 skynet 的广泛应用,很快就收到了反馈。这两周一直在修复实现问题。具体可以看 issue 1346 以及相关讨论。

现在的版本暂时没有新的 bug 报告。已发现的 bug 很多是对 epoll 理解不够,kqueue 也和 epoll 有许多细微的差别。仔细阅读手册,查看 stackoverflow 的相关问题,对理解它们帮助很大。例如:TCP: When is EPOLLHUP generated?

February 07, 2021

ltask :Lua 的多任务库

ltask 是我前两周实现的一个 lua 的多任务库

这个项目复用了我之前的一个类似项目的名字 。目的是一样的,但是我做了全新的设计。所以我干脆将以前的仓库移除,以同样的名字创建了新的仓库。

和之前的版本设计不同。在消息通讯机制上,这个更接近 skynet 的模型,但它是一个库而不是一个框架。调度模块是按前段时间的想法实现的,不过只做了一个初步的模型,细节上还有一些工作待完善。

它的 C 部分提供的库被分为四个部分:

  1. ltask
  2. ltask.bootstrap
  3. ltask.exclusive
  4. ltask.root

如果用官方的 Lua 解析器作为入口,那么入口代码只可以使用 ltask.bootstrap 这个子库。它提供的 api 可以完成一系列框架的搭建工作。可以视为把 skynet 设置和启动线程的部分,以库的形式提供了出来,方便用 lua 代码驱动。

所有设定工作完成后,ltask.bootstrap.run 这个 api 会启动调度器并阻塞住主线程,之后的一切任务都是由设定完成的工作线程驱动。

和 skynet 类似,一切任务( task )都被放在 service 中完成。每个 service 是一个独立的 lua 虚拟机。它们均配有一个和外界通讯的通信管道。我们用 32bit 来标识服务及所属的通信管道。和 skynet 不同,内核不负责管理服务的名字。

0 号 id 是保留的,可以用来表示无效的服务或系统服务。1 号服务是一个特殊的 root 服务,它会有一些特权。我计划保留 2-1023 号服务,用来做特定的系统任务。例如,名字解析,timer ,socket 等等。这样,一些必要的服务就可以直接用固定的数字 id 而不需要起字符串名字。

ltask.bootstrap 里有一系列 api 可以用来做上面的配置工作:启动服务但不运行。


和 skynet 不同,我将服务分为两种:shared 共享和 exclusive 独占。

skyent 的服务可以全部视为共享服务,它们共享 N 个工作线程。skynet 中也有独立的线程去完成 timer 和 socket monitor 等工作。但这些独占线程都是 C 代码实现的。这次,我希望由 lua 来驱动。所以新设立了 exclusive 服务的概念。它只能在 boostrap 过程创建,并绑定在独立的系统线程上。

exclusive 服务是为了方便实现一些会阻塞在系统调用上的业务,例如 socket 的 select (epoll) 上。如果以后需要使用一些第三方的自带网络处理的库,例如 MQ ,DB driver 等等,就不必担心它会占用工作线程太长时间了。

目前我只实现了一个 timer 服务,用于发送定时器消息。虽然 exclusive 服务也可以接收 ltask 内部的消息(它也有消息通信管道),但是,它有可能阻塞在系统调用上(对于 timer 服务来说,它会阻塞在 sleep 调用上)。所以没有使用内部消息来创建定时器。定时器还是和 skynet 的实现一样,直接用 spinlock 插入管理器。


前面提到,root 服务有一些特权。这个特权就是创建和销毁服务。这是因为调度器尽量避免零碎的锁,所以创建销毁服务 handle 都是在拿到调度器时完成的(避免对服务 handle 映射表加额外的锁)。这要求创建服务的业务编写起来要格外小心,所以我直接实现在 root 服务中,不把相关的 api 暴露出来。(实际上,是 root 发送特定消息给系统来完成的。而系统会拒绝非 root 服务发送来的相关消息)

exclusive 服务也有一些特权,比如它们可以批量发送消息。这样的 api 只在 ltask.exclusive 只模块中提供。而 shared 服务每次只可以发送一条消息,必须等待调度器将消息投递完成后,才可以发送下一条。

这里,需要谈谈 ltask 的消息模型和 skynet 最大的不同:每条消息发送后,系统都会将服务挂起,之后给出一个回执(receipt),业务层必须处理完这条回执才能跑后面的业务。

所以,在业务层面看,ltask 和 skynet 是一致的:send 消息后,业务逻辑不会被打断; call ( request )则可能在回应到来前被重新进入,有可能影响业务逻辑的上下文状态。但 ltask 无论是 send 还是 call 都可能挂起服务。

实际上,ltask 在底层已经不再区分是单项投递 (send) ,还是发起请求期待回应 (call)。所有消息都带有 session ,在底层都会有回应。只不过暴露给业务层时,send 这个 api 会忽略回应消息,call 这个 api 会挂起当前 coroutine 等待对方回应。

消息的回执单可能有三种状态:成功投递、目标服务不存在、目标服务的消息队列忙。

在目前的实现中,队列忙会抛出 error 。这可以极大的缓解系统繁忙时的雪崩效应。不过,以后可以进一步完善,让业务层可以做更细致的处理。例如,选择隔一段时间重新投递、写日志、等等。


这次我把绝大多数业务都放在了 lua 层实现,C 层只提供了必要的机制。对于大部分业务,都应该放在共享服务中。服务应该在处理完当前任务后挂起(在服务的主线程调用 coroutine.yield 即可),等待调度器再次将自己唤醒。ltask 的 api 提供了最少的对消息管道的处理:读取一条消息、发送一条消息、读取发送消息的回执。当一个服务的消息队列不为空或发送消息被处理完成后,调度器都会不断的唤醒服务。

对于 exclusive 服务,我预期是用 message based 模式编写的:即直接用一个消息循环去处理消息,并自行调用系统 api (例如 sleep ,socket select 等),每完成一部分工作后,就调用 corotoutine.yield 把 cpu 让出给调度器。而调度器会使用一个独立系统线程在调度工作完成后,立刻 resume 它。不用担心 exclusive 服务会饿死。

对于 shared 服务,我做了一些 lua 层面的封装工作。使用起来比较接近 skynet ,但做了不少简化。它和 skynet 一样,还是 request/response 模式的。服务可以在启动的时候注册一张表,每条 request 消息的第一个字段用来索引这张表,找到对应的处理函数。服务框架会为每个请求的处理函数创建一个独立的 coroutine ,在它对外发起请求时挂起,收到回应时延续。


最后谈谈新的调度器的实现。大体上和上一篇 blog 相同,但是在实现时,发现比预想的要复杂一点。

第一,消息投递:

每个服务有一个固定长度的消息队列,用于接收消息。这个服务是消息队列的唯一消费者。而调度器是所有消息队列的唯一生产者。

每个服务有一个单元的发送消息槽位和一个单元的回执单槽位。这个服务是它们的唯一生产者,调度器是唯一消费者。如果发送消息没有被调度器取走(消费),那么服务不可以生产下一条消息。

消息从服务产生,调度器取走并投递到目的服务中,并给源服务写入一个回执单。这样便完成了消息投递工作。这里全部是单一的生产者和单一的消费者,故而实现起来比较简单,也不需要任何锁。

第二,共享服务的调度:

每个工作线程都有三个槽位:预备做的任务(服务 id),正在做的任务,已经完成的任务。

预备做的任务由调度器安排进去(调度器是唯一生产者),由工作线程取出(消费者)。但这里有一个特殊的设定:调度器当没有任务可以分配时,它可以偷取已经分配到预备槽位的任务。所以这里的消费者并不唯一(调度器也可能是消费者),需要用 CAS 来操作。

正在做的任务完全是私有槽位,只有工作线程才可以读写。未来可能有监控机制可以读出来供管理员查看。

当工作线程把手头的工作做完后,它需要把已经完成的任务放在完成槽位,等待调度器收走。如果上一个任务迟迟没有收走,那么工作线程就不可以放入下一个完成任务。(这种情况非常罕见,但一旦发生,工作线程将竞争调度器的控制权,执行收走已完成的任务)

第三,调度器:

  1. 处理所有线程的发送消息,投递这些消息,生成回执。
  2. 收取所有工作线程已经完成的任务,放回调度队列中(若消息队列不为空)。
  3. 从调度队列中取出若干服务,分配给预备任务槽为空的工作线程。

调度器并不是一个独立线程,它是一个模块,可以被任意线程调用,但同时只能有一个线程有控制权。工作线程每次完成一个任务后,如果没有被分配下一个预备任务,它都会尝试竞争一次调度器的控制权。如果竞争失败,它会 sleep 。因为有偷取任务的设定,所以不用担心有边界状态导致一个任务被分配后,工作线程 sleep ,导致服务无法运行。

exclusive 服务每次让出后,关联的独立线程一定会抢占一次调度器模块,保证它发出的消息一定会被处理(exclusive 服务有一个专有的消息发送队列,而不仅仅是一个发送槽位。这可以让它批量发送消息)。

Todo :调度器应该根据服务最近被分配的工作线程,尽量保证分配到同一个工作线程处理。目前的实现中,虽然记录了服务最后一次所在的工作线程 id ,但暂时还没有用这个数据指导调度器的分配工作。


目前,ltask 还很初步。我想先把它用在我们的客户端引擎中。所以这次的实现尽量做成跨平台的(包括 windows 平台)。因为客户端对网络层要求不高,所以我暂时还没有实现。我想先可以用 luasocket lsocket 等现成的库来实现,而不必移植 skynet 的网络处理模块。

关于消息,目前复用的 skynet 中的序列化模块。但我想这次底层不再考虑跨进程通讯,未必需要把消息序列化成一个字符串。完全可以用一个自定义的数据结构来保存 lua 的值,用于服务间的数据交换。理论上可以比序列化字符串更高效。这个优化工作留到以后再做。