摘要:在第十三届中国数据库技术大会(DTCC 2022)上,阿里云数据库高级技术专家张浩然重点分享了自研云原生数据仓库AnalyticDB MySQL的高性能存储引擎。
本文内容根据演讲录音以及PPT整理而成。
AnalyticDB MySQL(以下简称ADB)是一个云原生实时数仓,采用云原生技术架构,高度兼容 MySQL 协议。关系型数据库、NoSQL、非结构化数据甚至原始的日志数据等多种数据源都可以通过DTS、DataX等工具同步到ADB中。数据导入ADB 之后,即可拥有数据库的体验,用户可以直接写 SQL 进行相对复杂、高性能、低成本的分析。
AnalyticDB高度兼容 MySQL 协议,因此可以支持非常多的数据应用。研发人员可以自己写 SQL 进行查询分析,也支持丰富的BI报表工具。为了提供云上的一站式体验,ADB还提供了数据管理,包括DMS、DataWorks 等。
上图为ADB整体架构。
底层依赖了云上技术设施,包括ECS、ECS挂载的ESSD云盘以及OSS,OSS主要用于冷数据的存储,降低用户成本。
存储层使用了分层存储,同时支持ESSD和OSS。对于数据的不同业务行为用户可以进行数据冷热分层,选择高性能或低成本的存储。同时,基于 Raft 的同步层保证了数据的高可用和强一致。为了满足不同场景的查询需求,存储层实现了行列混存,支持不同的存储格式。
第三层的XIHE为计算引擎,它具备秒级的弹性拓展能力,无状态,可任意弹出计算节点,目前单集群支持2000+以上节点,并且规模在持续增长中。其次,它为混合负载实现,无论是BSP 还是MPP,都具备良好的支持。
第四层为前端节点,负责协调、负载管理,还负责JDBC协议的支持,用户的insert语句等需要由其进行接入,同时他还包含API、优化器等。
上图为存储层架构。
最上层为JDBC协议的接入层。一个insert into语句由JDBC接入后向下发送,首先会转为 Raft command,通过 Raft 层发送给存储节点。计算层的主要功能是外表的高并发读,读取到的数据会被批量写入到存储节点。
存储节点类似于分库分表的架构,任意表会被均匀地拆到下面的若干个 shard 之上。每个Shard包含两个数据副本和一个日志副本,它并不是完全标准的 Raft ,而是 2 + 1的模式。两个数据副本负责承接写入和查询,日志副本仅参与投票,保证整体高可用的同时也节省了一份数据存储的开销以及一份用户写入的开销。
Shard内部最上层为 query merger,相当于存储层的查询接入,负责接入下推到存储的计算算子。存储引擎内部的数据分为实时数据和历史数据。
实时数据面向写进行优化,具备相对良好的写入能力,它只有数据文件和粗糙索引,不具备复杂精确索引。除此之外,还有版本管理器和delete bit-set,便于修改。实时数据通过build转化为历史数据。历史数据可以认为是经过读优化的数据,具备良好的读性能。
在历史数据中,除了数据文件以外,还有多种类型的索引,包括倒排、BKD、位图等多种类型的索引。构建过程中还进行了数据的冷热分层。
准实时数仓的写入需求一般为高吞吐(日志数据)、低延迟(业务数据),还需要兼顾写入性能以及查询性能。
前端节点为无状态,具备良好的可拓展性,可以任意横向扩展,进行高并发的写入。Raft在相对成规模的生产集群中,通常有数千个 Raft group,互相之间完全独立,相当于数千个Raft Group可同时进行并发写入。
一条 insert 语句从前端节点转成 Raft command,进入 Raft状态机进行消费之后,会转发给同步层的Dispatch queue。每个Raft group 对应一个shard或分库。若用户创建了N个表,每个分库中有N个分表。即使 Raft的并发度足够高,用户分表数也可能更多,因此需要在 Dispatch queue 中进行进一步拆分,使得写并发更高。除此之外, Dispatch queue 还负责内存管理和反压工作,写入之后会进行内存控制和反压,保证不会被写挂,防止影响线上查询。
消费到存储层之后会进行 group commit 操作,在 table engine 前进行攒批。Append only的写模式能够保证非常良好的写入性能。
ADB内部实现了 Snapshot 功能,每隔一段时间会打快照,使产品具备 time travel 查询能力的基础,但 time travel 的功能并目前没未对用户放开。同时,我们会定期将 snapshot 进行刷盘,落盘之后做checkpoint。 checkpoint 可以与raft log进行配合,重启之后可从某个 checkpoint 位点恢复,再消费少量的增量log,做到快速恢复。Snapshot还会作为 build 只读数据源进行异步构建,构建索引和分区。
上图左侧为 Replace 的原子性实现。Replace 语句被拆分为两步:先删除,再 append 。数据从 Raft command 发出后,会先同步消费,再 apply 到 table engine。 table engine 消费完成之后,用户JDBC的insert into value会立即返回,用户得到“返回成功”。此后即可对步骤③已经 apply 的数据进行查询。
但是,Replace 的实际流程为先删除后写入,因此需要进行一定的优化,防止用户的查询跳变。
如上图左下方所示,Client(1)是写入client,当前已经写到第 199 条,下一步可能要做 replace 。Client(2)为读client,要读第100条数据。当前,第100条数据还未被删除,因此可以查询得到。
如果要执行 replace ,则会先将第100条标记为删除,无法再得到查询结果。为了避免该种情况,此时会将第 100 条数据标记在RowIdMap进行删除屏蔽,在第②和③步之间保证查询没有问题。等 append即③ 完成之后,整个 replace 执行完毕,200 条数据均已经存在,查询结果将返回新数据。
以上设计实现了 replace 的原子性保证。
针对下推到存储的计算,我们也进行了一些优化。
首先,DFP(dynamic filter pushdown) 。Hash Join有小表和大表,小表往往会被 build 成 hashtable,大表用于扫描。我们对其进行了优化,优化前提为小表非常小(或过滤之后非常小)且大表有索引。将 hashtable 变成了另一种执行模式,将小表传输到大表侧,变成 in 的算子进行下推,可以直接做二级分析裁剪;其次,得益于精确的索引,单个 in 的索引开销只需在几十毫秒以内,节省了扫描大表的开销,性能也有了提高。
Hash Join的另一优化为local index join。优化的前提条件为具有比较良好的建模,做 Join时的两个表使用了同一级分区键进行一级分区,保证它们分布时是对齐的。 比如一个用户表和订单表同时按照user ID 进行一级分区,同时他们基于 user ID 进行Join。基于以上前提,可以数据完全不走网络,小表在本地直接利用大表的索引进行Index find 找到命中的行,直接实现 Local InnerJoin。
数据文件是典型的 RC File 的实现。Column Entry 记录在 Meta file 里,包含列级统计信息,包含行数有多少个null、最大值、最小值等元信息。
Block entry记录数据Block的基础元信息,包括 min、max以及offset。如果没有定义索引进行精确查找,则会通过min-max进行粗糙集过滤,进而判断是否需要读数据block。如果需要读,则再通过 offset 找到具体的数据block。
在单条记录远超常规大小的前提下,对该字段的Block进行批量加载很容易导致系统OOM。因此,这种情况下,超长字段会存储到独立的数据文件并为每条记录存在一条toast offset。最小的 IO 单元由block自动退化为value。
我们支持多种索引,此外还有分区裁剪等策略来减少以计算量。在索引的选择上,支持目前列级的索引。用户仅需对每一列选择是否单独建索引,无需感知索引类型,能够根据用户的数据类型和数据特征自动化构建索引。此外为了使用户使用简单,也无需构建任何组合索引。
对于任何查询,ADB都会将存储侧复杂查询拆分为不同的查询路径。如上图以 id= 123 and city in 'hangzhou' 为例,存储引擎会先对两个查询条件在索引内进行独立查询,此后将结果集进行取交;如果为not,则取差。最终进行多路归并,取到结果集,即为最终的查询结果,无需构建组合索引。
该方案存在的主要问题为某些索引命中率非常高导致索引效率比较低,因此存储引擎内部也实现了一条基于代价的执行计划选择器自适应选择是否使用索引。
实时数据主要面向写优化,历史数据主要面向读优化, build 能够将这两种数据进行合并产生新的历史数据。合并过程会进行面向查询的建模,包括分区、排序、构建索引、收集统计信息,是CPU密集且IO密集型的操作。
对于单机存储引擎,build除了构建索引、分区、排序等建模之外,需要兼顾实时数仓的查询和写入性能,因此,我们实现了以下三个能力:
第一,基于多版本控制的细粒度锁构建过程。实时数据负责承接写入,实时数据和历史数据共同承接查询。单存储引擎收到build命令后会进行split,将实时数据和历史数据切分为 ReadonlySnapshot,还构建了新的实时数据和 delete manager。ROSnapshot作为数据源进行异步构建,新的实时引擎和 delete manager 用于承接写入。切出的ROSnapshot、实时数据以及 delete manager共同承接查询。ROSnapshot 异步构建的过程对查询和写入完全没有受到任何影响。
Delete manager是对 ROSnapshot的删除标记,即对新的历史数据的删除标记。但因为此时还在构建过程中,新的历史数据还未产生,因此先用delete manager 记录删除。异步构建执行完成之后,会将新的历史数据替换掉ROSnapshot。替换之前会对delete操作进行进行replay 。替换完成之后,也意味着新的历史数据包括其数据建已经完成。
另外,我们对构建过程的查询和写入也进行了拆解。如果需要对历史数据进行修改,一定会在历史数据里对ROSnapshot标记为删除。一个delete会记录两份,一份是Bitset,方便查询后做归并,纯内存,因此查询性能较高;另一份为PK Entry,是 log的格式方便顺序读写。
实时数据不牵扯到对历史数据的修改,因此直接进行追加写即可。
ADB整体是一个分库分表、多副本的架构。因此一个表的 build 任务会有非常多的子任务(分库数 * 副本数)。考虑到可拓展性和可运维性,目前我们选择使用FN做全局任务管理,leader节点生成全局plan,方便运维。
同时,我们也在做另一种调度方式,每个 Raft leader 进行独立管理。其优点在于拆得足够散,调配更加灵活。而且 Raft leader在存储节点可以获取到更全面、更实时的信息,包括行数、 worker 的负载等。当然,这也带来了一定的运维复杂度。
除此之外,还以 Raft作为控制链路,因为控制链路必然需要高可靠的保证。我们需要副本间协同工作,如果没有高可靠保证,则副本间很可能不一致,导致现场处理非常复杂。相当于直接复用了 Raft 的数据链路作为控制链路。以保证任务控制的 command 一定能收到,且副本间一致。
Split 之后,只有leader会进入真正的 merge 状态,进行扫描、构建索引、构建分区,然后将扫描结果上传到DFS。上传结束之后,再看follower,leader 在merge时 follow 会进入 waiting 状态,直到被告知 leader 已经结束任务,将构建完成的热数据下载到本地。如果是冷数据,会直接应用层面的link。全部完成之后,leader和follower同时切换上线。
一个ECS 往往分布着多个Raft成员,因此只要保证 shard的 leader 足够均匀,即可保证每个机器的负载基本相同。
数据版本管理与build 紧密联系,基本基于 build 来实现。
实时数仓的一个典型使用场景为从 TP 库同步数据到 AP 库,用户希望 TP 库的所有数据更新都能实时的在 AP 库上得到体现。现在假设一个场景: 用户在 TP 库上执行了一条DDL,该DDL在TP可能耗时数十分钟。其后立即更新了大量数据并期望 AP 侧能够立即对这些数据进行查询。在该场景下要求DDL能够在AP侧做到准实时或毫秒级执行。
基于以上需求,我们首先进行了毫秒级的逻辑适配。逻辑适配完成之后,存储引擎可以接收新模式下的写入和查询请求,此时数据并没有得到物理上的修改。之后在通过build过程,对数据文件依据DDL进行真实的物理变更。执行时,若将全量数据进行重建会消耗大量时间以及资源,因此仅对有修改的分区进行重建。同时,为了满足客户不同的业务场景,也支持用户通过force partition指定重建区分区。
分区管理主要包括生命周期管理以及冷热分区转化。
见上图右侧在shard1 的定义里,lifecycle=3表示有三个分区,hot window=2表示有两个热分区。v100里有7-31、7-30、7-29三天的数据,其中7-29的为冷数据。8-1的实时数据写入后,执行build,将7-29的数据淘汰,产生8-1的新的历史数据。同时,要进行冷热转换,将7-30的数据从热数据转换为冷数据。
转换完成之后,为了进一步保证主从副本的一致性,leader做完裁决之后会将裁决信息作为 layout file 传到 之上,从副本进行apply,在复用了 build 的结果的同时也复用了整体分区管理的决策。
冷热的转化本质是分区的存储介质变化。本地存储介质一般为ESSD,上传后一般为 OSS。 数据从本地到DFS 并不是仅进行简单的上传即可进行高效的查询。针对远程数据文件的管理,我们也做了一系列的优化。
如上图左侧所示,上层为存储引擎,下层为DFS。从存储引擎到DFS,首先会经过SSD Buffer。DFS对小文件的读写并不友好,比如常见的索引构建有很多类似于外排的操作涉及大量的随机读写。若此类操作直接打穿到 DFS 上,则 导致 IOPS 非常高,对 DFS 非常不友好。因此,我们实现了SSD Buffer,先在本地聚合,将需要预处理的数据在本地完成构建。之后,将随机的小文件读写转换成流式的、批量的、高吞吐的顺序写上传到 DFS。
写的过程中,会经过 Tar FileSystem进行打包并增加一个cache,该 cache 为tar文件内子文件路径到该文件在tar内的位置的映射。Index 置于文件尾部,一旦打开文件则会将Index加载到 cache 中。cache的优势在于,在打开某个子文件时,可以少读一次元信息,同时,使得Meta类的操作不需要再读远程,而是可以直接在本地处理,对文件的meta类操作性能有显著提升。
下面的ADB FileSystem Interface是统一的文件接口层,能够屏蔽下层存储的远程实现。存储引擎只感知通用文件接口,ADB FileSystem Interface会进行具体转换自适应的操作远程文件存储或对象存储。
读取时,经过Tar FileSystem和ADB FileSystem Interface,会有SSD Cache,做了本地文件块到远程文件的映射,能够深度感知IO模型,IO模型可以分为三类:
第一类,Meta类操作 ,比如获取block位置信息等导致的随机读。
第二类,Query ,分为 index search(随机读)和data cursor(高吞吐的数据扫描)。
第三类,build,高吞吐的顺序读。
SSD Cache针对以上三种类型分别分配了独立的cache,主要包括独立的磁盘空间管理、独立的淘汰队列、独立的 block size,彼此互不干扰。引擎侧向下发 query 时,会携带 hint 信息,用于判断应该使用哪种cache。
如果发生了 cache miss,会先经过 Perfetch Service,它与IO模型紧密相连,能够感知 query 的plan,可以并发地进行预取,进一步加快对远程文件的读取性能。
内存控制主要防止 query 过于复杂,导致查询负载较高,最终导致整体存储节点的负载过高。