《数据密集型应用系统设计》整理09

第十章 批处理系统

A system cannot be successful if it is too strongly influenced by a single person. Once the initial design is complete and fairly robust, the real test begins as people with many different viewpoints undertake their own experiments.
—Donald Knuth

重点:分布式批处理引擎的编程模型:回调函数被设定为无状态,并且除了指定输出值外没有外部可见的任何副作用。这个限制使得框架隐藏了抽象背后的一些困难的分布式系统问题,从而在面对崩溃和网络问题时,可以安全的重试任务,并丢弃任何失败任务的输出。

使用UNIX工具进行批处理

UNIX设计哲学

“当需要以另一种方式处理数据时,我们应该有一些连接程序的方法,就像花园软管互相拧在一起,这就是计算机的I/O。”

  1. 每个程序做好一件事。如果要做新的工作,则建立一个全新的程序。
  2. 每个程序的输出可以成为另外一个尚未确定的程序的输入。不要使用二进制、交互式输入。
  3. 今早尝试设计和构建软件,最好在几周内完成。
  4. 优先使用工具来减轻编程任务。

统一接口:在UNIX中就是文件。
逻辑与布线分离:将输入/输出的布线连接与程序逻辑分开,程序不需要关心输入来自哪里以及输出到哪里。
透明与测试:UNIX输入的文件被视为不可变,所以可以随意运行命令;可以在任何时候结束流水线;可以将流水线某一阶段的输出写入文件,并将该文件用作下一个阶段的输入。

UNIX工具非常有用,但是也有局限:只能在单机运行。所以就有Hadoop这样的工具的工作场景。

MapReduce与分布式文件系统

MapReduce有点像分布在数千台机器上的UNIX工具。

MapReduce中的数据处理:

  1. 读取一组输入文件,并将其分解为记录;
  2. 调用mapper函数从每个输入记录中提取一个键值对。例如提取url,则key是url,value为空;
  3. 按关键字将所有的键值对排序;
  4. 调用reduce函数便利排序后的键值对。

mapper的作用是将数据放入一个适合排序的表单中,reducer的作用则是处理排序好的数据。

-----2019-07-24---11.41.04

MR会尝试在输入文件副本的某台机器上运行mapper任务,这个原理被称为将计算靠近数据。

Map任务的数量由输入文件块的数量决定,而reduce任务的数量则是由作业的作者来配置的。框架使用关键字的哈希值来确定哪个reduce任务接收特定的键值对。

MR框架对工作流并没有任何特殊的支持,所以链接方式是通过目录名隐式完成的。

一条记录与另一条记录存在关联,需要join操作。有以下方法:

Reduce端的join与分组

在reducer端进行join逻辑,mapper负责准备输入数据。
-----2019-07-27---10.04.08

排序-合并join

-----2019-07-27---10.06.12

每个需要join的输入都会由一个join关键字的mapper来处理。通过分区、排序与合并,具有相同关键字的所有记录最终都会进入对reducer的同一调用,在同一个reducer中彼此相邻。然后由这个函数输出join记录

MapReduce编程模型将计算中的物理网络通信部分(从正确的机器获取数据)从应用逻辑(处理数据)中分离出来。

处理数据倾斜

如果join输入中存在热键,则可以使用算法进行补偿。

框架随机抽样出热键或者指定热键,mapper将与热键有关的记录发送到随机选择的若干个reducer中的一个。join的其他输入需要被复制到所有热键相关的reducer上。

汇总:第一个MapReduce阶段将记录随机发送到reducer,第二个MapReduce作业将来自所有第一阶段的reducer的值合并为每个键的单一值。

Map端join

对于reducer端join操作,复制数据到reducer以及合并reducer输入可能会是非常昂贵的操作

如果可以对输入数据进行某些假设,则可以通过使用map端join来加快速度。

广播哈希join

两个join输入中的某一个数据集很小,可以完全载入内存,则不需要分区,完全加载到哈希表中。大数据集的每个分区启动一个mapper,将小数据集的哈希表加载到每个mapper中。然后一次扫描大数据集的一条记录,对每条记录进行哈希表查询。

分区哈希join

如果两个join的输入以相同的方式分区(相同关键字、相同哈希函数和相同数量的分区),则哈希表方法可以独立用于每个分区。

批处理工作流的输出

  1. 生产搜索索引:mapper根据需要对文档集进行分区,每个reducer构建其分区索引。
  2. 批处理输出键值:机器学习或推荐系统等。批处理作业创建一个全新的数据库,并将其作为文件写入分布式文件系统中的作业输出目录。比如HBase。

批处理输出中的哲学:将输入视为不可变,避免副作用。把中间结果写入磁盘,任一个mapper或reducer失败都可以当地重试。相同文件可用作不同作业中的输入。与UNIX类似,MR将逻辑与连线分开。

超越MapReduce

MR是分布式文件系统的一个相当情绪和简单的抽象,然而在性能上表现糟糕。

中间状态实体化

  • MR作业只有在前面作业中的所有任务都完成时才能启动,而UNIX的管道连接的进程可以同时启动,一旦有输出就可以被使用。
  • Mapper通常是冗余的:它们只是读取刚刚由reducer写入的同一个文件,未下一个分区和排序做准备。
  • 冗余复制:将中间状态存储在分布式文件系统中意味着文件被复制到了多个节点。

解决问题:数据流引擎

Spark、Tez、Flink等,把整个工作流作为一个作业来处理。

像MR一样,通过反复调用用户定义的函数在单个线程上一次处理一条记录。与MR不同的是,不需要严格交替map和reduce的角色,而是以更灵活的方式进行组合,成为函数运算符。

数据流引擎提供了多种不同的选项来连接一个运算符的输出到另一个的输入:

  • 通过关键字对记录进行分区,和MR的shuffle一样;
  • 读取若干输入,并以相同的方式进行分区,忽略排序,减少了分区哈希join的时间;
  • 对于广播哈希join,可以将一个运算符的输出发送到join运算符的所有分区。

优点:

  • 排序等计算代价昂贵的任务只在实际需要的节点进行;
  • 没有不必要的map任务,mapper不会更改数据集的分区,所以可以合并到前面的reduce运算符中;
  • 由于工作流中的所有join和数据依赖性都是明确声明的,所以调度器知道哪些数据在哪里是必须的,可以进行本地优化;
  • 运算符之间的中间状态保存在内存中或写入本地磁盘通常就足够了;
  • 运算符可以在输入准备就绪后立即执行;
  • 与MR每个人物启动一个JVM相比,现有的JVM可以被重用来运行新的运算符。
容错

数据流引擎避免将中间状态写入HDFS,如果有机器发生故障,且中间状态丢失,则利用其他可用的数据重新计算:例如之前的中间状态,或者原始输入数据。

为了重新计算,框架必须追踪给定的数据是如何计算的,是用来哪个输入分区,以及哪些运算符。

在重新计算时,计算需要有确定性:给定相同的输入,则运算符必须产生相同的输出。

必须实体化的部分
排序操作不可避免的要消耗全部输入才可以输出。任何需要分类的运算符都需要至少暂时的累积状态,但是工作流的其他部分可以以流水线的方式执行。

高级API和语言

MR是围绕函数回调的思想构建的:对于每个记录或者一组记录,调用由用户定义的函数,并且该函数可以灵活调用任意代码来决定输出。这种思想的优点是,可以利用现有库的大型生态系统来执行数据解析、自然语言分析、图像分析以及运行数值或同级算法等。

轻松运行任意代码是MapReduce之类的批处理系统与MPP数据库的区别所在。

comments powered by Disqus