查看原文
其他

入门特辑|5000 字阐述 Apache Pulsar 的核心特性和设计概览

ApachePulsar 2022-09-10

The following article is from 大数据技术与架构 Author 王知无

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/

本文转载自公众号:大数据技术与架构,作者王知无。原文《5000字阐述云原生消息中间件Apache Pulsar的核心特性和设计概览》。
本期排版:Tango@StreamNative

目前的 Pulsar 社区发展十分迅速,Pulsar 的版本也在不断的更新迭代,目前大版本的迭代已经到了 2.8,本文是结合作者个人在 Github 和 Pulsar 社区中对 Pulsar 的探索过程中总结而成。

Pulsar 中的核心概念

这部分你可能会看到非常多的概念跟 Kafka 相关,我们一起来看一下:

Topic

Kafka 中也有 Topic 的概念。Topic 是一个消息目录或者说存放消息的命名空间,也就是消息发布(生产)的位置。一个 topic 可以有一个或多个 producer 和/或 consumer。Producer 向 topic 写入消息,consumer 从 topic 消费消息。下图展示了三者之间如何协同工作。

Bookie

Apache Pulsar 使用 Apache BookKeeper 作为存储层。Apache BookKeeper 针对实时工作负载进行优化,是一项可扩展、可容错、低延迟的存储服务。客户端发布的消息存储在 BookKeeper 的服务器实例中,即 bookie。

Ledger 是 BookKeeper 中的基本存储单元。一系列的 entry 组成一个 ledger,entry 被顺序写入 ledger。

Journal 文件包含 BookKeeper 中的消息写入日志。在更新 ledger 前,bookie 确保已经将更新的交易(交易日志 entry)写入非易失存储。在 bookie 第一次运行或旧的 journal 文件大小达到指定阈值时,会创建新的 journal 文件。

Entry log 文件用于管理 BookKeeper 客户端写入的 entry。来自不同 ledger 的 entry 会被依次写入一个或多个 entry log 中,而偏移量则作为指针保存在 ledger 缓存中,以进行快速查找。

Broker

和 Kafka 一样,在 Pulsar 中,broker 是一个无状态服务器,用于协助读写数据。一个非分区的 topic 不能同时被多个 broker 管理,但是可以存储在多个 bookie 服务中。

Entry

Entry 是存储到 BookKeeper 中的一条记录。

Ledger

可以认为 ledger 是用来存储 Entry 的,多个 Entry 序列组成一个 ledger。

MetaData Storage

元数据存储,是用于存储 bookie 相关的元数据,比如 bookie 上有哪些 ledger。BookKeeper 目前使用的是 ZooKeeper 存储,所在在部署 BookKeeper 前,要先有ZooKeeper 集群。

Journal

其实就是 BookKeeper 的 WAL(write ahead log),用于存 BookKeeper 的事务日志,journal 文件有一个最大大小,达到这个大小后会新起一个 journal 文件。

Entry log

存储 entry 的文件,ledger 是一个逻辑上的概念,不同 ledger 中的 entry 会先按 ledger 聚合,然后写入 entry log 文件中。同样,entry log 会有一个最大大小,达到最大大小后会新起一个新的 entry log 文件

Index file

Ledger 的索引文件,ledger 中的 entry 被写入到了 entry log 文件中,索引文件用于对 entry log 文件中每一个 ledger 做索引,记录每个 ledger 在 entry log 中的存储位置以及数据在 entry log 文件中的长度。

Ledger cache

用于缓存索引文件的,加快查找效率。

数据落盘

内存中会存储一个 LastLogMark,其中包含 txnLogId (journal 文件的 id)和 txnLogPos (journal 文件中的位置),entry log 文件和 index 文件都会先在内存中被缓存。当内存达到一定值或者离上一次刷盘过期了一段时间(定时线程)后,会触发 entry log 文件和 index 文件的刷盘,之后再将 LastLogMark 持久化。当 lastLogMark 被持久化后,表示在 lastLogMark 之前的 entry 和索引都已经写到了磁盘上,这个时候可以将 lastLogMark 之前的 journal 文件清掉,如果 LastLogMark 在持久化前出现了宕机,可以通过 journal 文件做恢复,保证了数据不丢。

Data Compaction

数据的合并,有点类似于 HBase 的 compact 过程。在 bookie 上,虽然 entry log 在刷盘前会按 ledger 做聚合,但是因数据会不断地新增,每个 leadger 的数据会交差存储在 entry log 文件中。而 bookie 上有一个用于做垃圾回收的线程,该线程会将没有关联任何 ledger 的 entry 文件进行删除,以便回收磁盘空间。compaction 的目的则是为了避免 entry log 中只有少数的记录是有关联的 ledger 的情况,不能让这样的 entry log 文件一直占用磁盘空间,所以垃圾收集线程会将这样的 entry log 中有关联 ledger 的 entry 复制到一个新的 entry log 文件中(同时修改索引),然后将老的 entry log 文件删除。与 HBase 类似,BookKeeper 的 compaction 也分为两种:

Minor compaction

当 entry log 中有效的 entry 只占 20#以下时做 compaction。

Major compaction

当 entry log 中有效的占到 80%以下时就可开始做 compaction。

关键特性

•跨地域复制( geo-replication),单个实例原生支持多个集群(跨集群复制);•极低的发布延迟和端到端延迟;•可无缝扩展到超过一百万个 topic;•简单的客户端 API,支持 Java、Go、Python 和 C++;•支持多种 topic 订阅模式:独占订阅、共享订阅、故障转移订阅、键共享(exclusive, shared, failover, key_shared);•通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递;•由轻量级的无服务器(serverless)计算框架 Pulsar Functions 实现流原生的数据处理;•基于 Pulsar Functions 的无服务器连接器框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar;•分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如 S3、GCS)中。

Pulsar 的架构设计

一个 Pulsar 实例由一个或多个 Pulsar 集群组成。实例中的集群可以在它们之间复制数据。一个 Pulsar cluster 由三部分组成:

•一个或者多个 broker:负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer。Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务;•一个 BookKeeper:包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储;•一个 ZooKeeper:特定于某个 Pulsar 集群的 ZooKeeper 集群处理 Pulsar 集群之间的协调任务。

特别需要注意的是:集群间可以通过跨地域复制(Geo-Replication)进行消息同步。

ZooKeeper

ZooKeeper 负责存储元数据,集群配置,协调:其中 local ZooKeeper 负责 Pulsar Cluster 内部的配置,global ZooKeeper 则用于 Pulsar Cluster 之间的数据复制。

Bookie 负责存储;Broker 负责负载均衡和消息的读取、写入等;Global replicators 负责集群间的数据复制。

Apache BookKeeper

在这里我们着重介绍一下 Apache BookKeeper。Pulsar 用 Apache BookKeeper 作为持久化存储,BookKeeper 有以下几个特性:

•利用多个 ledger 保存独立的日志;•为按条目复制的顺序数据提供了非常高效的存储;•保证了多系统挂掉时 ledgers 的读取一致性;•提供不同的 Bookies 之间均匀的 IO 分布的特性;•在容量和吞吐量上都可以水平扩展。通过向集群添加更多 bookie,可以立即增加容量;•Bookies 可以包含数千个具备同时读写功能的 ledger。使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样 Bookies 可以将读操作的影响和对于写操作的延迟分隔开;•除消息数据外,游标(cursors)还永久存储在 BookKeeper 中;Cursors 是消费端订阅消费的位置;BookKeeper 让 Pulsar 可以用一种可扩展的方式存储消费位置。

Ledgers

Ledger 是一个只追加(append-only)的数据结构,并且只有一个写入器,这个写入器负责多个 BookKeeper 存储节点(就是 Bookies)的写入。Ledger 的条目(entries)会被复制到多个 bookies。Ledgers 具有以下特性:

•Pulsar Broker 可以创建 ledeger,添加内容到 ledger 和关闭 ledger;•当一个 ledger 被关闭后,除非明确的要写数据或者是因为写入器挂掉导致 ledger 关闭,这个 ledger 只会以只读模式打开;•最后,当 ledger 中的条目不再有用的时候,整个 legder 可以被删除(ledger 分布是跨 Bookies 的)。

Pulsar Geo-replication

•多个 Broker 节点组成一个 Pulsar Cluster;多个 Pulsar Cluster 组成一个 Pulsar Instance;•Pulsar 通过 geo-replication 支持一个 Instance 内在不同的集群发送和消费消息。


下图说明了 Pulsar 在不同集群之间跨地域复制的过程:

在上图中,每当 P1,P2 和 P3 生产者将消息分别发布到 Cluster-A,Cluster-B 和 Cluster-C 群集上的 T1 主题时,这些消息就会立即在集群之间复制。复制消息后,C1 和 C2 consumer 可以使用它们各自群集中的消息。没有 geo-replication,C1 和 C2 consumer 将无法使用 P3 producer 发布的消息。

分层存储

通过使用分层存储(Tiered Storage),在 backlog 中的旧消息可以从 BookKeeper 转移到更廉价的存储中,不出其他问题,客户端将仍然可以访问 backlog,降低了存储成本。

Pulsar 当前支持 S3、Google Cloud Storage (GCS) 和文件系统(filesystem)来做长期存储(long term store),可以将数据卸载(Offloading)到长期存储中。

Pulsar 的核心设计

保证不丢失消息

我们直接引用 Pulsar 官方博客中的总结:Pulsar Broker 是无状态的,没有不能丢失的持久化状态,与存储层分开。

BookKeeper 集群本身并不执行复制,每个 Bookie 只是一个跟随者,被领导者人指使做什么,领导人是 Pulsar Broker。每个 Topic 都由一个 Pulsar Broker 拥有,该 Broker 提供 Topic 的所有读写操作。Pulsar 通过让领导人 (Pulsar Broker) 没有状态,BookKeeper 的 fencing 特性可以很好的处理脑裂问题。没有脑裂,没有分歧,没有数据丢失。

此外,当在 Bookie 上写入数据时,首先将该消息写入日志文件,这是一个预写日志(WAL),它可以帮助 BookKeeper 在发生故障时避免数据丢失。它与关系型数据库持久化保证的机制相同。

强顺序性保证

Pulsar 的顺序保证只在特定的模式下才能得到保证。BookKeeper 容许将磁盘 IO 做读写分离。写入都按顺序写入日志文件可以存储在专用的磁盘上,并且可以批量刷盘以获得搞得吞吐量。除此之外从写入操作来看没有其他的同步磁盘 IO 操作,数据都是写入到内存的缓存区。

读写延迟

关于读写延迟测试可以参考 Pulsar和Kafka基准测试:Pulsar性能精准解析

这篇博客包含了多场景下,Pulsar 和 Kafka 支持的各持久性级别、以及在同一持久性级别下两者的吞吐量和延迟的对比。

相关阅读

假期充电包 | Apache Pulsar 从入门到实践直播回顾|TGIP-CN 032:Apache Pulsar 快速上手实战


点击“阅读原文”,获取 Apache Pulsar 硬核干货资料!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存