查看原文
其他

IM之Qunar实现

刘帆 Qunar技术沙龙 2022-09-23

点击蓝字 关注我们

刘帆

2015年5月加入去哪儿网IM团队,主要负责办公IM和客服IM的系统开发和架构设计。擅长使用Erlang语言开发高并发、高可用的应用服务。致力于可扩展的IM系统,模块化组件,使用一套服务实现办公、客服、消息推送等多种功能。

Qunar由于业务上对 IM 系统的需求,以及对 IM 需要支持的功能和扩展,结合市面上已有的 IM 的实现,实现了自己的一套完善的办公 IM 和客服 IM 系统。具备了以下几个重要特点:实时性,可靠性,一致性,安全性,扩展性,高并发。

一、IM 是什么

IM(Instant messaging)是一种通过网络提供实时消息传输的在线沟通技术。IM 系统一般包括:IM 客户端、 IM 服务器、网络以及在他们之间传输的消息。

整个流程类似于我们寄送包裹:用户A(客户端)将写有发件人和收件人的包裹(消息)给到邮局(服务器),邮局(服务器)根据包裹(消息)上收件人的信息,将包裹(消息)发送给用户B(客户端),完成整个消息的传输。

大家常见的 IM 实现:

二、IM 常见实现方案

XMPP 协议

XMPP 是一个开放式的 XML 协议,设计用于准实时消息和出席信息以及请求-响应服务。

XMPP 协议单元包括三个大类:

  • Presence:Presence 决定了 XMPP 实体的状态,用来告诉服务器该实体是在线、离线或者繁忙;

  • Message:用户之间发送和接收的消息;

  • IQ:请求-响应类型的报文;

优点:XMPP 有大量的优秀实现,且社区环境良好,功能和扩展能力丰富。

缺点:报文较大,耗费网络流量和电量。

MQTT 协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 在1999年发布。MQTT 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT 是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT 协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。正是由于它的简单,也带来了他的缺点:需要自己去实现聊天,好友等IM的逻辑。

三、Qunar实现

协议的选择

基于上面调研的常用 IM 协议,我们最终选择了 XMPP 协议最为最开始的实现协议:

  • 因为可以用最小的开发工作来实现基本的聊天功能;

  • 可以使用已有的扩展插件,更快实现更多的功能;

  • 针对 XMPP 的缺点,做针对性的修改,将网络传输这一阶段,改成 protocol buffer 协议,弥补网络流量和电量的短板。

开源项目的选择

基于:

  • ejabberd 是基于 Jabber/XMPP 协议的即时通讯服务器;

  • 由 GPLv2 授权(免费和开放源码);

  • 采用 Erlang/OTP 开发,它的特点是,跨平台,容错,集群和模块;

  • Ejabberd 是可扩展性最好的一种 Jabber/XMPP 服务器之一,支持分布多个服务器,并且具有容错处理,单台服务器失效不影响整个 cluster 运作;

  • Erlang 的调度和 GC 策略更适合 IM 的实时性要求(见参考)。

团队最终选择使用 ejabberd 开源实现来快速实现自己的 IM 功能。

架构设计

    1. 客户端通过两条连接来和服务器进行通讯

  • TCP 长连接( web 使用的 websocket ):该连接上交互的是和状态相关或者多端同步的报文;

  • HTTP 连接:和状态无关或者不需要多端同步的。

    2. 负载均衡

  • TCP 长连接通过 LVS 或者 HA 来做负载均衡;

  • HTTP 通过 nginx 来做负载均衡。

    3. 数据

  • 数据或直接放入数据库或者进入 MQ ,供需要方订阅消费;

  • 高频访问数据放到 redis ,供应用频繁查询,减小数据压力。

    4. 管理维护

  • 提供内网接口,供其他系统扩展 IM 功能;

  • 提供监控和维护工具,方便系统维护和故障处理。

ejabberd 架构

消息流转过程为:

  1. 客户端通过长连接,将消息发送给服务器,进入到负责该连接的 ejabberd_c2s进程;

  2. ejabberd_c2s进程处理完之后,调动 ejabberd_router:route(From,To,Stanza)来路由该消息;

  3. ejabberd_router处理自己的公共逻辑;

  4. 然后如果是发给该IM系统的消息,会将消息发送给 ejabberd_local进程;

  5. ejabberd_local判断如果 To是具体某个人,则会把消息发送给负责该用户连接的 ejabberd_sm进程;

  6. ejabberd_sm进程会查询收消息的用户,有几个在线设备,然后将改消息发送给该用户的多个 ejabberd_c2s进程;

  7. ejabberd_c2s进程将消息通过自己负责的TCP连接,将消息发送给客户端。

这样,一条消息就完整的从用户A传输到了用户B,实现消息的即时沟通。

  • 对于上图中没有提到的部分属于:与另一个IM通讯( ejabberd_s2s)、服务器本地处理( Processedbyserver)

ejabberd 功能扩展

在上一节 ejabberd 架构图中,我们可以在每个步骤中添加 hook 函数,添加自己的扩展功能。我们添加的扩展功能有:

使用 protocolbuffer 协议

由于 XMPP 协议具有:XML 报文流量大、耗电高等缺点,我们通过使用 protocolbuffer 替换掉 xml 报文,实现客户端到服务器之间的流量传输,减少报文流量和降低手机端的耗电量。在服务端将 protocolbuffer 再转换成 ejabberd 服务器使用的 xml 格式,减少服务端 im 逻辑的修改。

消息可靠性

我们通过以下渠道来确保 IM 消息的可靠性:

  • 设备在线时,通过消息确认回执来保证消息正确的发送出去和接收到;

  • 设备不在线时,当再次登录的时候,通过 HTTP 接口,拉取从上次退出到这次登录时间段内所有的历史消息;

  • 每条消息具有唯一id,确保发送消息时候的幂等性,重复发送同一条消息,只会展示和存储一条消息。

消息确认回执

我们在 ejabberd_c2s进程收到消息的时候,添加一个 hook 函数,用来作为服务器对收到消息的确认,同时将时间戳也返回给客户端,作为该消息的时间戳。

消息确认回执,主要解决的是问题是:保证消息至少一次发送成功,只有客户端收到了服务器的确认回执,才会认为消息到达了服务器,得到了响应的处理;否则客户端就会认为消息没有到达服务器,发送失败了。

消息同步

当我们同一个账号同时登录多个设备的时候,需要感知到在其他设备收发的消息,并同步展示给用户。所以我们在收发消息时,需要做必要的处理,以实现多设备同步的功能。

同步发送消息给其他设备:

发布消息到消息队列

为了扩展 IM 功能,我们需要把所有的 IM 的消息和时间发布到消息队列,供其他系统订阅消费,实现消息的统计、分析和存储。所以我们将时间和消息分类放入到 kafka ,实现消息的异步处理。

目前发送发送的包括:消息、上下线事件、驼圈事件以及@事件等

发送消息的 http 接口

为了给其他系统提供发送消息的服务,我们通过提供 http 接口的方式,来模拟来自用户的消息,实现该功能。

下发 IM 认证凭证

我们可以在 IM 里嵌入其它系统,来扩展 IM 的能力,包括但不限于移动 OA 、运维报警等系统。IM 客户端可以在跳转到其它系统的时候,带上 IM 的认证凭证,由其它系统来调用 IM 接口来认证身份。如果认证通过则表明是通过正在登录的 IM 客户端访问的,否则不允许访问该系统。这样就避免了让用户重复的认证身份,提高办公效率和用户体验。

IM 认证凭证的流程是:

  • 当 IM 长连接建立成功且认证通过后,服务器会通过长连接下发 token 给客户端

  • 客户端请求 IM 的 HTTP 接口的时候要带着 token ,用于身份认证

  • 客户端打开其它受信系统的时候,也会带上 token ,用于其它系统做身份认证

  • 当客户端与服务器的长连接断开的时候,服务器会销毁该 token ,使其失效。

增量拉取

在一些客户端和服务器之间需要同步的数据拉取上,我们采用增量更新的逻辑,减少每次服务器的响应数据集,加快客户端的登录和同步流程。

实现方式上,IM 采用以更新时间作为查询的 key ,服务器在每次更新数据的时候,都要更新 updatetime 字段。在客户端再次登录的时候,使用本地最新的更新时间去拉取数据,服务器如果存在比客户端更新的数据,则将增量的数据返回给客户端。

应用的场景有:

  • 组织架构更新

  • 消息历史更新

  • 群列表更新

  • 好友列表更新

  • 个人配置更新

IM 功能扩展

机器人实现

由于我们需要通过 IM 实现一些自助服务或者智能回复,我们需要在 IM 的扩展上实现该功能。

首先我们已经有了所有消息的队列服务功能,然后我们基于消息队列,订阅所有的消息,然后根据系统配置,将发送给特定机器人的消息,转发给对应的机器人服务。机器人服务收到发给自己的消息时,通过自己的系统配置或者自主学习的问题库,进行对应的操作,并调用IM的发消息接口,返回给咨询用户特定的消息。

我们可以通过该方式,实现各种自助和智能化的服务。节省人力成本和提高办公效率。

客服系统实现

对于客服系统,和普通 IM 有一些不同之处。在用户看来,他是在和一个店铺或者一个官方客服在聊天。实际上,后面可能是多个不同的客服,可能还会用到排队、会话超时等逻辑,所以要在常用的 IM 功能上来做扩展。

客服系统订阅所有的 IM 消息,当用户发送消息给客服的时候,客服系统需要对咨询做排队,客服分配,会话建立,然后将用户发给客服的消息转换成发给具体某一个客服的消息,然后发送给客服。

  1. 用户-------------> 店铺 转换成 店铺-----------> 客服

  2. 客服-------------> 店铺 转换成 店铺-----------> 用户

用户侧:

客服侧:

四、数据指标

对于 IM 主要的指标,我们主要关注的有:

  • 同时在线数

  • 建立 TCP 的量

  • 消息量

下面是对应指标的实际数值

同时在线数: 20W左右

建立 TCP 的量( QPS ):3W左右

收到消息的量( QPS ):3W左右

发出消息的量( QPS ):3W左右

五、优化系统参数

由于长连接服务需要同时支持大量的 TCP 连接,默认的系统配置达不到系统要求,我们要按照需求更改我们的系统配置,使性能达到最优。

Linux 操作系统参数

系统全局允许分配的最大文件句柄数:

  1. # 2 millions system-wide

  2. sysctl -w fs.file-max=2097152

  3. sysctl -w fs.nr_open=2097152

  4. echo 2097152 > /proc/sys/fs/nr_open

允许当前会话/进程打开文件句柄数:

  1. ulimit -n 1048576

/etc/sysctl.conf

持久化 ‘fs.file-max’ 设置到 /etc/sysctl.conf 文件:

  1. fs.file-max = 1048576

/etc/systemd/system.conf 设置服务最大文件句柄数:

  1. DefaultLimitNOFILE=1048576

/etc/security/limits.conf

/etc/security/limits.conf 持久化设置允许用户/进程打开文件句柄数:

  1. * soft nofile 1048576

  2. * hard nofile 1048576

TCP 协议栈网络参数

并发连接 backlog 设置:

  1. sysctl -w net.core.somaxconn=32768

  2. sysctl -w net.ipv4.tcp_max_syn_backlog=16384

  3. sysctl -w net.core.netdev_max_backlog=16384

可用知名端口范围:

  1. sysctl -w net.ipv4.ip_local_port_range='1000 65535'

TCP Socket 读写 Buffer 设置:

  1. sysctl -w net.core.rmem_default=262144

  2. sysctl -w net.core.wmem_default=262144

  3. sysctl -w net.core.rmem_max=16777216

  4. sysctl -w net.core.wmem_max=16777216

  5. sysctl -w net.core.optmem_max=16777216


  6. #sysctl -w net.ipv4.tcp_mem='16777216 16777216 16777216'

  7. sysctl -w net.ipv4.tcp_rmem='1024 4096 16777216'

  8. sysctl -w net.ipv4.tcp_wmem='1024 4096 16777216'

TCP 连接追踪设置:

  1. sysctl -w net.nf_conntrack_max=1000000

  2. sysctl -w net.netfilter.nf_conntrack_max=1000000

  3. sysctl -w net.netfilter.nf_conntrack_tcp_timeout_time_wait=30

TIME-WAIT Socket 最大数量、回收与重用设置:

  1. net.ipv4.tcp_max_tw_buckets=1048576


  2. # 注意: 不建议开启该设置,NAT模式下可能引起连接RST

  3. # net.ipv4.tcp_tw_recycle = 1

  4. # net.ipv4.tcp_tw_reuse = 1


  5. FIN-WAIT-2 Socket 超时设置:


  6. net.ipv4.tcp_fin_timeout = 15

六、总结

通过实现基本的 IM 功能,以及各种扩展功能,我们总结出一些 IM 核心功能:

  • 提供稳定的 TCP 长连接服务

  • 提供统一的认证服务

  • 提供高性能的消息订阅和发送消息给其他服务的能力

参考:

  • 【1】erlang GC文档

  • 【2】Erlang Garbage Collection Details and Why It Matters

  • 【3】Erlang Scheduler Details and Why It Matters

  • 【4】ejabberd massive scalability: single node with 2+ million concurrent users

  • 【5】俗说 GC 之 Heap 区内存模型的演进


END

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存