技术开发 频道

用Nova实现Hadoop海量数据工作流管理

  【IT168专稿】今天的大型互联网公司往往都需要处理持续大量涌来的数据,对数据加以分析挖掘,从中产生价值。这些动作包括:

  1、分析处理用户行为日志,包括点击、搜索,以为搜索、内容和广告提供精细化的匹配和排序算法。2、从爬虫产生的web页面流中构建、更新搜索索引。3、处理半结构化数据,比如新闻、博客与微博。


全球系统架构师大会现场报道

  这些处理过程正不断在新一代的灵活而且可扩展的数据管理平台上实现,比如PIG和Hadoop。PIG是供Hadoop的map-reduce系统使用的结构化数据流语言运行时。Nova,是一个数据和工作和流管理系统,它持续不断地把到来的数据推送到在Hadoop集群的PIG程序图中。

什么叫持续的数据流
▲Yahoo中国研究院高级技术经理韩轶平

  在支持有状态增量处理方面,Nova类似于数据流处理系统,但不同之处在于,它使用类似于基于磁盘处理的方式来处理海量数据。批次式增量处理对于互联网公司很适合,因为可以应对不断道来的数据,并受益于增量算法,同时不需要满足有超低延时性需求的处理。

  以下是精彩演讲内容:

  今天的话题是介绍一下这两年建立的Nova系统,是基于Hadoop海量数据工作流平台。这个工作在2001把其中增量计算的一部分成果在杂志上有发表。

  什么叫持续的数据流

  它是以连续或者间隔的方式,通过推送或采集的形式到达的数据。里面有两个纬度:可以连续、源源不断的,可以批量的方式推断。获取可以推送也可以采取的方式。系统日志是一直在增长中,常见的有搜索日志等。内容有新闻、上传图片、文字信息、视频信息,很多以持续信息流出现的,不断被采集到数据分析引擎里面,其他E-mail、短信、微博也是持续的数据流,在不停地产生中,如果进行大型的分析处理的话,要持续分析处理。

  今天的互联网当中,如果说是数据驱动的互联网,持续的数据流占绝大部分的比例,或者最重要的一些数据。

  持续数据流有那些场景?

  搜索内容广告精细匹配和排序。无论是网页搜索还是传统型的广告,包括内容推荐都需要对持续的用户行为的日志进行分析,甚至对其他的对用户社交的更新进行分析。

  用户行为分析和建模,我负责雅虎个性化搜索平台,通过用户在互联网上,包括雅虎和雅虎以外各种用户行为分析,对用户行为进行建模,进行个性化的操作。

  媒体内容处理分析,刚才提到对文章的过滤、对文章自然语言的分析,对视频的分析,对视频过滤导波视频检测。

  社交网络分析,对社交网络相关关联性的关系分析。基本涵盖了绝大多数的热点行业。

  当前的现状是什么样的?

  在开发Nova系统之前我们面临的现状,雅虎在互联网行业、大数据处理上是比较领先的公司,很早用一些平台进行大数据分析。我们当时看到了一些现象,数据处理和管理的逻辑十分混杂。如果做一些离线、在线的数据处理,我们有一个数据分析的任务,写一个程序、写一个脚本,在这个脚本里面有几部分工作组成:

  第一、对数据进行解析、整合、求交,很多是数据管理的事情。比如说连续数据流程,要选择合适的时间段,对于数据碎片进行配片、求交。还有数据管理的逻辑,这两个东西往往在开发当中混在一起,往往有管理数据代码又有分析代码。我一直推导的一个理念是希望把这两者分开,让我们的建模人员或者研究人员专心一意做算法,极大简化开发的过程,任务管理能够变得更清晰。

  第二,每个应用实现都是一个特设的个例。每一个组、每一个产品、每个部门都在用自己开发的程序。比如说不同组用不同的数据结构存储数据,不同的组同一个数据员搜索日志。升级也不一样,互相之间不太好协调。每当有一个新的任务出现时,要重新思考、重新设计,是非常低效的过程。在这种应用下,大量的存储都是无效的。我们搜索日志,大家做自己的分析,很多东西都会重复,甚至很多情况大家会克隆一份数据,或者自己保存一份数据,大量的计算和存储都被重复的浪费掉,对应的一个问题,因为很离散的环境,全球的优化很难实现。我们开发技术的目的,是云架构,提供云平台,希望能够做一些优化,使整个计算、存储更有效率。这种全球化的要求很难实现。

  我们希望设计一个系统,主要提供三个大的功能区域:

  第一,工作流管理。也就是说,能够把我们离线数据处理的时候,所有各种各样的工作流、各种任务管理起来,有一个整合的管理平台。

  第二,希望通过数据流的管理。我们设计这个系统的主要目的是针对互联网最常见的持续数据流,不仅把工作流管理起来,也能够把数据流用统一的界面、统一的平台管理起来。真正做到任务、计算数据在一块。

  第三,增量计算。我们做日志分析,总是增量出现,很难用全量的方式计算。增量计算在很多时候确实能够带来效率的提升。如果需要重复的计算,每次把数据计算一遍肯定会浪费。我们能够应用简化增量过程。

  我列了一些我们当时设计的时候设计目标:

  第一,我们希望设计的是一个针对持续批量数据流的处理力。持续数据流可以分为两大类型:连续的数据流、批量处理的数据流。我们设计这个系统的目标针对离线的批量数据流。

  第二,统一的数据流和工作流管理。不仅仅像常见的管理一样,希望统一起来管理。

  第三,数据生命周期管理。数据都是有生命周期的,一直觉得没有必要永久性的数据,当没有人使用、历史过久以后,希望给退休掉。这样生命周期管理,在Nova系统之前,都是应用程序给管理的,把生命周期的管理和运算逻辑、数据处理逻辑混在一起。我们希望都退休,所以不需要这些东西。

  第四,平台负责优化。这个概念跟昨天早上淘宝的讲得,有比较大的差异。在中国公司希望能够通过人力取代一些系统上的投入。对我们来说,作为人力成本相对比较高的公司,而且应用规模很大,希望通过平台优化,减少人力成本的开销,简化人。因为像研究人员都很高成本的,希望把精力放在核心代码上,而不是在一些处理逻辑方面。

  第五,跨应用的存储和计算优化,希望通过提供一个统一的平台,提供一种跨应用或者跨任务的统一计算和存储的优化。

${PageNumber}

  进入数据流的部分

  给一个很简单的抽象定义,批量优化数据流模型的定义。可以把批量化数据流模型定义,由数据块的序列组成生命流的周期。块有两种型,蓝颜色是基段,代表着数据出现时刻整个数据流的全体镜像。绿颜色是增量段,表示这个数据块出现的时刻和上一个数据块出现的时刻之间的差异部分。所以一个数据流是由一个系列的数据块组成,数据流是有不可逆的方向性的。数据块随着时间的推移向着一个方向增加的。

  在传统的这种数据模型下,传统的增量计算是怎么实现的呢?实现的是每个应用程序实现的,段与段是什么结果?每个增量块是什么样的增量,这些东西应用程序都知道。增量管理逻辑很复杂,如果数据流是一个比较规律的数据流,能够很稳定以5分钟的间隔出现,相对容易的,这个数据流是有规律的数据流。比如说我们做电子商务网站,增量管理逻辑很复杂,每个应用管理逻辑做这个事情会变得相当复杂。

  消费者数据段聚合算法,他们需要做数据块的整合算法,比如说出现了五个增量块,有能力变成一个增量块。这要对数据模式有一些了解。问题是如果有一个数据流,100个用户都要有知识。

  刚才提到增量数据的聚合和计算逻辑混合在一块。因为程序里面既要对数据进行整合,对数据进行转换,拼接在一块,还要做计算逻辑。在出现这个系统之前,每个应用程序都要做的事情。

  在Nova当中数据流怎么实现的呢?

  首先是数据通道的程序。一个数据流可以认为是数据通道,可以通过变量。数据流的数据段不可改变,这是很天然的东西,日志来了是增量的形式过来,没有理由改变。在传统意义上不会改变,一般会增添。

  支部并发读,很平常。内部支持并发写,因为特殊原因需要支持。

  确定性执行。这点非常重要,支持并发写,确定性执行,不知道在座的有没有提出异议。我们并发写之前都有一个申请的过程,这个过程是原始操作。通过这个保障并发写也是顺序化的过程,执行过程必须是确定性。我们提供一个数据溯源的功能。这个功能依赖于系统是确定的系统。数据存储是HDFS文件形式存储,但这些细节对用户程序来说都封装掉了,用户程序看到的变量代表着数据的通到。每个数据流支持数据的演化。允许在每个数据段上附加一个数据模式,可以不停的演化。

  系统维护元数据。我们希望元数据系统统一管理,对于应用程序来说,只有一个抽象的数据通道变量,一个很简单的接口。

  当有了数据通道以后,需要定义一些新的东西,生产者需要定义模式,数据会有几个基段,产生数据源或者数据源的收集者,有三个函数,凝聚函数有差异函数,能够生成出差值。合并函数,有机块、增量块可以合并。链接,可以合成增量块。有一个很重要的假设,这些函数都是由数据的产生者提供的,造成一个好处,数据的消费者不管有多少,不需要管这些东西,是帮你调用这些函数,一次性定义。极大的简化了应用开发人员的开发成本。

  现在看到的是一些凝聚的实例,大一点的表是机块表,左下是增量表,进行了覆盖,这是一种例子。这是合并类型的凝聚。

  增量表里面多一个,标识一下改变类型。加号表示增加、感叹号表示修改等,有很多其他形式表述。上面的例子增量部分是累加的过程。最主要我们做用户的累积消费量,每一个过来加上一个数值。

  增量块和全量块之间有很多种不同的组合模式,需要有数据的生产者来提供这个组合的模式。对这个组合模式做了一些研究,具体细节没有,最后给出的系统定义大概是这样的情况:

  凝聚函数各种组合的模式可以在每个数据通道上定义,也可以在每个数据段上定义。在数据段上定义的话,可以用来支持Segment的演进。凝聚函数需要满足一些基本的定律,不展开讲了。刚才说的三种基本的凝聚函数必须提供,必须要满足基本的规律。增量块先组合再跟机块组合,再组合增量块,必须是等价的,看上去很复杂,但实际上生产当中用到的大部分计算都能够满足这些条件。比如说Bag Union,合到一块做一个并级就行了。Upsert机块有出现一横就插入进去,没出现就更新掉。很多基本程序是系统提供的,非常简单,作为数据生产者标定一下用什么样的来实现就可以了。

${PageNumber}

  数据模式的演化

  系统数据的核心思想,系统管理数据的增量,生产者告诉系统该怎么做这些事情,消费者对这些事情不需要认识。在对数据模式演化的时候,数据模式实际上在每一个数据块或者数据段上的。当数据模式发生演化的时候,相邻的数据之间,生产者必须提供一个迁移函数,迁移函数很简单,输入就是老的Segment数据记录。

  系统还提供了逻辑回收与压缩的功能。这两个功能分开讲,垃圾回收,系统中没有明确已知的用户和需求,需求会使用到数据,这个数据就认为可以回收了。我看比较抽象的例子,系统实现垃圾回收,很简单。假设有两个任务,注册两个指针在数据流上,一个任务已经把数据处理到了2到3的块上,另一个在6到7的块上。当任务1的指针往前移了一格的时候,对应的数据流最前面的数据段可以被回收了。

  我们在系统中除了任务指针还有一些其他的指针,用户可以指定,保障数据一年内不会被回收。

  压缩部分,假设数据是一个连续数据流,总是增量的部分出现时,但是我的应用程序处理时,也是增量处理程序。增量处理程序可以处理前面连续五个数据块。我要处理五个数据块,生产当中程序开始执行的时候,把五个数据块求交成一个数据块。在程序没有执行之前,可以把五个数据块事先合并成一个。现在数据比较粗糙,在后半夜做这个事情,事先把数据块执行完。在这个时候有一个预压缩的过程,刚才提到我们内部支持并发写的原因。

  压缩策略器决定了这个地方可以做压缩操作的时候,会事先申请一个新的聚并。这个时候可以看到数据流分杈的,跟任务2分杈的,任务2执行6到7以后,会把6到7给删除掉,数据块的分杈就消失了。这步操作完全由系统管理的,系统对整个执行情况进行分析,在低谷的时候做这件事情。

  工作流的部分

  工作流的部分跟数据流相比我们做的工作并不是特别多,因为工作流的模型在业界比较成熟,有很多比较成熟的工作引擎。这里只做很简单的介绍。

  我们如果把工作流抽象的话,可以认为是什么东西,工作流是一个个函数,施加在数据通道上的函数,把数据通道的状态变成新的状态。所以我们整个工作流的模型就是一个环图,组成总是由数据通道、任务、工作和连接数据通道任务工作的单向边组成,这是简单的抽象。

  如果按刚才那么看的话,可以认为任务和任务之间是一个生产者和消费者的关系。一个任务消费一个通道,生产另一个通道。生产过程中对任务定三种模型,全量模型、NEW模型、OLD模型。NEW模型是可以消费上一次执行和现在的差值。OLD的模式比较特殊,可以取出上次执行地方消费的数值。只要做一个简单的指定,给出两个编量名就可以了。

  现在看到的是一个搜索引擎里面小的工作流,是对新闻去重,首先把RSS feed取出来放在一个工作流,有两个任务,发现新闻中类的模板,通过发现的模板对文章进行分类。把模板发现了以后,做过搜索。我们把模板去掉,看重复度,看是否是重复的文章。

  我们不用过所有的稍微举几个例子。首先,左边紫色的任务是输入全量的任务,发现模板,对现在新闻进行剧烈的操作,每次都把所有的文章取进来做剧烈的操作。

  右边第一个操作消费模型是NEW的模型,每次只处理新产生的文章,输出是一个增量类型,每次都是打完标签新的文章,输出永远是所有文章上面的一个模板的集合。

  对工作流里面每个任务都需要制定一下消费模型是什么、产生模型是什么。这个例子涵盖了我们实际上会有的各种数据处理的基本模型。可以看到这些例子里面涵盖了以下处理模型:

  第一,非增量模式。是一个全量的计算。

  第二,无状态增量模型。只需要拿一篇文章过来算出结果,不依赖任何其他数据。

  第三,边表的有状态增量,给每篇文章标注哪个网站、模板。在做的时候不断地查询模板表,这个表规模不是很大,发新闻网站也就是若干万的级别,是比较小的表,这张表直接弄到内存里面,用这种方法实现的,比较特殊。

  第四,去重,有状态增量。每次都与之前的进行比较。已经有页面了,看我这篇文章是不是和那篇有重复。

${PageNumber}

  为什么要提处理模型呢?

  这些处理模型在系统中都做了优化。这些都是针对这个模型设计出来的。

  下面给大家看一个很小的例子,为什么要做增量?增量是我们整个系统设计的时候重要的方向。一方面有些运算很天然适合增量在做。比如说日志处理。另一方面,如果把非增量的也增量了,可以节省资源。PPT左边上面是非增量求交。如果把增量化变得比较简单,下一次计算的结果,上一次的A和新的B,新的A增量部分和B的以前没有增量,三个部分来做。增量化运算能够把这一部分省下来,到底能省多少?做了一些实验,右边是很基本的结果,全量部分是1000万条记录。横轴是增量部分的大小,竖轴是测试的计算时间,当增量部分很小的时候,只有一千到一万个记录的时候,节约时间很多的,能够省下原来计算的不到1/3。增量部分是全量部分10%左右,仍然能够省1/3的时间。当然,增量部分比全量差不多大小的时候没有做的意义了。实际上在系统中做优化的时候,会根据增量和全量大小比例做一定的决策,是否选择这种增量计算的模式。

  我们工作流的定义跟传统的工作流没有什么区别

  工作流本身是比较抽象的东西。我们做两个任务,计算文件当中单词出现频率、统计每个单词中首字母出现的频率。很简单的事情。开发人员做开发的时候,首先工程师会生成这个工作流,这个工作流是一个抽象工作流,没有具体跟数据进行绑定。开发的时候很简单,只要定义输入数据流、输出数据流,简单的定义就可以了。当用户写这个脚本或者写处理程序的时候,实际上写完以后只是一个简单的形式定义,并没有具体的数据流进行绑定起来。

  整个数据流定义完后,用户制定输入端口,整个组成一个数据流。我们工作流绑定化,具体把输入接到数据流上面,会对数据流进行处理,变成一个绑定的数据流,绑定的数据流才是真正我们会加载到系统上执行的工作流。绑定完后,我们把它绑定到数据流上,由一系列的数据段组成,最后的实例化的过程是运行。我稍微放了一些代码,上面的代码是脚本里面会写的代码,常见的漏数据语句不存在了,把数据流当成变量出来就可以使用了。把系统实例化。系统在绑定的数据流上,帮你生成选择相应的增量块和聚合块。我们使用的是求交的聚合函数,处理在两个增量块上,这些代码是系统帮你生成的,而且运行时才是实例化的。

  PPT上半部是数据处理的脚本,没有任何数据流管理,或者数据生命周期的任何语句,所要做的是集中在任何处理上。任务流可以在不同的任务实例上出发,跟大多的工作流管理没有太大区别。

  我们支持以下的触发器:

  数据事件:新数据段生成数据段回收。

  时间事件:定时器、间隔定时器。间隔定时器是每隔多久是多少。还有每次任务开始到结束是多少之间,可以设定。

  行为事件:任务完成、任务失败。这些很常见。

  用户事件触发器,包含了很多,比如说用户希望把数据保留一年以上的时间,用户触发器可以做定义。通过统一的事件触发界面来做统一的管理。

  数据朔源功能。大家做大数据处理的时候,经常有客户提出数据怎么算出来的,什么时候算出来的?针对这个在整个系统提供的时候,提供了朔源能力。朔源能力能够帮助用户查找数据在生命中被谁算出来,什么时间点算出来,用了哪些程序、哪些模型,用了哪些输入的变量。提供两个层面的朔源,一个是数据段的数据朔源,还有数据段被哪些程序处理过,都会通过界面查询。

  应用层提供记录的数据朔源。在我们平台上还提供应用级的封装,我们对每条记录进行数据朔源。比如说我们处理一篇文章,这篇文章哪个产品在哪个时间点进行处理,这些数据都可以查询。这对长期数据平台的运营起到十分重要的作用。

  到底Nova跟传统的数据引擎相比区别在什么地方?两者都提供了工作流的管理,但是很多东西Nova提供,传统的工作引擎不提供:

  数据流管理,我们对数据流的管理进行抽象。

  数据生命周期的管理,数据的产生、消亡、重组织。

  增量运算支持,最大一部分的支持。

  数据朔源的能力,我们提供针对应用的功能。

  系统层次:物理层在数据上用了Zebra和HDFS,调度使用了Oozie,计算部分用了Mapreduce和Pig。抽象层面,数据用数据通道,调度用任务流,计算在任务。应用层对所有的数据流调动提供了Pig loader,应用层提供了CCM的模型,这个模型里面提供了数据朔源这些方面的能力,能够在这个模型里面保留数据所有被修改过的历史记录和记录之间的相互关系。

${PageNumber}

  系统实现的基本架构

  有几个比较大的块,我们加了比较大的部分像Hard Delete,本身是作为任务流实现的,像Hard Delete,我们是做删除操作的。Compaction也是任务流的。(英文)是一个决策器。( 英文)是增量运算的管理,对数据的物质化、实例化都是在这里进行的。(英文)起的作用是把数据流抽象的概念翻译成实际的(英文)表。(英文)的作用是把管理系统中的元数据。元数据最开始用MySQL实现的,后来是(英文),起的作用是突发事件的搜集工作。

  未来扩展空间:

  第一,Pig语句级跨应用优化,做这件事情很大的梦想,Pig是一个脚本语言,Pig脚本首先通过逻辑层优化、物理层优化下来。我们的想法是既然能够把今天所有的数据处理应用放平台上,希望把所有应用利民的Pig脚本全部展开,最后变化大的执行树,在执行树上做优化。

  任何一个时刻在系统当中的应用做成最大的脚本来进行优化。作为一个实验,不是一个容易的事情,但是是今后的发展方向,最终的优化不是以应用程序的逻辑决定的。系统的优化是站在系统执行的原始操作上,这是最核心的思想。

  换句话说,系统级的优化不依赖于应用程序告诉我们该怎么做。

  第二,程序自动增量化。增量化在很多情况下可以节省一些资源,提高运算效率。但是很多程序本身写的时候是非增量的实现。有没有办法自动增量化,把非增量的脚本变成增量的脚本。我也做了一些研究,在雅虎搜索引擎里面使用一些成果,但是工作有很长的路要走。

  第三,更智能的垃圾回收和压缩算法。现在来看是比较简单的,基本上需要通过用户告诉我们这个数据什么时候需要,什么时候不需要。希望有方法判断这个数据到底有没有需要被压缩。最后一个是基于负载的工作流调度。刚才提到最简单的垃圾回收和压缩,今天利用很简单的定时器的方法,在非高峰时段进行。当预测负载比较低的时候,预压缩和预加载功能实现做一点。

${PageNumber}

  技术问答环节

  【提问】Nova开源吗?

  【韩轶平】暂时没有开源计划,我很想开源,实际上开源了一部分。(英文)0.6到0.2之间大量的放。今天看到的求交优化都是在里面实现的。

  【提问】我对数据朔源很感兴趣,基本原理是什么?

  【韩轶平】我觉得在最开始设计整个系统的时候,首先脑子里面有想法要做数据朔源,无论从低层结构到上层结构,都预留空间,能够记录数据的过程。当你有这个信息很复杂,不仅仅是数据朔源,在美国版权管理严格,我们生产系统是处理雅虎的媒体信息,比如说新闻,每篇新闻稿有时效性。我们不仅做数据流的分析,还想做元数据的分析。比如说处理最后用到这篇文章、那篇文章,大家有不同的版权需求,版权也能进行计算,能算出来最后结果到底能够满足什么样的需求。如果有比较好的成果大家可能在(英文)上能看到。

  【提问】对于平台来说,处理的数据类型是多种多样的,比如说处理文档、网页、图片,这个平台是不是有抽象的数据模型,能够把不同的数据源的格式来转换成统一的数据模型进行处理?

  【韩轶平】是的。刚才提到在系统层、应用层提到了系统模型。(CCN英文)模型,就是抽象表述各种内容的模型。我们设计思想是生产者决定一切,生产者做所有的脏活,把所有操作都定义好了,每个数据元把相应的转换都做完。

  【提问】有没有一些资料可以看到?

  【韩轶平】没有开源。可能以后有机会跟大家分享一下。

  【提问】关于垃圾回收和压缩的时候,你刚才说用一个(英文)来做。这个(英文)什么时候清零,什么时候知道应该被回收掉的?

  【韩轶平】有几类,一类的是工作流的(英文),数据流的定义是有像的东西,工作流在数据流执行也是有像的东西。每次在一个数据块上减一,下一个数据块加一。像朔源,朔源的指针是时间出发的指针,这个指针随着时间会往前移动。用户指定,可以指定时间和固定的。当事件触发了指针会移动。

  【提问】您好!刚才您说了HDFS支持并发写,HDFS本身不支持并发写的。

  【韩轶平】我说的不可以并发写,内部并发写是数据流这个概念支持并发写,并发写是在做压缩的时候有分杈的操作,一个时间段不同的重组。

  【提问】数据回收了能不能还原呢?

  【韩轶平】回收是HDFS的垃圾箱里面,理论上是可以的。但是我们的模块没有实现这个功能,可以还原,可以加回收站。为什么没有实现呢?欧盟法律有要求数据到一定时候必须要删掉。

  【主持人】是不是呼声足就可以把Nova开源?

  【韩轶平】我也希望开源,确实没有理由不开源。有理由不开源的话我想是这个项目包含很多研究的东西。我们是研发院,大部分是在美国做的,雅虎的研究院和雅虎的开发团队一块做的事情,有大量研究内容。不决定是不是最有价值的东西,起到作用是开拓视野,前瞻性看一下,将来大数据处理往哪走。

1
相关文章