第三部分:日志和实时流处理
到此为止,我只是描述从端到端数据复制的理想机制。但是在存储系统中搬运字节不是所要讲述内容的全部。最终我们发现日志是流的另一种说法,日志是流处理的核心。
但是,等等,什么是流处理呢?
如果你是90年代晚期或者21世纪初数据库文化或者数据基础架构产品的爱好者,那么你就可能会把流处理与建创SQL引擎或者创建“箱子和箭头”接口用于事件驱动的处理等联系起来。
如果你关注开源数据库系统的大量出现,你就可能把流处理和一些开源数据库系统关联起来,这些系统包括了:Storm,Akka,S4和Samza。但是大部分人会把这些系统作为异步消息处理系统,这些系统与支持群集的远程过程调用层的应用没什么差别(而事实上在开源数据库系统领域某些方面确实如此)。
这些视图都有一些局限性。流处理与SQL是无关的。它也局限于实时流处理。不存在内在的原因限制你不能处理昨天的或者一个月之前的流数据,且使用多种不同的语言表达计算。
我把流处理视为更广泛的概念:持续数据流处理的基础架构。我认为计算模型可以像MapReduce或者分布式处理架构一样普遍,但是有能力处理低时延的结果。
处理模型的实时驱动是数据收集方法。成批收集的数据是分批处理的。数据是不断收集的,它也是按顺序不断处理的。
美国的统计调查就是成批收集数据的良好典范。统计调查周期性的开展,通过挨门挨户的走访,使用蛮力发现和统计美国的公民信息。1790年统计调查刚刚开始时这种方式是奏效的。那时的数据收集是批处理的,它包括了骑着马悠闲的行进,把信息写在纸上,然后把成批的记录传送到人们统计数据的中心站点。现在,在描述这个统计过程时,人们立即会想到为什么我们不保留出生和死亡的记录,这样就可以产生人口统计信息这些信息或是持续的或者是其它维度的。
这是一个极端的例子,但是大量的数据传送处理仍然依赖于周期性的转储,批量转化和集成。处理大容量转储的唯一方法就是批量的处理。但是随着这些批处理被持续的供给所取代,人们自然而然的开始不间断的处理以平滑的处理所需资源并且消除延迟。
例如LinkedIn几乎没有批量数据收集。大部分的数据或者是活动数据或者是数据库变更,这两者都是不间断发生的。事实上,你可以想到的任何商业,正如:Jack Bauer告诉我们的,低层的机制都是实时发生的不间断的流程事件。数据是成批收集的,它总是会依赖于一些人为的步骤,或者缺少数字化或者是一些自动化的非数字化流程处理的遗留信息。当传送和处理这些数据的机制是邮件或者人工的处理时,这一过程是非常缓慢的。首轮自动化总是保持着最初的处理形式,它常常会持续相当长的时间。
每天运行的批量处理作业常常是模拟了一种一天的窗口大小的不间断计算。当然,低层的数据也经常变化。在LinkedIn,这些是司空见贯的,并且使得它们在Hadoop运转的机制是有技巧的,所以我们实施了一整套管理增量的Hadoop工作流的架构。
由此看来,对于流处理可以有不同的观点。流处理包括了在底层数据处理的时间概念,它不需要数据的静态快照,它可以产生用户可控频率的输出,而不用等待数据集的全部到达。从这个角度上讲,流处理就是广义上的批处理,随着实时数据的流行,会儿更加普遍。
这就是为什么从传统的视角看来流处理是利基应用。我个人认为最大的原因是缺少实时数据收集使得不间断的处理成为了学术性的概念。
我想缺少实时数据收集就像是商用流处理系统注定的命运。他们的客户仍然需要处理面向文件的、每日批量处理ETL和数据集成。公司建设流处理系统关注的是提供附着在实时数据流的处理引擎,但是最终当时极少数人真正使用了实时数据流。事实上,在我在LinkedIn工作的初期,有一家公司试图把一个非常棒的流处理系统销售给我们,但是因为当时我们的全部数据都按小时收集在的文件里,当时我们提出的最好的应用就是在每小时的最后把这些文件输入到流处理系统中。他们注意到这是一个普遍性的问题。这些异常证明了如下规则:流处理系统要满足的重要商业目标之一是:财务, 它是实时数据流已具备的基准,并且流处理已经成为了瓶颈。
甚至于在一个健康的批处理系统中,流处理作为一种基础架构的实际应用能力是相当广泛的。它跨越了实时数据请求——应答服务和离线批量处理之间的鸿沟。现在的互联网公司,大约25%的代码可以划分到这个类型中。
最终这些日志解决了流处理中绝大部分关键的技术问题。在我看来,它所解决的最大的问题是它使得多订阅者可以获得实时数据。对这些技术细节感兴趣的朋友,我们可以用开源的Samza,它是基于这些理念建设的一个流处理系统。这些应用的更多技术细节我们在此文档中有详细的描述。
数据流图
流处理最有趣的角度是它与流处理系统内部无关,但是与之密切相关的是如何扩展了我们谈到的早期数据集成的数据获取的理念。我们主要讨论了基础数据的获取或日志——事件和各类系统执行中产生的数据等。但是流处理允许我们包括了计算其它数据的数据。这些衍生的数据在消费者看来与他们计算的原始数据没什么差别。这些衍生的数据可以按任意的复杂度进行压缩。
让我们再深入一步。我们的目标是:流处理作业可以读取任意的日志并把日志写入到日志或者其它的系统中。他们用于输入输出的日志把这些处理关联到一组处理过程中。事实上,使用这种样式的集中日志,你可以把组织全部的数据抓取、转化和工作流看成是一系列的日志和写入它们的处理过程。
流处理器根本不需要理想的框架:它可能是读写日志的任何处理器或者处理器集合,但是额外的基础设施和辅助可以提供帮助管理处理代码。
日志集成的目标是双重的:
首先,它确保每个数据集都有多个订阅者和有序的。让我们回顾一下状态复制原则来记住顺序的重要性。为了使这个更加具体,设想一下从数据库中更新数据流–如果在处理过程中我们把对同一记录的两次更新重新排序,可能会产生错误的输出。 TCP之类的链接仅仅局限于单一的点对点链接,这一顺序的持久性要优于TCP之类的链接,它可以在流程处理失败和重连时仍然存在。
第二,日志提供了流程的缓冲。这是非常基础的。如果处理流程是非同步的,那么上行生成流数据的作业比下行消费流数据的作业运行的更快。这将会导致处理流程阻塞,或者缓冲数据,或者丢弃数据。丢弃数据并不是可行的方法,阻塞将会导致整个流程图立即停止。 日志实际上是一个非常大的缓冲,它允许流程重启或者停止但不会影响流程图其它部分的处理速度。如果要把数据流扩展到更大规模的组织,如果处理作业是由多个不同的团队提供的,这种隔离性是极其重的。我们不能容忍一个错误的作业引发后台的压力,这种压力会使得整个处理流程停止。
Storm和Sama这两者都是按非同步方式设计的,可以使用Kafka或者其它类似的系统作为它们的日志。
有状态的实时流处理
一些实时流处理在转化时是无状态的记录。在流处理中大部分的应用会是相当复杂的统计、聚合、不同窗口之间的关联。例如有时人们想扩大包含用户操作信息的事件流(一系列的单击动作)–实际上关联了用户的单击动作流与用户的账户信息数据库。不变的是这类流程最终会需要由处理器维护的一些状态信息。例如数据统计时,你需要统计到目前为止需要维护的计数器。如果处理器本身失败了,如何正确的维护这些状态信息呢?
最简单的替换方案是把这些状态信息保存在内存中。但是如果流程崩溃,它就会丢失中间状态。如果状态是按窗口维护的,流程就会回退到日志中窗口开始的时间点上。但是,如果统计是按小时进行的,那么这种方式就会变得不可行。
另一个替换方案是简单的存储所有的状态信息到远程的存储系统,通过网络与这些存储关联起来。这种机制的问题是没有本地数据和大量的网络间通信。
我们如何支持处理过程可以像表一样分区的数据呢?
回顾一下关于表和日志二相性的讨论。这一机制提供了工具把数据流转化为与处理过程协同定位的表,同时也提供了这些表的容错处理的机制。
流处理器可以把它的状态保存在本地的表或索引——bdb,或者leveldb,甚至于类似于Lucene 或fastbit一样不常见的索引。这些内容存储在它的输入流中(或许是使用任意的转化)。生成的变更日志记录了本地的索引,它允许存储事件崩溃、重启等的状态信息。流处理提供了通用的机制用于在本地输入流数据的随机索引中保存共同分片的状态。
当流程运行失败时,它会从变更日志中恢复它的索引。每次备份时,日志把本地状态转化成一系列的增量记录。
这种状态管理的方法有一个优势是把处理器的状态也做为日志进行维护。我们可以把这些日志看成与数据库表相对应的变更日志。事实上,这些处理器同时维护着像共同分片表一样的表。因为这些状态它本身就是日志,其它的处理器可以订阅它。如果流程处理的目标是更新结点的最后状态,这种状态又是流程的输出,那么这种方法就显得尤为重要。
为了数据集成,与来自数据库的日志关联,日志和数据库表的二象性就更加清晰了。变更日志可以从数据库中抽取出来,日志可以由不同的流处理器(流处理器用于关联不同的事件流)按不同的方式进行索引。
我们可以列举在Samza中有状态流处理管理的更多细节和大量实用的例子。
日志压缩
当然,我们不能奢望保存全部变更的完整日志。除非想要使用无限空间,日志不可能完全清除。为了澄清它,我们再来聊聊Kafka的实现。在Kafka中,清理有两种选择,这取决于数据是否包括关键更新和事件数据。对于事件数据,Kafka支持仅维护一个窗口的数据。通常,配置需要一些时间,窗口可以按时间或空间定义。虽然对于关键数据而言,完整日志的重要特征是你可以重现源系统的状态信息,或者在其它的系统重现。
随着时间的推移,保持完整的日志会使用越来越多的空间,重现所耗费的时间越来越长。因些在Kafka中,我们支持不同类型的保留。我们移除了废弃的记录(这些记录的主键最近更新过)而不是简单的丢弃旧日志。我们仍然保证日志包含了源系统的完整备份,但是现在我们不再重现原系统的全部状态,而是仅仅重现最近的状态。我们把这一特征称为日志压缩。
第四部分:系统建设
我们最后要讨论的是在线数据系统设计中日志的角色。
在分布式数据库数据流中日志的角色和在大型组织机构数据完整中日志的角色是相似的。在这两个应用场景中,日志是对于数据源是可靠的,一致的和可恢复的。组织如果不是一个复杂的分布式数据系统呢,它究竟是什么?
分类计价吗?
如果换个角度,你可以看到把整个组织系统和数据流看做是单一的分布式数据系统。你可以把所有的子查询系统(诸如Redis,SOLR,Hive表等)看成是数据的特定索引。你可以把Storm或Samza一样的流处理系统看成是发展良好的触发器和视图具体化机制。我已经注意到,传统的数据库管理人员非常喜欢这样的视图,因为它最终解释了这些不同的数据系统到底是做什么用的–它们只是不同的索引类型而已。
不可否认这类数据库系统现在大量的出现,但是事实上,这种复杂性一直都存在。即使是在关系数据库系统的鼎盛时期,组织中有大量的关系数据库系统。或许自大型机时代开始,所有的数据都存储在相同的位置,真正的集成是根本不存在的。存在多种外在需求,需要把数据分解成多个系统,这些外在需求包括:规模、地理因素、安全性,性能隔离是最常见的因素。这些需求都可以由一个优质的系统实现:例如,组织可以使用单一的Hadoop聚簇,它包括了全部的数据,可以服务于大型的和多样性的客户。
因此在向分布式系统变迁的过程中,已经存在一种处理数据的简便的方法:把大量的不同系统的小的实例聚合成为大的聚簇。许多的系统还不足以支持这一方法:因为它们不够安全,或者性能隔离性得不到保证,或者规模不符合要求。不过这些问题都是可以解决的。
依我之见,不同系统大量出现的原因是建设分布式数据库系统很困难。通过削减到单一的查询或者用例,每个系统都可以把规模控制到易于实现的程度。但是运行这些系统产生的复杂度依然很高。
未来这类问题可能的发展趋势有三种:
第一种可能是保持现状:孤立的系统还会或长或短的持续一段时间。这是因为建设分布式系统的困难很难克服,或者因为孤立系统的独特性和便捷性很难达到。基于这些原因,数据集成的核心问题仍然是如何恰当的使用数据。因此,集成数据的外部日志非常的重要。
第二种可能是重构:具备通用性的单一的系统逐步融合多个功能形成超极系统。这个超级系统表面看起来类似关系数据库系统,但是在组织中你使用时最大的不同是你只需要一个大的系统而不是无数个小系统。在这个世界里,除了在系统内已解决的这个问题不存在什么真正的数据集成问题。我想这是因为建设这样的系统的实际困难。
虽然另一种可能的结果对于工程师来说是很有吸引力的。新一代数据库系统的特征之一是它们是完全开源的。开源提供了一种可能性:数据基础架构不必打包成服务集或者面向应用的系统接口。在Java栈中,你可以看到在一定程度上,这种状况已经发生了。
Zookeeper用于处理多个系统之间的协调,或许会从诸如Helix 或者Curator等高级别的抽象中得到一些帮助。
Mesos和YARN用于处理流程可视化和资源管理。
Lucene和LevelDB等嵌入式类库做为索引。
Netty,Jetty和Finagle,rest.li等封装成高级别的用于处理远程通信。
Avro,Protocol Buffers,Thrift和umpteen zillion等其它类库用于处理序列化。
Kafka和Bookeeper提供支持日志。
如果你把这些堆放在一起,换个角度看,它有点像是简化版的分布式数据库系统工程。你可以把这些拼装在一起,创建大量的可能的系统。显而易见,现在探讨的不是最终用户所关心的API或者如何实现,而是在不断多样化和模块化的过程中如何设计实现单一系统的途径。因为随着可靠的、灵活的模块的出现,实施分布式系统的时间周期由年缩减为周,聚合形成大型整体系统的压力逐步消失。
日志文件在系统结构中的地位
那些提供外部日志的系统如今已允许个人电脑抛弃他们自身复杂的日志系统转而使用共享日志。在我看来,日志可以做到以下事情:
通过对节点的并发更新的排序处理数据的一致性(无论在及时还是最终情况下)
提供节点之间的数据复制
提供”commit“语法(只有当写入器确保数据不会丢失时才会写入)
位系统提供外部的数据订阅资源
提供存储失败的复制操作和引导新的复制操作的能力
处理节点间的数据平衡
这实际上是一个数据分发系统最重要的部分,剩下的大部分内容与终端调用的API和索引策略相关。这正是不同系统间的差异所在,例如:一个全文本查询语句需要查询所有的分区,而一个主键查询只需要查询负责键数据的单个节点就可以了。
下面我们来看下该系统是如何工作的。系统被分为两个逻辑区域:日志和服务层。日志按顺序捕获状态变化,服务节点存储索引提供查询服务需要的所有信息(键—值的存储可能以B-tree或SSTable的方式进行,而搜索系统可能存在与之相反的索引)。写入器可以直接访问日志,尽管需要通过服务层代理。在写入日志的时候会产生逻辑时间戳(即log中的索引),如果系统是分段式的,那么就会产生与段数目相同数量的日志文件和服务节点,这里的数量和机器数量可能会有较大差距。
服务节点订阅日志信息并将写入器按照日志存储的顺序尽快应用到它的本地索引上。
客户端只要在查询语句中提供对应的写入器的时间戳,它就可以从任何节点中获取”读写“语义。服务节点收到该查询语句后会将其中的时间戳与自身的索引比较,如果必要,服务节点会延迟请求直到对应时间的索引建立完毕,以免提供旧数据。
服务节点或许根本无需知道”控制“或”投标选择(leader election)“的概念,对很多简单的操作,服务节点可以爱完全脱离领导的情况下提供服务,日志即是信息的来源。
分发系统所需要做的其中一个比较复杂的工作,就是修复失败节点并移除几点之间的隔离。保留修复的数据并结合上各区域内的数据快照是一种较为典型的做法,它与保留完整的数据备份并从垃圾箱内回收日志的做法几乎等价。这就使得服务层简单了很多,日志系统也更有针对性。
有了这个日志系统,你可以订阅到API,这个API提供了把ETL提供给其它系统的数据内容。事实上,许多系统都可以共享相同的日志同时提供不同的索引,如下所示:
这样一个以日志为中心的系统是如何做到既数据流的提供者又同时加载其它系统的数据的呢?因为流处理器既可以消费多个输入的数据流,随后又可以通过其它系统对数据做索引为它们提供服务。
这个系统的视图可以清晰的分解到日志和查询API,因为它允许你从系统的可用性和一致性角度分解查询的特征。这可以帮助我们对系统进行分解,并理解那些并没按这种方式设计实施的系统。
虽然Kafka和Bookeeper都是一致性日志,但这不是必须的,也没什么意义。你可以轻松的把Dynamo之类的数据构分解为一致性的AP日志和键值对服务层。这样的日志使用起来灵活,因为它重传了旧消息,像Dynamo一样,这样的处理取决于消息的订阅者。
在很多人看来,在日志中另外保存一份数据的完整复本是一种浪费。事实上,虽然有很多因素使得这件事并不困难。首先,日志可以是一种有效的存储机制。我们在Kafka生产环境的服务器上存储了5 TB的数据。同时有许多的服务系统需要更多的内存来提供有效的数据服务,例如文本搜索,它通常是在内存中的。服务系统同样也需样硬盘的优化。例如,我们的实时数据系统或者在内存外提供服务或者使用固态硬盘。相反,日志系统只需要线性的读写,因此,它很乐于使用TB量级的硬盘。最终,如上图所示,由多个系统提供的数据,日志的成本分摊到多个索引上,这种聚合使得外部日志的成本降到了最低点。
LinkedIn就是使用了这种方式实现它的多个实时查询系统的。这些系统提供了一个数据库(使用数据总线做为日志摘要,或者从Kafka去掉专用的日志),这些系统在顶层数据流上还提供了特殊的分片、索引和查询功能。这也是我们实施搜索、社交网络和OLAP查询系统的方式。事实上这种方式是相当普遍的:为多个用于实时服务的服务系统提供单一的数据(这些来自Hadoop的数据或是实时的或是衍生的)。这种方式已被证实是相当简洁的。这些系统根本不需要外部可写入的API,Kafka和数据库被用做系统的记录和变更流,通过日志你可以查询系统。持有特定分片的结点在本地完成写操作。这些结点盲目的把日志提供的数据转录到它们自己的存储空间中。通过回放上行流日志可以恢复转录失败的结点。
这些系统的程度则取决于日志的多样性。一个完全可靠的系统可以用日志来对数据分片、存储结点、均衡负载,以及用于数据一致性和数据复制等多方面。在这一过程中,服务层实际上只不过是一种缓存机制,这种缓存机制允许直接写入日志的流处理。