查看原文
其他

Apache Pulsar在智联招聘的实践 -- 从消息队列到基于Apache Pulsar的事件中心

李鹏辉 ApachePulsar 2020-01-08

导读:本文中鹏辉介绍了以前的消息中间件在智联招聘的应用和场景;以及对消息中间件选型的诉求;详细描述了选型过程中的细致思考。接着介绍了为什么会选择Pulsar,以及Pulsar中和智联的场景匹配的特性。最后提供了详细的Pulsar落地实践。


业务场景


消息队列作为智联招聘非常重要的平台级服务负责全业务线的消息投递。有很多非常典型的业务场景,我们用一个业务场景简历投递来说明消息队列为业务提供的支持


图1.简历投递业


当C端用户发生一次简历投递的时候会先发送一条消息到消息队列服务,C端中台、B端中台以及平台级的基础服务会从消息队列消费这条消息进行自己的业务逻辑处理比如写DB、通知B端企业等,在这个场景中消息队列为投递业务提供了很好异步解耦支持,在高峰期的时候可以提供很好的削峰作用以保障各业务系统的稳定运行。


上面的这个场景是非常典型的工作队列场景,在工作队列中大多数是为了实现业务场景支持在线服务而设定的,往往具有以下特点:

  1. 一条消息会被多个业务方消费

  2. 多个业务方之间广播方式消费(每个业务方消费完整的一份数据)

  3. 单个业务方采用集群消费模式(每个consumer消费部分数据)

  4. 每条消息都需要确保送达,消息队列会采用重试的机制来保证这一点

  5. 重要业务消息需要提供跟踪机制可以查询整个消息的生命周期


还有一些业务的需求会使用到延时消息,定时消息等。


基于RabbitMQ的自研MQService

RabbitMQ作为一款非常成熟的消息队列产品可以很好的应对工作队列的场景,当然也有一些不足比如单队列的扩展能力、延时消息支持的不够好等。我们在RabbitMQ基础上又做了一层抽象(MQService),将RabbitMQ看做一个消息服务的存储节点来用,在Zookeeper中会记录Topic的数据在RabbitMQ节点的分布并增加了容错的特性来保证存储节点失败的情况下可以持续提供消息写入能力。在招聘旺季消息队列每天约承载数10亿的消息投递。


MQService整体结构如下:


图2.MQService架构


用户可以通过Thrift协议、Http协议、MQTT来做消息的发送和消费。用户也可以注册Http回调接口来消费消息。默认的Java客户端封装了Thrift协议及MQTT的消息生产及消费,其他语言并没有封装对应的客户端而是通过Http协议进行消息的生产和消费,整体智联招聘也是以Java做后台服务为主。


接下来我们看一下通过zookeeper维护的Topic与RabbitMQ分组以及RabbitMQ节点的关系。


每个Topic都对对应到一个Group,每个Group下会挂一些RabbitMQ节点。当producer发送一条消息时MQService会从缓存中拿到对应这个Topic可用的RabbitMQ节点的列表,MQService会通过负载均衡策略选择其中的一个RabbitMQ节点进行写入,写入失败会重试下一个节点直到写入成功。单个节点如果写入失败次数在一定时间内达到一个特定值会触发熔断机制,单个RabbitMQ节点在熔断期间不对外提供写入及查询服务。


通过上面的介绍,大家应该可以对MQService有一个直观的印象,这里不再详细展开来介绍实现的细节。


以Kafka为中心的流批处理

首先Kafka在智联有招聘有大规模的应用,每天的数据传输量大约在数十TB量级,覆盖的范围包括ELK、实时计算等。我们还是以计算每天不同时间端的投递量为例子来介绍Kakfa在这个场景下的使用。



图3.基于Kafka的Streaming模型


这是一个非常经典的流式计算的架构,通过采集业务日志入Kafka,再通过Spark/Flink之类的计算框架做计算工作。在计算投递量的场景中,我们通过将计算结果按小时以及职位为维度将计算结果保存在MySQL中,也会将明细数据存储在Hive中供离线计算使用。


那么很容易发现同一个业务场景但是数据来源是不一样的,一个是业务方发送至MQService的一个是通过Logstash采集业务端日志来做的,那么我们如何来保证这两份数据的一致性?即使不通过采集日志,业务方去双写又如何来保证一致性呢?这样也给用户带来额外的负担。


矛盾点在于MQService很难融入到流行的实时计算框架或者批处理框架,而Kafka也应用在工作队列模式下显得有点力不从心。主要体现在Kakfa数据消费的支持,Partition数量与Consumer数量的绑定造成的Partition数量要跟着consumer的消费能力决定,业务方处理数据很难保证一批数据都能够成功处理,做offset commit的时候也是无法达到用户的预期。这是基于产品在业务场景的匹配上而讨论论的,就像Kafka介绍的那样(A distributed streaming platform)。


因此在2018年初时我们提出了通过一套方案来解决“工作队列 + Streaming”的想法也就是事件中心,期望事件中心可以承载智联全业务线用户行为、中台以及后台的业务事件传递。产品和业务系统在服务过程中会产生事件,事件是在先前定义好的,对于事件的生产方无需关心事件的消费,只需要关心事件的格式定义以及事件的生产。事件的消费方可以在事件中心去查阅自己想要的订阅的事件来申请订阅,甚至是数据产品也可以在事件中心去找找灵感。


这样我们不需要关心两个消息中间件数据的一致性问题,一份数据就可以匹配“工作队列 + Streaming”场景,对资源消耗、系统运维都有很好的改善。


工作队列 + Streaming场景全新诉求


有了这个想法之后我们开始总结我们需要的是一个什么样的产品,然后开始围绕我们的需求去做设计工作和技术调研工作。我们总结了我们一些诉求如下:


容灾能力及一致性

数据做分布式存储并且在分布式环境中要保证一致性。有一些重要的业务是依赖消息可靠性以及数据一致性的,所以在技术选型的时候如果在一个支持一致性的模型下去弱化一致性提升可用性是比较容易的,但是如果在一个没有一致性模型的方案上去做一致性这将会需要一个很大的改动。


单Topic扩展能力

就像在MQService描述的那样,同一个Topic可以利用多个节点来做横向扩展。Kafka在这一点做了很好的抽象(Partition)。同一个Partition的事件可以提供顺序性消费。


累计签收与单条签收

累计签收主要应用在Streaming场景下,而单条签收可以很好的匹配工作队列的场景,就像一次简历投递的业务处理,业务本身没有顺序性的要求,单条签收可以很好的支持消费者的消费能力扩展。而在累计签收模式下单分区是要保证顺序性的。


事件回溯能力

我们需要根据不同的事件来决定保留的时长或大小,可以为一些想要拿到历史事件的业务提供支持,我们也可以看到这也是MQService(上文提到的)薄弱的地方,MQService是无法给用户提供回溯能力的。


基于上面的一些主要的特性我们开始了技术选型的调研工作。经过了一段时间的调研工作后我们发现开源的消息中间件产品兼顾容灾能力和一致性的产品几乎没有,因此我们有一个想法那就是基于一个强一致性的分布式日志存储系统来做队列功能的开发,期间也考虑过使用Raft协议+Log存储但是最终还是Bookkeeper吸引了我们的关注,Bookeeper提供了开箱即用的API以及在Twitter、Hadoop有着大规模应用的场景,其稳定性以及成熟度等都是可以保证的。因此我们在大约5月份的时候已经开始做基于Bookkeeper事件中心的一些设计工作,在接触Bookkeeper社区的的时候才了解到Apache Puslar,通过了一段时间对Apache Pulsar的了解,以及社区活跃度的一些观察和Apache Pulsar社区小伙伴们的大力支持下,我们决定基于Apache Pulsar来搭建我们的事件中心。


为什么选择 Apache Pulsar 


Apache Pulsar有很多特性在满足事件中心需求的前提了还给了我们更多的惊喜,为更多的场景提供非常好的解决方案。


灵活的可用性和一致性选择

在每个Topic中由一系列的Ledger构成,每个Ledger有三个关键配置:


  • Ensemble Size (E)

  • Write Quorum Size (Qw)

  • Ack Quorum Size (Qa)


Ensemble Size (E) 决定了Pulsar写入Ledger可用的Bookies池的大小。

Write Quorum (Qw) 是Pulsar将要写入的实际的Bookies数量。可以等于或者小于E。


图4.E = 3 Qw = 3


当Qw小于E时,以条带化的方式分配读/写即每个Bookie只提供读写请求的子集。因此可以提升吞吐量,降低延迟。这也是提升单个Partition吞吐能力的一个很好的方案,这也得益于基于Segment为物理单元的存储设计。消息通过Robin的方式写入指定的Bookie,在查询消息是可以根据MessageId取模即能获得所在的Bookie列表。

图5.E = 5 Qw = 3


Ack Quorum (Qa) 是确认写入Bookies的数量,Pulsar Broker将确认发送给客户端。为了一致性,Qa应该是:(Qw + 1) / 2 或者更大。


这个特性可以很好的让我们在可用性和一致性上去做选择。


订阅的抽象

单队列的扩展Kafka为我们做了很好的抽象,Apache Pulsar也基本采用相同的思路。而对于订阅的抽象,我们认为Apache Pulsar再一次为我们做了很好的抽象,通过3种不同的订阅方式来匹配不同的使用场景。


图6.Apache Pulsar对订阅的抽象


消息存储在Topic中。逻辑上一个Topic是日志结构,每个消息都在这个日志结构中有一个偏移量。Apache Pulsar使用游标来跟踪偏移量。生产者将消息发送到一个指定的Topic,Apache Pulsar保证消息一旦被确认就不会丢失(正确的配置和非整个集群故障的情况下)。


消费者通过订阅来消费Topic中的消息。订阅是游标(跟踪偏移量)的逻辑实体,并且还根据不同的订阅类型提供一些额外的保证


  • Exclusive(独享) - 一个订阅只能有一个消息者消费消息

  • Shared(共享) - 一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息

  • Fail-Over(灾备) - 一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。


一个Topic可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。


Apache Pulsar通过允许消费者将Topic看做在消费者消费确认后删除消息的队列,或者消费者可以根据游标的回放来提供队列和日志的语义。在底层都使用日志作为存储模型。


这为我们通过一套系统支持工作队列和Streaming的诉求提供了很好的支持,在工作队列场景我们使用share模式,在Streaming模式我们使用Failover或者Exclusive。我们只需要一份数据就可以同时支持两种场景。


更好的IO和存储设计

当在Bookie上写入数据时,首先将该消息写入日志文件,这是一个预写日志(WAL),它可以帮助Bookkeeper在发生故障时避免数据丢失。它与关系型数据库持久化保证的机制相同。


写入预写日志的操作完成后会将数据放入缓存。写入的缓存会在内存中做积累并定期进行排序和刷盘。对写入进行排序以便将同一Ledger的条目放在一起,从而提高读取性能。如果条目以严格的时间顺序写入,在读取时无法利用磁盘的高效顺序操作


Bookkeeper容许将磁盘IO做读写分离。写入都按顺序写入日志文件可以存储在专用的磁盘上,并且可以批量刷盘以获得搞得吞吐量。除此之外从写入操作来看没有其他的同步磁盘IO操作,数据都是写入到内存的缓存区。


写缓存通过异步的方式批量将条目写入到日志文件和RocksDB,因此,一个磁盘用于同步写入日志文件,另一个磁盘用于异步写入数据和读取操作,


图7.Apache Pulsar的IO及存储设计


在存储设计上Bookkeeper以Segment为中心设计对系统扩容、冷热数据分离提供了很好的支持。在扩容方面通过增加Bookie节点就可以分担整个集群的存储压力,在冷热数据分离方面通过将Segment搬迁至二级存储如S3、OSS等更廉价的存储设备中,支持在线业务往往使用SSD来做存储。因此我们可以兼顾热数据的高性能与冷数据的大空间存储。


图8.Bookie 扩容


图9.冷数据搬迁


在IO和存储设计上以及Offload的特性给了我们更多的惊喜,可以更好的为我们在不影响在线业务的支持上兼顾大量事件存储需求的痛点,大大的降低了冷数据的存储成本。我们计划将冷数据存储至OSS。


上面挑选了Apache Pulsar非常核心的3个messaging特性来做介绍,这与事件中心的初衷是非常匹配的,然而Apache Pulsar远不止这些,有完善的多租户特性提供Topic的分层次管理,多种Schema的支持为数据校验、序列化提供更便捷的方式,轻量级的Pulsar Function以及Pulsar SQL都是非常值得去探索的特性,这里就不一一展开介绍了。


Apache Pulsar在智联招聘的落地实践


下面将介绍Apache Pulsar在智联招聘落地过程中的一些实践


为Namespace设置合理的Backlog Quota

Pulsar为我们提供了Backlog的机制能够记录每个Subscription的消费状况。也提供了Backlog Quota的设置,主要可以设置Backlog大小以及达到阈值时的控制策略。


Backlog Quota的控制策略有3种:


  1. producer_request_hold

  2. producer_exception

  3. consumer_backlog_eviction


producer_request_hold作为默认配置,在达到Backlog设置的大小阈值后会block producer发消息操作,这个配置不适合用于消息发送方是在线业务的使用场景。


producer_exception在达到Backlog设置的大小阈值后,producer会快速失败。


consumer_backlog_eviction在达到Backlog设置的大小阈值后会将subscription未签收的头部数据逐出,可以理解为自动签收。其实这个和producer_exception的区别在于producer_exception对于订阅方将会丢失尾部数据,而consumer_backlog_eviction是丢失头部数据。


我们大部分使用consumer_backlog_eviction策略。目前Pulsar支持在namespace级别设置这个策略,在2.2.0版本可以在broker.conf文件修改全局策略。将Backlog作为一个重点的监控项监控起来也是非常有必要的,后面会说到这部分。


增加MaxProducersPerTopic限制

防止错误或者恶意的Client使用造成Broker维持大量的Producer对象。Broker默认的配置是不限制的,增加限制可以提升Pulsar Cluster的安全性。


Pulsar提供两种方式来设定MaxProducerPerTopic


  1. broker.conf 中设置 maxProducersPerTopic

  2. 通过./pulsar-admin namespaces set-max-producers-per-topic -p


目前我们在broker.conf中的 maxProducersPerTopic = 10000,如果namespace有个性需求的话通过 ./pulsar-admin namespaces set-max-producers-per-topic -p 设置。


Apache Pulsar监控与报警

Pulsar提供丰富的Prometheus指标信息输出,我们可以这些指标信息来做好Pulsar的监控报警。Pulsar的客户端也记录了丰富的指标,我们做了一个client的扩展包将client的节点信息记录在Zookeeper中,由Prometheus自动发现,这样Client端的指标信息由Prometheus采集。


配合Grafana的监控展示,实时了解集群的状态

图10.集群状态看板-1


图11.集群状态看板-2



图12.分Namespace状态展示


报警规则配置:

  1. Client 发送失败次数

  2. Backlog 超阈值

  3. Rates/Network 超阈值

  4. Client > 50ms 延迟

  5. Broker > 50ms 延迟

  6. Storage Size 超阈值


Pulsar多集群流量切换

为了避免集群整体不可用,我们通过Zookeeper控制Client的连接串。基于Pulsar Client的Service URL Provider基础上做的二次开发。在Zookeeper中存储的Pulsar连接串改变的时候,Client会自动断掉当前的连接并重新与新的Pulsar地址进行连接。


为重要业务提供消息链路追踪

我们基于Pulsar Client的Interceptor接口以及Zipkin进行二次开发,为了实现消息的链路跟踪。消息链路跟踪的方案是通过日志采集统一入Hbase。每条消息都具备消息链路跟踪成本是昂贵的,并不适用于所有的场景,更适应与一些比较重要切消息量不太大的场景,当然这个根据不同的业务而定。


总结

我们通过介绍之前的消息中间件在智联招聘的应用情况来说明我们的痛点所在,我们计划打造一个可以解决当下痛点的产品来支撑智联招聘的业务。我们通过一段时间的技术选型工作后最终选择了Apache Pulsar作为我们的搭建企业级事件中心的基础。


截止目前事件中心接入的事件种类约100个,每天产生5亿事件量。部分业务通过灰度的方式接入,计划在11月底能够接入20亿事件量/日。


智联招聘也在持续为Apache Pulsar社区贡献新的特性比如Dead Letter Topic, Client Interceptors等,智联招聘有很多业务场景也非常依赖延时消息特性,后面我们也会在Pulsar上贡献此特性。


感谢Apache Pulsar社区小伙伴们在项目落地过程中的技术支持。





    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存