上一篇文章《直播大数据采集(一期)》(以下简称《一期》)复盘了之前做过的一个采集直播大数据的项目,其中介绍了项目相关的业务和技术实现始末,也描述了当时碰到的问题以及采用的解决方案,但由于涉及到太多细节导致篇幅过长,且当时项目迭代也是分为两期来做的,所以将整个项目分为两篇文章来描述。本文即承接上文介绍当时由于业务需求和技术迭代所做的二期改造过程。
一期项目中已对初始的业务需求进行了实现,如直播大数据采集、数据分析、平台展示等。当时整个采集系统采集的直播间数量峰值为 10 万个,集群规模为 40 节点,也就是高峰时平均每节点要承担 2500 个直播间的采集和数据清洗任务,而每天流过整个系统的总流量为 400 ~ 600 G,集群采用 Ansible 进行统一部署,使用 Alinode 和 Grafana 进行监控和管理。且后期使用爬虫代理对风控较严格的平台进行采集,使得数据流趋于稳定,基本能满足业务需求。于是开始着手准备二期升级和迭代。
工程上的任何一次迭代都应该基于现实业务或解决某个问题,此次迭代也正由于需求方提出了新的需求以及我们也希望通过这次迭代能用更好的方式解决一期项目中遇到的问题。整理当时接到的新需求以及待解决的问题如下:
- ”原始数据“ 需求:当时部门内一个客户端团队(我当时处于数据团队)在设计客户端时,涉及到在客户端中展示主播在播时的弹幕和礼物,考虑到数据团队已经能采集到完整的直播流数据,因此他们希望能对接采集到的数据,而且最好是原始数据(但当时的架构中最终输出的是经过去平台化清洗的数据)。
- 优化本地数据清洗:一期项目中数据的去平台化清洗任务是在集群节点上进行的,其中涉及到大量的数据转换和计算,这加大了集群节点的负担。
- 关播后延迟关闭采集:在《一期》中已描述过,当一个直播间关播后,集群会自动关闭对该直播间的采集。延迟关闭也就是说当主播关播时,集群会延迟关闭对该直播间的采集。
- 调度服务的单点问题:集群服务都会关注的单点问题!
架构重构
设计
基于以上提出的新需求以及问题,其中涉及到大量与数据相关的内容,这势必要对一期项目的架构进行重新设计。考虑到未来可能会有需求的扩展,且需要对整体系统进行解耦,决定采用应用普遍且适应性强的分层式设计的新架构方案。
如下图,整个系统维持原有采集架构不变,在这里分为数据层和应用层两大块。
数据层:即用于接收和处理所有数据。将原先分布于采集结点中的数据清洗过程独立出来进行重新设计,又将该过程分为三个阶段:原始数据、ETL(数据清洗)、大数据计算。
- 原始数据:从采集集群采集到的数据经过序列化后得到的初始数据,这些数据具有强烈的平台特色,因此这个阶段得到的数据在字段、类型、名称等方面存在很大差异。但某些特殊业务场景下也会需要该数据。
- ETL:上述原始数据由于存在平台差异,因此无法进行统一计算和分析,因此在这一阶段需要对数据进行差异化抹平,即去平台化,以便后续进行统一计算和分析。
- 大数据计算:在该阶段根据特定业务需求对大数据进行分析计算得出结果,并提供给业务使用。
MQ(消息队列):作为沟通原始数据、ETL 和大数据计算三个阶段的桥梁,集群获得原始数据后将数据直接发送至 MQ 中,而 ETL 和大数据计算阶段则可以根据需要对消息队列进行消费,且 ETL 阶段清洗后的数据也会被传至 MQ 中。
数据层负责对大数据的各种处理,一方面每一阶段都可以独立提供对外的响应接口,因此每一阶段得到的数据都能被单独应用,这使得未来业务具有更加强大的可扩展性,另一方面数据分为多个阶段来处理的方式与一期相比,系统达到了解耦的目的,且降低了集群节点的负担,各个阶段之间通过 MQ 交互,将整个过程异步化,本身相互依赖度也更加低,提高了数据处理效率的同时也降低了开发维护成本。
- 应用层:这一层主要是应用数据层提供的各种数据能力,包括但不限于各种业务服务、日志、存储相关部分,各个应用之间相互独立,也可单独部署。单独把数据应用独立出来可以增强整体系统的可扩展性,也极大地降低了基础设施与上层业务之间的耦合度。
技术实现
根据上述架构设计方案,可以整理出各个部分需要做的开发工作。原始数据部分可以继承一期中的采集阶段,将节点采集到数据后所做的数据清洗操作移除,并直接将数据推送至消息队列。消息队列仍然采用阿里云 Loghub(虽然看上去不合理但是在当时 LogHub 属于白菜价而且后续大数据计算操作对接非常顺畅)。
大数据处理部分由于直接使用了阿里云的流计算和实时计算服务并由另一位同学单独负责,且该服务支持直接从 Loghub 中消费数据,处理起来除业务部分并不复杂,当时阿里云的技术支持也比较到位,所以不在此展开详述。
ETL 部分会比较复杂一些,且需要重新开发。梳理一下其中可能会碰到的问题:
- 数据量大:峰值 10 万直播间,每天 400 ~ 600G。
- 业务复杂:涉及到多个平台的不同种类数据,其中数据格式、字段名称含义、类型等都不一样,为最终抹平各平台间的差异需要做去平台化操作,并且市面上也找不到现成的清洗方案。
- 计算量大:一方面数据量本身较大,另一方面业务中会涉及到大量取值、转换、累计、求平均等数值计算操作,原有的 Node.js 方案(原本整体系统都是基于 Node.js 来实现)在这里可能并不适用。
基于上述问题,并在团队讨论后考虑在 ETL 的实现中放弃现有的 Node.js 体系,使用 Go 语言针对该特定场景开发一套数据清洗系统,且在 Go 服务和原有 Node.js 服务之间采用 RPC 通信。使用 Go 语言主要考虑因素如下:
- 性能:Go 语言作为一门编译型语言,虽然不如 C/C++ 那样效率超群,但对比 JavaScript 这类脚本语言还是绰绰有余的。且放弃现有 Node.js 体系的原因也主要是 Node.js 的本身设计因素决定了其并不适合用于大量 CPU 计算场景,与现有业务场景相悖,采用 Go 语言实现也能提高数值计算的效率。
- 学习成本低:作为业务工程化采用的技术方案,必然考虑的一个因素就是可持续维护,Go 语言的学习成本与 C/C++ 相比低太多,当项目人员流失时也能快速找到替补。
- 生态良好:Go 的语言生态持续完善,且市场上已有大量使用 Go 语言实现的优秀开源工具,如 Kubernetes、Docker。
采用新的架构方案并进行了相关优化的实现后,整体业务趋于平稳,架构中各数据层也能独立稳定地对外提供数据服务。将数据清洗操作从原有集群节点中拆分出来后也极大降低了节点负担,优化后的系统整体达到了架构设计预期的效果。
实现新需求
上面提到,这次技术迭代的一个主要起因是业务层面有一些新的需求产生,这些需求一方面是业务本身需要,另一方面也是为了规避数据采集本身所带来的数据缺失或不准确问题。如当一个直播间关播时,采集系统延迟关闭对其的数据采集(正常情况下直播间关播时会立即停止采集)。
这么做的原因有两个:
一个是由于直播间开关播信号是根据从平台拉取的直播间列表计算得出的,但观察发现直播平台给出的列表有时候并不是准确的,这就会导致开关播的误判,从数据上看就会造成数据的不准确,延迟关播(也就是说主播关播时还继续采集一段时间,如 10 分钟,若 10 分钟后仍然没有开关播状态更新则说明真正关播了,则停止采集,否则继续采集)可以尽可能地规避平台列表错误所带来的数据不准确的问题。
另一个原因则是一些当红主播在关播后仍然会有一些观众发送弹幕和礼物,这一部分数据也是有价值的,延迟采集可以获得更多的有效数据。
具体实现方式为,使用 Redis 的 Sorted Set(有序集合)特性。构建一个主播 id 和其最新在线时间关联的数据,并将其放入一个 Sorted Set,以主播最新在线时间为排序依据。主播的最新在线时间会定时更新(正常为 1 分钟),并在在线时间更新时同步更新 Sorted Set 中对应主播的最新在线时间,这样就实现了一个如下图所示的时间轴:
正常情况下,由于集合中的主播最新在线时间是定时更新的,因此正常开播的主播都会集中在上图时间轴中的等待区,假设某主播的最新在线时间同步异常,则导致集合中的时间无法更新,但此时时间依然再往前推移,最终在超过规定的最大等待时间(在这里假设 10 分钟)后,该主播会进入上图时间轴的待关闭区。在这样的设定下,所有主播都会分布在整个时间轴的两个分区内。
此时,调度中心要做的事情就变得更加简单,只需要定时去 Sorted Set 中按照时间区间取出要继续采集的直播间和需要停止采集的直播间并做出相关操作即可。
单点问题
单点问题是分布式服务都会遇到的问题,在本系统中也不例外。采集系统中可能会碰到单点问题的部分有:直播间列表拉取服务、调度中心服务、MySQL 数据库以及 Redis 缓存。直播间列表拉取服务用于定时拉取各个平台直播间列表,从架构上看并不属于集群范畴,所以在此不做介绍。MySQL 数据库和 Redis 缓存也不属于本文的主要讨论范围,因此也不做详细解说。接下来就主要讨论一下调度中心服务的单点问题。
引用一下《一期》中的架构设计图:
从架构设计中可以看出,调度中心主要负责集群控制、任务分配、任务回收的工作。由于调度中心是由一台独立的服务器单独部署的,所以可以分析一下当调度中心挂机后可能会出现的问题。
- 集群状态无法管控:调度中心会与集群中每一个节点进行长连接,并实时获取每个节点的负载。假如调度中心挂机会导致集群失控,节点数、负载等信息无法获知。
- 新任务无法分配:调度中心一方面掌控集群中所有节点的负载等相关信息,另一方面接受来自任务源的采集任务,并按照节点负载将任务均匀分配给每一个节点,若调度中心挂机则会导致新任务无法开启采集。
- 现有任务状态失控:当集群中某节点挂机或某节点上的某任务失败,调度中心会将该节点上的所有任务或某个失败任务进行回收,并在集群中均匀分配给健康的节点。一旦调度中心挂机则会导致任何失败的任务都无法回收再分配。
要解决上述问题,考虑采用主备的方式。
初步方案
本系统中主备方案的设计最大的问题即是如何确保任务的正确分配与回收。调度中心一方面需要从唯一的任务源获取到任务,另一方面还要准确地在集群中每一个节点之间均匀分配和回收所有任务。因此初步的主备方案必然需要考虑到任务的一致性。
初步的设计方案如图:
为了更好地管理所有任务,在调度中心节点和集群中间加一层用于存储所有节点状态和负载并定时更新的的节点池(用 Redis 实现,简单起见假设缓存节点稳定性远高于服务),而在任务源和节点池之间设置两个调度中心,任务源会根据调度中心的状态将任务分配给健康的节点,同时调度中心也会向下将任务分配。
后续
上述解决节点单点问题的初步方案在后期并没有继续进行讨论并实施,系统的单点问题也没有当做主要问题去解决。主要原因有两个:
- 系统真正的稳定性瓶颈在于爬虫风控:每一个系统都有其性能和稳定性的瓶颈,在这里系统的稳定性瓶颈并不在于单点问题或系统某部分的不稳定因素(虽然调度中心甚至数据库都有挂机的可能),而在于采集的部分平台对爬虫有着很严格的封锁机制,导致数据断流或错误,最终严重影响业务。因此当时我自己包括团队也将主要精力用于应对平台的反封锁。
- 尽可能规避不稳定因素:当时也是采取了一些措施来应对系统的不稳定因素(如单点问题)的,如监控。另外也对任务分配和回收机制做了优化,使得即使调度中心挂机了,现有的任务也不会受影响,从而最大程度上以最小的代价规避单点问题所带来的业务差错。
尽管当时业务上并没有深入讨论关于单点问题的解决方案,但对于技术工程上来说这仍然是一个值得思考与讨论的问题。上述初步解决方案也存在很多问题与不合理的地方,如果读者有任何见解,欢迎在下方留言区留言讨论。
问题与讨论
在后来的一些面试或聊天中,有过多次与同行聊起这个项目的经历,其中也碰到过一些比较有意思的问题,现整理问题以及我的回答如下:
用 LogHub 做消息队列?疯了吧?
答:当时考虑了两方面因素,其一是成本,那个时候 LogHub 真的属于白菜价。其二阿里云方面也确实支持将 LogHub 作为消息队列使用,服务也能较容易的对其数据进行消费,且 LogHub 与后续需要对接的流计算和实时计算在阿里云是天生支持的。
为什么不用一台很强大的服务器并在其中使用 Docker 部署采集结点?这样不是更容易管理吗?
答:这个业务场景下其实不难看出最缺失的资源并不是计算资源,而是 IP,为了规避平台的封锁我们也曾费尽心机最终采用爬虫代理实时更新代理 IP 才勉强支持业务。而对于在一台服务器上部署 Docker,这看上去很好,但其实最终也是不利于横向扩展的。
结语
本文承接《一期》,继续探讨了整个采集系统的后期迭代和业务升级。相对于《一期》,本文的叙述更加零散,但我也基本还原了当时场景下切切实实碰到过的技术和业务问题的细节。且这个项目是在 2017年 ~ 2018年年中期间实现并迭代的,里面涉及的很多技术实现方式放在现在的场景下可能已经有了更好的替代方案,所以对于本文涉及到的各种方案与观点,读者如果有任何建议或好的想法,欢迎在下方留言区反馈或展开讨论,非常感谢 :)