时间:2022-08-04 21:40来源:财神爷站
BIGO?于?2014?年成立,是一家高速发展的科技公司。基于强大的音视频处理技术、全球音视频实时传输技术、人工智能技术、CDN?技术,BIGO?推出了一系列音视频类社交及内容产品,包括?Bigo?Live(直播)和?Likee(短视频)等,在全球已拥有近?1?亿用户,产品及服务已覆盖超过?150?个国家和地区。
1挑战
最初,BIGO?的消息流平台主要采用开源?Kafka?作为数据支撑。随着数据规模日益增长,产品不断迭代,BIGO?消息流平台承载的数据规模出现了成倍增长,下游的在线模型训练、在线推荐、实时数据分析、实时数仓等业务对消息流平台的实时性和稳定性提出了更高的要求。开源的?Kafka?集群难以支撑海量数据处理场景,我们需要投入更多的人力去维护多个?Kafka?集群,这样成本会越来越高,主要体现在以下几个方面:
数据存储和消息队列服务绑定,集群扩缩容?/?分区均衡需要大量拷贝数据,造成集群性能下降。
当分区副本不处于?ISR(同步)状态时,一旦有?broker?发生故障,可能会造成数据丢失或该分区无法提供读写服务。
当?Kafka?broker?磁盘故障?/?空间占用率过高时,需要进行人工干预。
集群跨区域同步使用?KMM(Kafka?Mirror?Maker),性能和稳定性难以达到预期。
在?catch-up?读场景下,容易出现?PageCache?污染,造成读写性能下降。
Kafka?broker?上存储的?topic?分区数量有限,分区数越多,磁盘读写顺序性越差,读写性能越低。
Kafka?集群规模增长导致运维成本急剧增长,需要投入大量的人力进行日常运维;在?BIGO,扩容一台机器到?Kafka?集群并进行分区均衡,需要?0.5?人?/?天;缩容一台机器需要?1?人?/?天。
如果继续使用?Kafka,成本会不断上升:扩缩容机器、增加运维人力。同时,随着业务规模增长,我们对消息系统有了更高的要求:系统要更稳定可靠、便于水平扩展、延迟低。为了提高消息队列的实时性、稳定性和可靠性,降低运维成本,我们开始考虑是否要基于开源?Kafka?做本地化二次开发,或者看看社区中有没有更好的解决方案,来解决我们在维护?Kafka?集群时遇到的问题。
2为什么选择?Pulsar
2019?年?11?月,我们开始调研消息队列,对比当前主流消息流平台的优缺点,并跟我们的需求对接。在调研过程中,我们发现?Apache?Pulsar?是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。Pulsar?能够无缝扩容、延迟低、吞吐高,支持多租户和跨地域复制。最重要的是,Pulsar?存储、计算分离的架构能够完美解决?Kafka?扩缩容的问题。Pulsar?producer?把消息发送给?broker,broker?通过?bookie?client?写到第二层的存储?BookKeeper?上。
Pulsar?采用存储、计算分离的分层架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性。
水平扩容:能够无缝扩容到成百上千个节点。
低延迟:在大规模的消息量下依然能够保持低延迟(小于?5?ms)。
持久化机制:Pulsar?的持久化机制构建在?Apache?BookKeeper?上,实现了读写分离。
读写分离:BookKeeper?的读写分离?IO?模型极大发挥了磁盘顺序写性能,对机械硬盘相对比较友好,单台?bookie?节点支撑的?topic?数不受限制。
为了进一步加深对?Apache?Pulsar?的理解,衡量?Pulsar?能否真正满足我们生产环境大规模消息?Pub-Sub?的需求,我们从?2019?年?12?月开始进行了一系列压测工作。由于我们使用的是机械硬盘,没有?SSD,在压测过程中遇到了一些性能问题,在?StreamNative?的协助下,我们分别对?Broker?和?BookKeeper?进行了一系列的性能调优,Pulsar?的吞吐和稳定性均有所提高。
经过?3~4?个月的压测和调优,我们认为?Pulsar?完全能够解决我们使用?Kafka?时遇到的各种问题,并于?2020?年?4?月在测试环境上线?Pulsar。
3Apache?Pulsar?at?BIGO:Pub-Sub?消费模式
2020?年?5?月,我们正式在生产环境中使用?Pulsar?集群。Pulsar?在?BIGO?的场景主要是?Pub-Sub?的经典生产消费模式,前端有?Baina?服务(用?C++?实现的数据接收服务),Kafka?的?Mirror?Maker?和?Flink,以及其他语言如?Java、Python、C++?等客户端的?producer?向?topic?写入数据。后端由?Flink?和?Flink?SQL,以及其他语言的客户端的?consumer?消费数据。
在下游,我们对接的业务场景有实时数仓、实时?ETL(Extract-Transform-Load,将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程)、实时数据分析和实时推荐。大部分业务场景使用?Flink?消费?Pulsar?topic?中的数据,并进行业务逻辑处理;其他业务场景消费使用的客户端语言主要分布在?C++、Go、Python?等。数据经过各自业务逻辑处理后,最终会写入?Hive、Pulsar?topic?以及?ClickHouse、HDFS、Redis?等第三方存储服务。
4Pulsar?+?Flink?实时流平台
在?BIGO,我们借助?Flink?和?Pulsar?打造了实时流平台。在介绍这个平台之前,我们先了解下?Pulsar?Flink?Connector?的内部运行机理。在?Pulsar?Flink?Source/Sink?API?中,上游有一个?Pulsar?topic,中间是?Flink?job,下游有一个?Pulsar?topic。我们怎么消费这个?topic,又怎样处理数据并写入?Pulsar?topic?呢?
按照上图左侧代码示例,初始化一个?StreamExecutionEnvironment,进行相关配置,比如修改?property、topic?值。然后创建一个?FlinkPulsarSource?对象,这个?Source?里面填上?serviceUrl(brokerlist)、adminUrl(admin?地址)以及?topic?数据的序列化方式,最终会把?property?传进去,这样就能够读取?Pulsar?topic?中的数据。Sink?的使用方法非常简单,首先创建一个?FlinkPulsarSink,Sink?里面指定?target?topic,再指定?TopicKeyExtractor?作为?key,并调用?addsink,把数据写入?Sink。这个生产消费模型很简单,和?Kafka?很像。
Pulsar?topic?和?Flink?的消费如何联动呢?如下图所示,新建?FlinkPulsarSource?时,会为?topic?的每一个分区新创建一个?reader?对象。要注意的是?Pulsar?Flink?Connector?底层使用?reader?API?消费,会先创建一个?reader,这个?reader?使用?Pulsar?Non-Durable?Cursor。Reader?消费的特点是读取一条数据后马上提交(commit),所以在监控上可能会看到?reader?对应的?subion?没有?backlog?信息。
Offset?Commit?完成后,Pulsar?broker?会将?Offset?信息(在?Pulsar?中以?Cursor?表示)存储到底层的分布式存储系统?BookKeeper?中,这样做的好处是当?Flink?任务重启后,会有两层恢复保障。第一种情况是从?checkpoint?恢复:可以直接从?checkpoint?里获得上一次消费的?message?id,通过这个?message?id?获取数据,这个数据流就能继续消费。如果没有从?checkpoint?恢复,Flink?任务重启后,会根据?SubionName?从?Pulsar?中获取上一次?Commit?对应的?Offset?位置开始消费。这样就能有效防止?checkpoint?损坏导致整个?Flink?任务无法成功启动的问题。
为了降低?Flink?消费?Pulsar?topic?的门槛,让?Pulsar?Flink?Connector?支持更加丰富的?Flink?新特性,BIGO?消息队列团队为?Pulsar?Flink?Connector?增加了?Pulsar?Flink?SQL?DDL(Data?Definition?Language,数据定义语言)?和?Flink?1.11?支持。此前官方提供的?Pulsar?Flink?SQL?只支持?Catalog,要想通过?DDL?形式消费、处理?Pulsar?topic?中的数据不太方便。在?BIGO?场景中,大部分?topic?数据都以?JSON?格式存储,而?JSON?的?schema?没有提前注册,所以只能在?Flink?SQL?中指定?topic?的?DDL?后才可以消费。针对这种场景,BIGO?基于?Pulsar?Flink?Connector?做了二次开发,提供了通过?Pulsar?Flink?SQL?DDL?形式消费、解析、处理?Pulsar?topic?数据的代码框架(如下图所示)。
左边的代码中,第一步是配置?Pulsar?topic?的消费,首先指定?topic?的?DDL?形式,比如?rip、rtime、uid?等,下面是消费?Pulsar?topic?的基础配置,比如?topic?名称、service-url、admin-url?等。底层?reader?读到消息后,会根据?DDL?解出消息,将数据存储在?test_flink_sql?表中。第二步是常规逻辑处理(如对表进行字段抽取、做?join?等),得出相关统计信息或其他相关结果后,返回这些结果,写到?HDFS?或其他系统上等。第三步,提取相应字段,将其插入一张?hive?表。由于?Flink?1.11?对?hive?的写入支持比?1.9.1?更加优秀,所以?BIGO?又做了一次?API?兼容和版本升级,使?Pulsar?Flink?Connector?支持?Flink?1.11。BIGO?基于?Pulsar?和?Flink?构建的实时流平台主要用于实时?ETL?处理场景和?AB-test?场景。
实时?ETL?处理场景
实时?ETL?处理场景主要运用?Pulsar?Flink?Source?及?Pulsar?Flink?Sink。这个场景中,Pulsar?topic?实现几百甚至上千个?topic,每个?topic?都有独立的?schema。我们需要对成百上千个?topic?进行常规处理,如字段转换、容错处理、写入?HDFS?等。每个?topic?都对应?HDFS?上的一张表,成百上千个?topic?会在?HDFS?上映射成百上千张表,每张表的字段都不一样,这就是我们遇到的实时?ETL?场景。
随着程序运行,我们发现这种方案也存在问题:算子之间压力不均衡。因为有些?topic?流量大,有些流量小,如果完全通过随机哈希的方式映射到对应的?task?manager?上去,有些?task?manager?处理的流量会很高,而有些?task?manager?处理的流量很低,导致有些?task?机器上积塞非常严重,拖慢?Flink?流的处理。所以我们引入了?slot?group?概念,根据每个?topic?的流量情况进行分组,流量会映射到?topic?的分区数,在创建?topic?分区时也以流量为依据,如果流量很高,就多为?topic?创建分区,反之少一些。分组时,把流量小的?topic?分到一个?group?中,把流量大的?topic?单独放在一个?group?中,很好地隔离了资源,保证?task?manager?总体上流量均衡。
AB-test?场景
实时数仓需要提供小时表或天表为数据分析师及推荐算法工程师提供数据查询服务,简单来讲就是?app?应用中会有很多打点,各种类型的打点会上报到服务端。如果直接暴露原始打点给业务方,不同的业务使用方就需要访问各种不同的原始表从不同维度进行数据抽取,并在表之间进行关联计算。频繁对底层基础表进行数据抽取和关联操作会严重浪费计算资源,所以我们提前从基础表中抽取用户关心的维度,将多个打点合并在一起,构成一张或多张宽表,覆盖上面推荐相关的或数据分析相关的?80%?~?90%?场景任务。
在实时数仓场景下还需实时中间表,我们的解决方案是,针对?topic?A?到?topic?K?,我们使用?Pulsar?Flink?SQL?将消费到的数据解析成相应的表。通常情况下,将多张表聚合成一张表的常用做法是使用?join,如把表?A?到?K?按照?uid?进行?join?操作,形成非常宽的宽表;但在?Flink?SQL?中?join?多张宽表效率较低。所以?BIGO?使用?union?来替代?join,做成很宽的视图,以小时为单位返回视图,写入?ClickHouse,提供给下游的业务方实时查询。使用?union?来替代?join?加速表的聚合,能够把小时级别的中间表产出控制在分钟级别。
输出天表可能还需要?join?存放在?hive?上的表或其他存储介质上的离线表,即流表和离线表之间?join?的问题。如果直接?join,checkpoint?中需要存储的中间状态会比较大,所以我们在另外一个维度上做了优化。
左侧部分类似于小时表,每个?topic?使用?Pulsar?Flink?SQL?消费并转换成对应的表,表之间进行?union?操作,将?union?得到的表以天为单位输入到?HBase(此处引入?HBase?是为了做替代它的?join)。
右侧需要?join?离线数据,使用?Spark?聚合离线的?Hive?表(如表?a1、a2、a3),聚合后的数据会通过精心设计的?row-key?写入?HBase?中。数据聚合后状态如下:假设左边数据的?key?填了宽表的前?80?列,后面?Spark?任务算出的数据对应同样一个?key,填上宽表的后?20?列,在?HBase?中组成一张很大的宽表,把最终数据再次从?HBase?抽出,写入?ClickHouse,供上层用户查询,这就是?AB-test?的主体架构。
5业务收益
从?2020?年?5?月上线至今,Pulsar?运行稳定,日均处理消息数百亿,字节入流量为?2~3?GB/s。Apache?Pulsar?提供的高吞吐、低延迟、高可靠性等特性极大提高了?BIGO?消息处理能力,降低了消息队列运维成本,节约了近?50%?的硬件成本。目前,我们在几十台物理主机上部署了上百个?Pulsar?broker?和?bookie?进程,采用?bookie?和?broker?在同一个节点的混部模式,已经把?ETL?从?Kafka?迁移到?Pulsar,并逐步将生产环境中消费?Kafka?集群的业务(比如?Flink、Flink?SQL、ClickHouse?等)迁移到?Pulsar?上。随着更多业务的迁移,Pulsar?上的流量会持续上涨。
我们的?ETL?任务有一万多个?topic,每个?topic?平均有?3?个分区,使用?3?副本的存储策略。之前使用?Kafka,随着分区数增加,磁盘由顺序读写逐渐退化为随机读写,读写性能退化严重。Apache?Pulsar?的存储分层设计能够轻松支持百万?topic,为我们的?ETL?场景提供了优雅支持。
6未来展望
BIGO?在?Pulsar?broker?负载均衡、broker?cache?命中率优化、broker?相关监控、BookKeeper?读写性能优、BookKeeper?磁盘?IO?性能优化、Pulsar?与?Flink、Pulsar?与?Flink?SQL?结合等方面做了大量工作,提升了?Pulsar?的稳定性和吞吐,也降低了?Flink?与?Pulsar?结合的门槛,为?Pulsar?的推广打下了坚实基础。
未来,我们会增加?Pulsar?在?BIGO?的场景应用,帮助社区进一步优化、完善?Pulsar?功能,具体如下:
为?Apache?Pulsar?研发新特性,比如支持?topic?policy?相关特性。
迁移更多任务到?Pulsar。这项工作涉及两方面,一是迁移之前使用?Kafka?的任务到?Pulsar。二是新业务直接接入?Pulsar。
BIGO?准备使用?KoP?来保证数据迁移平滑过渡。因为?BIGO?有大量消费?Kafka?集群的?Flink?任务,我们希望能够直接在?Pulsar?中做一层?KoP,简化迁移流程。
对?Pulsar?及?BookKeeper?持续进行性能优化。由于生产环境中流量较高,BIGO?对系统的可靠性和稳定性要求较高。
持续优化?BookKeeper?的?IO?协议栈。Pulsar?的底层存储本身是?IO?密集型系统,保证底层?IO?高吞吐,才能够提升上层吞吐,保证性能稳定。